sui_indexer_alt_framework/ingestion/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

// Allow use of `unbounded_channel` in `ingestion` -- it is used by the regulator task to receive
// feedback. Traffic through this task should be minimal, but if a bound is applied to it and that
// bound is hit, the indexer could deadlock.
#![allow(clippy::disallowed_methods)]

use std::{sync::Arc, time::Duration};

use prometheus::Registry;
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;

use crate::ingestion::broadcaster::broadcaster;
use crate::ingestion::error::{Error, Result};
use crate::ingestion::ingestion_client::{IngestionClient, IngestionClientArgs};
use crate::ingestion::streaming_client::{GrpcStreamingClient, StreamingClientArgs};
use crate::metrics::IngestionMetrics;
use crate::types::full_checkpoint_content::Checkpoint;

mod broadcaster;
pub mod error;
pub mod ingestion_client;
mod local_client;
pub mod remote_client;
mod rpc_client;
pub mod streaming_client;
#[cfg(test)]
mod test_utils;

/// Maximum size of a gRPC message, in bytes (128 MiB).
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;

/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
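///
/// # Example
///
/// A minimal sketch of constructing the arguments programmatically (the URL is
/// illustrative; in a binary these would normally come from `clap` parsing):
///
/// ```rust,ignore
/// let args = ClientArgs {
///     ingestion: IngestionClientArgs {
///         remote_store_url: Some(url::Url::parse("https://checkpoints.example.com").unwrap()),
///         ..Default::default()
///     },
///     ..Default::default()
/// };
/// ```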
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}

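/// Configuration knobs for the ingestion service.
///
/// # Example
///
/// A sketch of overriding a single field while keeping the defaults (from the `Default`
/// impl below) for the rest:
///
/// ```rust,ignore
/// let config = IngestionConfig {
///     retry_interval_ms: 500,
///     ..Default::default()
/// };
/// assert_eq!(config.retry_interval(), std::time::Duration::from_millis(500));
/// ```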
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Maximum size of the checkpoint backlog across all workers downstream of the ingestion
    /// service.
    pub checkpoint_buffer_size: usize,

    /// Maximum number of checkpoints to attempt to fetch concurrently.
    pub ingest_concurrency: usize,

    /// Polling interval, in milliseconds, to retry fetching checkpoints that do not exist yet.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection
    /// failure.
    pub streaming_backoff_initial_batch_size: usize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming
    /// connection failures.
    pub streaming_backoff_max_batch_size: usize,
}

pub struct IngestionService {
    config: IngestionConfig,
    ingestion_client: IngestionClient,
    streaming_client: Option<GrpcStreamingClient>,
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
    subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
    metrics: Arc<IngestionMetrics>,
    cancel: CancellationToken,
}

impl IngestionConfig {
    pub fn retry_interval(&self) -> Duration {
        Duration::from_millis(self.retry_interval_ms)
    }
}

impl IngestionService {
    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
    /// disseminating them to subscribers.
    ///
    /// - `args` specifies where to fetch checkpoints from.
    /// - `config` specifies the various sizes and time limits for ingestion.
    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
    ///
    /// After initialization, subscribers can be added using [Self::subscribe], and the service is
    /// started with [Self::run], given a range of checkpoints to fetch (potentially unbounded).
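    ///
    /// # Example
    ///
    /// A minimal construction sketch (defaults are used as placeholders; a real deployment
    /// would point `ClientArgs` at a checkpoint source):
    ///
    /// ```rust,ignore
    /// let registry = prometheus::Registry::new();
    /// let cancel = CancellationToken::new();
    /// let mut service = IngestionService::new(
    ///     ClientArgs::default(),
    ///     IngestionConfig::default(),
    ///     None,
    ///     &registry,
    ///     cancel,
    /// )?;
    /// ```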
    pub fn new(
        args: ClientArgs,
        config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
        cancel: CancellationToken,
    ) -> Result<Self> {
        let metrics = IngestionMetrics::new(metrics_prefix, registry);
        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
        let streaming_client = args.streaming.streaming_url.map(GrpcStreamingClient::new);

        let subscribers = Vec::new();
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
        Ok(Self {
            config,
            ingestion_client,
            streaming_client,
            commit_hi_tx,
            commit_hi_rx,
            subscribers,
            metrics,
            cancel,
        })
    }

    /// The ingestion client this service uses to fetch checkpoints.
    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
        &self.ingestion_client
    }

    /// Access to the ingestion metrics.
    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
        &self.metrics
    }

    /// Add a new subscription to the ingestion service. Note that the service is susceptible to
    /// the "slow receiver" problem: if one receiver is slower to process checkpoints than the
    /// checkpoint ingestion rate, it will eventually hold up all receivers.
    ///
    /// The ingestion service can optionally receive checkpoint high watermarks (`commit_hi`
    /// values) from its subscribers. If a subscriber provides a `commit_hi`, the ingestion
    /// service will not run ahead of that `commit_hi` by more than the configured
    /// `checkpoint_buffer_size`.
    ///
    /// Returns the channel to receive checkpoints from and the channel to send `commit_hi`
    /// values to.
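    ///
    /// # Example
    ///
    /// A minimal sketch (the pipeline name `"my_pipeline"` is illustrative):
    ///
    /// ```rust,ignore
    /// let (mut checkpoints, commit_hi_tx) = service.subscribe();
    ///
    /// // Report that "my_pipeline" has committed everything up to checkpoint 42, so
    /// // ingestion will not fetch past 42 + checkpoint_buffer_size.
    /// commit_hi_tx.send(("my_pipeline", 42)).unwrap();
    ///
    /// while let Some(checkpoint) = checkpoints.recv().await {
    ///     // ... process `checkpoint` ...
    /// }
    /// ```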
    pub fn subscribe(
        &mut self,
    ) -> (
        mpsc::Receiver<Arc<Checkpoint>>,
        mpsc::UnboundedSender<(&'static str, u64)>,
    ) {
        let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
        self.subscribers.push(sender);
        (receiver, self.commit_hi_tx.clone())
    }

    /// Start the ingestion service as a background task, consuming it in the process.
    ///
    /// Checkpoints are fetched concurrently for sequence numbers in the `checkpoints` range, and
    /// pushed to subscribers' channels (potentially out-of-order). Subscribers can communicate
    /// with the ingestion service via their channels in the following ways:
    ///
    /// - If a subscriber is lagging (not receiving checkpoints fast enough), it will eventually
    ///   apply back-pressure to the ingestion service, which will stop fetching new checkpoints.
    /// - If a subscriber closes its channel, the ingestion service will interpret that as a signal
    ///   to shut down as well.
    ///
    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do
    /// not exist yet. These will be retried repeatedly on a fixed `retry_interval` until they
    /// become available.
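    ///
    /// # Example
    ///
    /// A sketch of running over an open-ended range starting from checkpoint 0 (error
    /// handling elided; checkpoints are consumed from the subscription channel elsewhere):
    ///
    /// ```rust,ignore
    /// let (rx, _commit_hi_tx) = service.subscribe();
    /// let handle = service.run(0.., None).await?;
    /// // ... spawn tasks that drain `rx` ...
    /// handle.await?;
    /// ```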
    pub async fn run<R>(
        self,
        checkpoints: R,
        initial_commit_hi: Option<u64>,
    ) -> Result<JoinHandle<()>>
    where
        R: std::ops::RangeBounds<u64> + Send + 'static,
    {
        let IngestionService {
            config,
            ingestion_client,
            streaming_client,
            commit_hi_tx: _,
            commit_hi_rx,
            subscribers,
            metrics,
            cancel,
        } = self;

        if subscribers.is_empty() {
            return Err(Error::NoSubscribers);
        }

        let broadcaster = broadcaster(
            checkpoints,
            initial_commit_hi,
            streaming_client,
            config,
            ingestion_client,
            commit_hi_rx,
            subscribers,
            metrics,
            cancel.clone(),
        );

        Ok(broadcaster)
    }
}

impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            checkpoint_buffer_size: 5000,
            ingest_concurrency: 200,
            retry_interval_ms: 200,
            streaming_backoff_initial_batch_size: 10, // 10 checkpoints, ~2 seconds
            streaming_backoff_max_batch_size: 10000,  // 10000 checkpoints, ~40 minutes
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use reqwest::StatusCode;
    use url::Url;
    use wiremock::{MockServer, Request};

    use crate::ingestion::remote_client::tests::{respond_with, status};
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
        cancel: CancellationToken,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency,
                ..Default::default()
            },
            None,
            &registry,
            cancel,
        )
        .unwrap()
    }

    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
        cancel: CancellationToken,
    ) -> JoinHandle<Vec<u64>> {
        tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                tokio::select! {
                    _ = cancel.cancelled() => break,
                    Some(checkpoint) = rx.recv() => {
                        seqs.push(checkpoint.summary.sequence_number);
                    }
                }
            }

            rx.close();
            seqs
        })
    }

    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
    /// checkpoints).
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        telemetry_subscribers::init_for_testing();

        // The mock server will repeatedly return 404, so if the service does try to fetch a
        // checkpoint, it will be stuck repeatedly retrying.
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let cancel = CancellationToken::new();
        let ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let err = ingestion_service.run(0.., None).await.unwrap_err();
        assert!(matches!(err, Error::NoSubscribers));
    }

    /// The subscriber has no effective limit, and the mock server will always return checkpoint
    /// information, but the ingestion service can still be stopped using the cancellation token.
    #[tokio::test]
    async fn shutdown_on_cancel() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancel();
        subscriber.await.unwrap();
        broadcaster.await.unwrap();
    }

    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancelled().await;
        subscriber.await.unwrap();
        broadcaster.await.unwrap();
    }

    /// The service will retry fetching a checkpoint that does not exist. In this test, the
    /// server responds with 404 to the 4th and 5th requests before succeeding again.
    #[tokio::test]
    async fn retry_on_not_found() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancelled().await;
        let seqs = subscriber.await.unwrap();
        broadcaster.await.unwrap();

        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Similar to the previous test, but now it's a transient error that causes the retry.
    #[tokio::test]
    async fn retry_on_transient_error() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancelled().await;
        let seqs = subscriber.await.unwrap();
        broadcaster.await.unwrap();

        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service =
            test_ingestion(server.uri(), /* buffer */ 3, 1, cancel.clone()).await;

        // This subscriber will take its sweet time processing checkpoints.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull two
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota.
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        cancel.cancelled().await;
        let seqs = subscriber.await.unwrap();
        broadcaster.await.unwrap();

        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}