sui_indexer_alt_framework/ingestion/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

// Allow use of `unbounded_channel` in `ingestion` -- it is used by the regulator task to receive
// feedback. Traffic through this task should be minimal, but if a bound is applied to it and that
// bound is hit, the indexer could deadlock.
#![allow(clippy::disallowed_methods)]

use std::sync::Arc;
use std::time::Duration;

use prometheus::Registry;
use serde::Deserialize;
use serde::Serialize;
use sui_futures::service::Service;
use tokio::sync::mpsc;

use crate::ingestion::broadcaster::broadcaster;
use crate::ingestion::error::Error;
use crate::ingestion::error::Result;
use crate::ingestion::ingestion_client::IngestionClient;
use crate::ingestion::ingestion_client::IngestionClientArgs;
use crate::ingestion::streaming_client::GrpcStreamingClient;
use crate::ingestion::streaming_client::StreamingClientArgs;
use crate::metrics::IngestionMetrics;
use crate::types::full_checkpoint_content::Checkpoint;

mod broadcaster;
pub mod error;
pub mod ingestion_client;
mod local_client;
pub mod remote_client;
mod rpc_client;
pub mod streaming_client;
#[cfg(test)]
mod test_utils;

pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;

/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
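///
/// A minimal sketch of embedding these arguments in a binary's CLI. The crate name
/// `sui_indexer_alt_framework` and the import path are assumptions -- adjust them to match how
/// the crate is actually consumed:
///
/// ```ignore
/// use clap::Parser;
/// use sui_indexer_alt_framework::ingestion::ClientArgs;
///
/// #[derive(Parser)]
/// struct Args {
///     /// Flags from both the ingestion and streaming clients appear on the top-level CLI.
///     #[clap(flatten)]
///     client_args: ClientArgs,
/// }
///
/// let args = Args::parse();
/// // `args.client_args` can then be handed to `IngestionService::new`.
/// ```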
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}

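/// Configuration for the ingestion service. The accessors in the `impl` block further down
/// convert the millisecond fields into [Duration]s.
///
/// A minimal sketch of overriding a couple of fields while keeping the rest at their defaults
/// (the same pattern the tests at the bottom of this module use); the values shown are
/// illustrative only:
///
/// ```ignore
/// let config = IngestionConfig {
///     checkpoint_buffer_size: 1000,
///     ingest_concurrency: 50,
///     ..Default::default()
/// };
/// assert_eq!(config.retry_interval(), std::time::Duration::from_millis(200));
/// ```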
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
    pub checkpoint_buffer_size: usize,

    /// Maximum number of checkpoints to attempt to fetch concurrently.
    pub ingest_concurrency: usize,

    /// Polling interval to retry fetching checkpoints that do not exist, in milliseconds.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection failure.
    pub streaming_backoff_initial_batch_size: usize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming connection failures.
    pub streaming_backoff_max_batch_size: usize,

    /// Timeout for streaming connection in milliseconds.
    pub streaming_connection_timeout_ms: u64,

    /// Timeout for streaming statement (peek/next) operations in milliseconds.
    pub streaming_statement_timeout_ms: u64,
}

pub struct IngestionService {
    config: IngestionConfig,
    ingestion_client: IngestionClient,
    streaming_client: Option<GrpcStreamingClient>,
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
    subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
    metrics: Arc<IngestionMetrics>,
}

impl IngestionConfig {
    pub fn retry_interval(&self) -> Duration {
        Duration::from_millis(self.retry_interval_ms)
    }

    pub fn streaming_connection_timeout(&self) -> Duration {
        Duration::from_millis(self.streaming_connection_timeout_ms)
    }

    pub fn streaming_statement_timeout(&self) -> Duration {
        Duration::from_millis(self.streaming_statement_timeout_ms)
    }
}

impl IngestionService {
    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
    /// disseminating them to subscribers.
    ///
    /// - `args` specifies where to fetch checkpoints from.
    /// - `config` specifies the various sizes and time limits for ingestion.
    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
    ///
    /// After initialization, subscribers can be added using [Self::subscribe], and the service is
    /// started with [Self::run], given a range of checkpoints to fetch (potentially unbounded).
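    ///
    /// A minimal end-to-end sketch of that lifecycle, mirroring the tests at the bottom of this
    /// module. The remote store URL is illustrative, and the example assumes an async context
    /// whose error type the `?` operator can convert into:
    ///
    /// ```ignore
    /// let args = ClientArgs {
    ///     ingestion: IngestionClientArgs {
    ///         remote_store_url: Some(url::Url::parse("https://checkpoints.example.com")?),
    ///         ..Default::default()
    ///     },
    ///     ..Default::default()
    /// };
    ///
    /// let registry = prometheus::Registry::new();
    /// let mut ingestion = IngestionService::new(args, IngestionConfig::default(), None, &registry)?;
    ///
    /// // At least one subscription must exist before `run` is called, otherwise it fails with
    /// // `Error::NoSubscribers`.
    /// let (mut checkpoints, _commit_hi) = ingestion.subscribe();
    ///
    /// // Fetch checkpoints from 0 onwards (unbounded), with no initial watermark.
    /// let service = ingestion.run(0.., None).await?;
    ///
    /// if let Some(checkpoint) = checkpoints.recv().await {
    ///     println!("ingested checkpoint {}", checkpoint.summary.sequence_number);
    /// }
    ///
    /// service.shutdown().await?;
    /// ```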
    pub fn new(
        args: ClientArgs,
        config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let metrics = IngestionMetrics::new(metrics_prefix, registry);
        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
        let streaming_client = args
            .streaming
            .streaming_url
            .map(|uri| GrpcStreamingClient::new(uri, config.streaming_connection_timeout()));

        let subscribers = Vec::new();
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
        Ok(Self {
            config,
            ingestion_client,
            streaming_client,
            commit_hi_tx,
            commit_hi_rx,
            subscribers,
            metrics,
        })
    }

    /// The ingestion client this service uses to fetch checkpoints.
    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
        &self.ingestion_client
    }

    /// Access to the ingestion metrics.
    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
        &self.metrics
    }

    /// Add a new subscription to the ingestion service. Note that the service is susceptible to
    /// the "slow receiver" problem: if one receiver is slower to process checkpoints than the
    /// checkpoint ingestion rate, it will eventually hold up all receivers.
    ///
    /// The ingestion service can optionally receive checkpoint high watermarks (`commit_hi`
    /// values) from its subscribers. If a subscriber provides a `commit_hi`, the ingestion
    /// service commits to not running ahead of that watermark by more than the configured
    /// `checkpoint_buffer_size`.
    ///
    /// Returns the channel to receive checkpoints on and the channel to send `commit_hi` values to.
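    ///
    /// A minimal sketch of a subscriber loop that feeds progress back, assuming `ingestion` is a
    /// mutable [IngestionService]; the `"my_pipeline"` label is illustrative, and the string half
    /// of the tuple is assumed here to identify the reporting subscriber:
    ///
    /// ```ignore
    /// let (mut checkpoints, commit_hi_tx) = ingestion.subscribe();
    ///
    /// tokio::spawn(async move {
    ///     while let Some(checkpoint) = checkpoints.recv().await {
    ///         let seq = checkpoint.summary.sequence_number;
    ///         // ... process and commit the checkpoint ...
    ///
    ///         // Report progress so ingestion does not run more than `checkpoint_buffer_size`
    ///         // ahead of this subscriber.
    ///         let _ = commit_hi_tx.send(("my_pipeline", seq));
    ///     }
    /// });
    /// ```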
    pub fn subscribe(
        &mut self,
    ) -> (
        mpsc::Receiver<Arc<Checkpoint>>,
        mpsc::UnboundedSender<(&'static str, u64)>,
    ) {
        let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
        self.subscribers.push(sender);
        (receiver, self.commit_hi_tx.clone())
    }

    /// Start the ingestion service as a background task, consuming it in the process.
    ///
    /// Checkpoints are fetched concurrently from the `checkpoints` range, and pushed to
    /// subscribers' channels (potentially out-of-order). Subscribers can communicate with the
    /// ingestion service via their channels in the following ways:
    ///
    /// - If a subscriber is lagging (not receiving checkpoints fast enough), it will eventually
    ///   provide back-pressure to the ingestion service, which will stop fetching new checkpoints.
    /// - If a subscriber closes its channel, the ingestion service will interpret that as a signal
    ///   to shut down as well.
    ///
    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do
    /// not exist yet. These will be retried repeatedly on a fixed `retry_interval` until they
    /// become available.
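    ///
    /// A small sketch of how the returned [Service] handle is typically used, assuming
    /// `ingestion` is a subscribed [IngestionService] and mirroring the tests at the bottom of
    /// this module (the bounded range is illustrative; any `RangeBounds<u64>` works):
    ///
    /// ```ignore
    /// let mut service = ingestion.run(1_000..=2_000, None).await?;
    ///
    /// // Either wait for the service to wind down on its own (e.g. after a subscriber hangs up,
    /// // or it is shut down from elsewhere) ...
    /// service.join().await?;
    ///
    /// // ... or stop it explicitly:
    /// // service.shutdown().await?;
    /// ```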
    pub async fn run<R>(self, checkpoints: R, initial_commit_hi: Option<u64>) -> Result<Service>
    where
        R: std::ops::RangeBounds<u64> + Send + 'static,
    {
        let IngestionService {
            config,
            ingestion_client,
            streaming_client,
            commit_hi_tx: _,
            commit_hi_rx,
            subscribers,
            metrics,
        } = self;

        if subscribers.is_empty() {
            return Err(Error::NoSubscribers);
        }

        Ok(broadcaster(
            checkpoints,
            initial_commit_hi,
            streaming_client,
            config,
            ingestion_client,
            commit_hi_rx,
            subscribers,
            metrics,
        ))
    }
}

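// Note on the inline estimates below: "10 checkpoints, ~ 2 seconds" and "10000 checkpoints,
// ~ 40 minutes" both correspond to an assumed checkpoint rate of roughly 4-5 checkpoints per
// second (one checkpoint every 200-250ms).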
impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            checkpoint_buffer_size: 5000,
            ingest_concurrency: 200,
            retry_interval_ms: 200,
            streaming_backoff_initial_batch_size: 10, // 10 checkpoints, ~ 2 seconds
            streaming_backoff_max_batch_size: 10000,  // 10000 checkpoints, ~ 40 minutes
            streaming_connection_timeout_ms: 5000,    // 5 seconds
            streaming_statement_timeout_ms: 5000,     // 5 seconds
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use reqwest::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::remote_client::tests::respond_with;
    use crate::ingestion::remote_client::tests::status;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency,
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
    /// checkpoints).
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        // The mock server will repeatedly return 404, so if the service does try to fetch a
        // checkpoint, it will be stuck repeatedly retrying.
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// The subscriber has no effective limit, and the mock server will always return checkpoint
    /// information, but the ingestion service can still be stopped by shutting it down.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// The service will retry fetching a checkpoint that does not exist. In this test, the 4th
    /// checkpoint returns 404 a couple of times before eventually succeeding.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Similar to the previous test, but now it's a transient error that causes the retry.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber will take its sweet time processing checkpoints.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull two
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota.
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}