sui_indexer_alt_framework/ingestion/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

// Allow use of `unbounded_channel` in `ingestion` -- it is used by the regulator task to receive
// feedback. Traffic through this task should be minimal, but if a bound is applied to it and that
// bound is hit, the indexer could deadlock.
#![allow(clippy::disallowed_methods)]

use std::{sync::Arc, time::Duration};

use prometheus::Registry;
use serde::{Deserialize, Serialize};
use sui_futures::service::Service;
use tokio::sync::mpsc;

use crate::ingestion::broadcaster::broadcaster;
use crate::ingestion::error::{Error, Result};
use crate::ingestion::ingestion_client::{IngestionClient, IngestionClientArgs};
use crate::ingestion::streaming_client::{GrpcStreamingClient, StreamingClientArgs};
use crate::metrics::IngestionMetrics;
use crate::types::full_checkpoint_content::Checkpoint;

mod broadcaster;
pub mod error;
pub mod ingestion_client;
mod local_client;
pub mod remote_client;
mod rpc_client;
pub mod streaming_client;
#[cfg(test)]
mod test_utils;

pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;

/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
    pub checkpoint_buffer_size: usize,

    /// Maximum number of checkpoints to attempt to fetch concurrently.
    pub ingest_concurrency: usize,

    /// Polling interval to retry fetching checkpoints that do not exist, in milliseconds.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection failure.
    pub streaming_backoff_initial_batch_size: usize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming connection failures.
    pub streaming_backoff_max_batch_size: usize,

    /// Timeout for streaming connection in milliseconds.
    pub streaming_connection_timeout_ms: u64,

    /// Timeout for streaming statement (peek/next) operations in milliseconds.
    pub streaming_statement_timeout_ms: u64,
}

pub struct IngestionService {
    config: IngestionConfig,
    ingestion_client: IngestionClient,
    streaming_client: Option<GrpcStreamingClient>,
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
    subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
    metrics: Arc<IngestionMetrics>,
}

impl IngestionConfig {
    pub fn retry_interval(&self) -> Duration {
        Duration::from_millis(self.retry_interval_ms)
    }

    pub fn streaming_connection_timeout(&self) -> Duration {
        Duration::from_millis(self.streaming_connection_timeout_ms)
    }

    pub fn streaming_statement_timeout(&self) -> Duration {
        Duration::from_millis(self.streaming_statement_timeout_ms)
    }
}

impl IngestionService {
    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
    /// disseminating them to subscribers.
    ///
    /// - `args` specifies where to fetch checkpoints from.
    /// - `config` specifies the various sizes and time limits for ingestion.
    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
    ///
    /// After initialization, subscribers can be added using [Self::subscribe], and the service is
    /// started with [Self::run], given a range of checkpoints to fetch (potentially unbounded).
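    ///
    /// A minimal sketch of the construction flow (the remote store URL and the config override
    /// below are illustrative placeholders, not required values):
    ///
    /// ```rust,ignore
    /// let registry = Registry::new();
    /// let service = IngestionService::new(
    ///     ClientArgs {
    ///         ingestion: IngestionClientArgs {
    ///             remote_store_url: Some("https://example.com/checkpoints".parse().unwrap()),
    ///             ..Default::default()
    ///         },
    ///         ..Default::default()
    ///     },
    ///     IngestionConfig {
    ///         checkpoint_buffer_size: 1_000,
    ///         ..Default::default()
    ///     },
    ///     None,
    ///     &registry,
    /// )?;
    /// ```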
    pub fn new(
        args: ClientArgs,
        config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let metrics = IngestionMetrics::new(metrics_prefix, registry);
        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
        let streaming_client = args
            .streaming
            .streaming_url
            .map(|uri| GrpcStreamingClient::new(uri, config.streaming_connection_timeout()));

        let subscribers = Vec::new();
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
        Ok(Self {
            config,
            ingestion_client,
            streaming_client,
            commit_hi_tx,
            commit_hi_rx,
            subscribers,
            metrics,
        })
    }

    /// The ingestion client this service uses to fetch checkpoints.
    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
        &self.ingestion_client
    }

    /// Access to the ingestion metrics.
    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
        &self.metrics
    }

    /// Add a new subscription to the ingestion service. Note that the service is susceptible to
    /// the "slow receiver" problem: if one receiver processes checkpoints more slowly than the
    /// checkpoint ingestion rate, it will eventually hold up all receivers.
    ///
    /// The ingestion service can optionally receive checkpoint high watermarks (`commit_hi`
    /// values) from its subscribers. If a subscriber provides a `commit_hi`, the ingestion service
    /// commits to not running ahead of that `commit_hi` by more than the configured
    /// `checkpoint_buffer_size`.
    ///
    /// Returns the channel to receive checkpoints from and the channel to send `commit_hi` values
    /// to.
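    ///
    /// A minimal sketch of a subscriber loop (the `"example_pipeline"` label is a placeholder for
    /// whatever static name the subscriber reports its watermark under):
    ///
    /// ```rust,ignore
    /// let (mut checkpoints, commit_hi_tx) = ingestion_service.subscribe();
    /// tokio::spawn(async move {
    ///     while let Some(checkpoint) = checkpoints.recv().await {
    ///         let seq = checkpoint.summary.sequence_number;
    ///         // ... process the checkpoint ...
    ///
    ///         // Optionally report the highest checkpoint this subscriber has committed.
    ///         let _ = commit_hi_tx.send(("example_pipeline", seq));
    ///     }
    /// });
    /// ```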
    pub fn subscribe(
        &mut self,
    ) -> (
        mpsc::Receiver<Arc<Checkpoint>>,
        mpsc::UnboundedSender<(&'static str, u64)>,
    ) {
        let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
        self.subscribers.push(sender);
        (receiver, self.commit_hi_tx.clone())
    }

    /// Start the ingestion service as a background task, consuming it in the process.
    ///
    /// Checkpoints in the `checkpoints` range are fetched concurrently and pushed to subscribers'
    /// channels (potentially out-of-order). Subscribers can communicate with the ingestion service
    /// via their channels in the following ways:
    ///
    /// - If a subscriber is lagging (not receiving checkpoints fast enough), it will eventually
    ///   provide back-pressure to the ingestion service, which will stop fetching new checkpoints.
    /// - If a subscriber closes its channel, the ingestion service will interpret that as a signal
    ///   to shut down as well.
    ///
    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do
    /// not exist yet. These will be retried repeatedly at a fixed `retry_interval` until they
    /// become available.
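    ///
    /// Illustrative sketch only (the starting checkpoint and watermark below are arbitrary; in
    /// practice they would come from the indexer's own bookkeeping):
    ///
    /// ```rust,ignore
    /// // Subscribe before running -- `run` fails with `Error::NoSubscribers` otherwise.
    /// let (checkpoints, _commit_hi_tx) = ingestion_service.subscribe();
    /// let service = ingestion_service.run(1_000_000.., Some(1_000_000)).await?;
    /// // ... later, stop ingestion and wait for the background task to wind down.
    /// service.shutdown().await?;
    /// ```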
    pub async fn run<R>(self, checkpoints: R, initial_commit_hi: Option<u64>) -> Result<Service>
    where
        R: std::ops::RangeBounds<u64> + Send + 'static,
    {
        let IngestionService {
            config,
            ingestion_client,
            streaming_client,
            commit_hi_tx: _,
            commit_hi_rx,
            subscribers,
            metrics,
        } = self;

        if subscribers.is_empty() {
            return Err(Error::NoSubscribers);
        }

        Ok(broadcaster(
            checkpoints,
            initial_commit_hi,
            streaming_client,
            config,
            ingestion_client,
            commit_hi_rx,
            subscribers,
            metrics,
        ))
    }
}

impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            checkpoint_buffer_size: 5000,
            ingest_concurrency: 200,
            retry_interval_ms: 200,
            streaming_backoff_initial_batch_size: 10, // 10 checkpoints, ~ 2 seconds
            streaming_backoff_max_batch_size: 10000,  // 10000 checkpoints, ~ 40 minutes
            streaming_connection_timeout_ms: 5000,    // 5 seconds
            streaming_statement_timeout_ms: 5000,     // 5 seconds
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use reqwest::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::{MockServer, Request};

    use crate::ingestion::remote_client::tests::{respond_with, status};
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency,
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
    /// checkpoints).
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        // The mock server will repeatedly return 404, so if the service does try to fetch a
        // checkpoint, it will be stuck repeatedly retrying.
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// The subscriber has no effective limit, and the mock server will always return checkpoint
    /// information, but the ingestion service can still be stopped by shutting it down.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// The service will retry fetching a checkpoint that does not exist. In this test, the 4th
    /// checkpoint returns 404 a couple of times before eventually succeeding.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Similar to the previous test, but now it's a transient error that causes the retry.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber will take its sweet time processing checkpoints.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull two
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota.
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}