sui_indexer_alt_framework/ingestion/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use prometheus::Registry;
9use serde::Deserialize;
10use serde::Serialize;
11use sui_futures::service::Service;
12use tokio::sync::mpsc;
13use tracing::warn;
14
15pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
16use crate::ingestion::broadcaster::broadcaster;
17use crate::ingestion::error::Error;
18use crate::ingestion::error::Result;
19use crate::ingestion::ingestion_client::CheckpointEnvelope;
20use crate::ingestion::ingestion_client::IngestionClient;
21use crate::ingestion::ingestion_client::IngestionClientArgs;
22use crate::ingestion::ingestion_client::retry_transient_with_slow_monitor;
23use crate::ingestion::streaming_client::CheckpointStreamingClient;
24use crate::ingestion::streaming_client::GrpcStreamingClient;
25use crate::ingestion::streaming_client::StreamingClientArgs;
26use crate::metrics::IngestionMetrics;
27
/// A boxed [`CheckpointStreamingClient`] trait object.
///
/// This is the form [`IngestionService`] stores and the broadcaster consumes. The client is
/// boxed (rather than `Arc`'d) because `CheckpointStreamingClient`'s methods take `&mut self`,
/// so unique ownership is required. The `Send + Sync` bounds let the boxed client move across
/// task boundaries and be shared behind a reference when an enclosing [`IngestionService`] is
/// held across threads.
pub type BoxedStreamingClient = Box<dyn CheckpointStreamingClient + Send + Sync>;
36
37mod broadcaster;
38mod byte_count;
39pub(crate) mod decode;
40pub mod error;
41pub mod ingestion_client;
42mod rpc_client;
43pub mod store_client;
44pub mod streaming_client;
45#[cfg(test)]
46mod test_utils;
47
/// Upper bound on the size of a single gRPC message, in bytes (128 MiB).
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;
49
/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    // Arguments from which `IngestionService::new` builds its `IngestionClient`.
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    // Arguments for the streaming side; `IngestionService::new` only builds a streaming client
    // when `streaming_url` is set here.
    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}
60
/// Sizes and time limits that govern the ingestion service.
///
/// Time-valued fields are stored as raw milliseconds; the accessor methods on this type
/// (`retry_interval`, `streaming_connection_timeout`, `streaming_statement_timeout`) expose
/// them as [`Duration`]s.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Concurrency control for checkpoint ingestion. A plain integer gives fixed concurrency;
    /// an object with `initial`, `min`, and `max` fields enables adaptive concurrency that adjusts
    /// based on subscriber channel fill fraction.
    pub ingest_concurrency: IngestConcurrencyConfig,

    /// Polling interval to retry fetching checkpoints that do not exist, in milliseconds.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection failure.
    /// Zero is rejected at deserialization time because the field is a [`NonZeroUsize`].
    pub streaming_backoff_initial_batch_size: NonZeroUsize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming connection failures.
    ///
    /// NOTE(review): unlike the initial batch size this is a plain `usize`, so zero (or a value
    /// below the initial batch size) is representable — confirm the consumer handles that.
    pub streaming_backoff_max_batch_size: usize,

    /// Timeout for streaming connection in milliseconds.
    pub streaming_connection_timeout_ms: u64,

    /// Timeout for streaming statement (peek/next) operations in milliseconds.
    pub streaming_statement_timeout_ms: u64,
}
83
/// Fetches checkpoints and disseminates them to subscribers.
///
/// Built with [`IngestionService::new`] or [`IngestionService::with_clients`], given
/// subscribers through [`IngestionService::subscribe_bounded`], and consumed by
/// [`IngestionService::run`].
pub struct IngestionService {
    /// Sizes and time limits the service was built with.
    config: IngestionConfig,
    /// Client used to fetch checkpoints; also the fallback source for the latest checkpoint
    /// number.
    ingestion_client: IngestionClient,
    /// Optional streaming client; `None` when no streaming URL or client was supplied.
    streaming_client: Option<BoxedStreamingClient>,
    /// Sending halves of the channels handed out by `subscribe_bounded`.
    subscribers: Vec<mpsc::Sender<Arc<CheckpointEnvelope>>>,
    /// Ingestion metrics handle (in `new`, the same handle is also given to the ingestion
    /// client).
    metrics: Arc<IngestionMetrics>,
}
91
92impl IngestionConfig {
93    pub fn retry_interval(&self) -> Duration {
94        Duration::from_millis(self.retry_interval_ms)
95    }
96
97    pub fn streaming_connection_timeout(&self) -> Duration {
98        Duration::from_millis(self.streaming_connection_timeout_ms)
99    }
100
101    pub fn streaming_statement_timeout(&self) -> Duration {
102        Duration::from_millis(self.streaming_statement_timeout_ms)
103    }
104}
105
106impl IngestionService {
107    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
108    /// disseminating them to subscribers.
109    ///
110    /// - `args` specifies where to fetch checkpoints from.
111    /// - `config` specifies the various sizes and time limits for ingestion.
112    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
113    ///
114    /// After initialization, subscribers can be added using [Self::subscribe_bounded], and the
115    /// service is started with [Self::run], given a range of checkpoints to fetch (potentially
116    /// unbounded).
117    pub fn new(
118        args: ClientArgs,
119        config: IngestionConfig,
120        metrics_prefix: Option<&str>,
121        registry: &Registry,
122    ) -> Result<Self> {
123        let metrics = IngestionMetrics::new(metrics_prefix, registry);
124        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
125        let streaming_client: Option<BoxedStreamingClient> =
126            args.streaming.streaming_url.map(|uri| {
127                Box::new(GrpcStreamingClient::new(
128                    uri,
129                    config.streaming_connection_timeout(),
130                    config.streaming_statement_timeout(),
131                )) as BoxedStreamingClient
132            });
133        Ok(Self::from_clients(
134            ingestion_client,
135            streaming_client,
136            config,
137            metrics,
138        ))
139    }
140
    /// Construct an [`IngestionService`] from pre-built clients, bypassing [`ClientArgs`]-driven
    /// construction.
    ///
    /// Callers that supply their own [`IngestionClientTrait`] / [`CheckpointStreamingClient`]
    /// implementations — for example, when embedding the indexer in a fullnode that already has
    /// checkpoint data on hand — use this constructor instead of [`Self::new`].
    ///
    /// - `ingestion_client` is the client used to fetch checkpoints.
    /// - `streaming_client` is optional; pass `None` to run without streaming.
    /// - `config` specifies the various sizes and time limits for ingestion.
    /// - `metrics` is the shared [`IngestionMetrics`] handle. The caller is expected to have
    ///   built it once and passed the same handle to [`IngestionClient::from_trait`] (or another
    ///   `IngestionClient` constructor) so the service and the client report against a single
    ///   set of registered metric vectors — building two sets against the same prometheus
    ///   registry would double-register the metric names.
    ///
    /// The returned service starts with no subscribers.
    ///
    /// [`IngestionClientTrait`]: crate::ingestion::ingestion_client::IngestionClientTrait
    pub fn with_clients(
        ingestion_client: IngestionClient,
        streaming_client: Option<BoxedStreamingClient>,
        config: IngestionConfig,
        metrics: Arc<IngestionMetrics>,
    ) -> Self {
        Self::from_clients(ingestion_client, streaming_client, config, metrics)
    }
163
    /// Common assembly point for both [`Self::new`] and [`Self::with_clients`]: stamps the fields
    /// onto the struct once the clients and a metrics handle are available. In the
    /// [`Self::new`] case the handle is built fresh from the registry and also given to the
    /// ingestion client; in the [`Self::with_clients`] case it is supplied by the caller.
    ///
    /// The service always starts with an empty subscriber list.
    fn from_clients(
        ingestion_client: IngestionClient,
        streaming_client: Option<BoxedStreamingClient>,
        config: IngestionConfig,
        metrics: Arc<IngestionMetrics>,
    ) -> Self {
        Self {
            config,
            ingestion_client,
            streaming_client,
            subscribers: Vec::new(),
            metrics,
        }
    }
182
    /// The ingestion client this service uses to fetch checkpoints, borrowed for the lifetime
    /// of `&self`.
    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
        &self.ingestion_client
    }
187
188    /// Return the latest checkpoint number known to the ingestion service, preferably via the
189    /// streaming client, and failing that via the ingestion client.
190    pub async fn latest_checkpoint_number(&mut self) -> anyhow::Result<u64> {
191        if let Some(streaming_client) = self.streaming_client.as_deref_mut() {
192            match streaming_client.latest_checkpoint_number().await {
193                Ok(checkpoint_number) => return Ok(checkpoint_number),
194                Err(e) => {
195                    warn!(
196                        operation = "latest_checkpoint_number",
197                        "Failed to get latest checkpoint number from streaming client: {e}"
198                    );
199                }
200            }
201        }
202
203        let ingestion_client = self.ingestion_client.clone();
204        let future = move || {
205            let ingestion_client = ingestion_client.clone();
206            async move {
207                ingestion_client
208                    .latest_checkpoint_number()
209                    .await
210                    .map_err(|e| backoff::Error::transient(Error::LatestCheckpointError(e)))
211            }
212        };
213
214        Ok(retry_transient_with_slow_monitor(
215            "latest_checkpoint_number",
216            future,
217            &self.metrics.ingested_latest_checkpoint_latency,
218        )
219        .await?)
220    }
221
    /// Access to the ingestion metrics. Returns a reference to the shared handle; clone the
    /// `Arc` to keep it beyond the borrow of `self`.
    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
        &self.metrics
    }
226
    /// The ingestion configuration this service was built with (as passed to [`Self::new`] or
    /// [`Self::with_clients`]).
    pub fn config(&self) -> &IngestionConfig {
        &self.config
    }
231
232    /// Add a new subscription backed by a bounded `mpsc` channel of the given capacity. The
233    /// channel itself is the backpressure signal: when this consumer falls behind, the channel
234    /// fills and the adaptive ingestion controller cuts fetch concurrency. Send blocks when the
235    /// channel is full.
236    ///
237    /// Callers typically pass `pipeline::IngestionConfig::subscriber_channel_size()`.
238    pub fn subscribe_bounded(&mut self, size: usize) -> mpsc::Receiver<Arc<CheckpointEnvelope>> {
239        let (tx, rx) = mpsc::channel(size);
240        self.subscribers.push(tx);
241        rx
242    }
243
244    /// Start the ingestion service as a background task, consuming it in the process.
245    ///
246    /// Checkpoints are fetched concurrently from the `checkpoints` iterator and pushed to
247    /// subscribers' channels (potentially out-of-order). Each subscriber's bounded channel
248    /// acts as the backpressure signal: when it fills, the adaptive ingestion controller
249    /// throttles fetch concurrency. The slowest subscriber gates ingestion for everyone.
250    ///
251    /// If a subscriber closes its channel, the ingestion service shuts down as well.
252    ///
253    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints
254    /// that do not exist yet. These are retried on a fixed `retry_interval` until they become
255    /// available.
256    pub async fn run<R>(self, checkpoints: R) -> Result<Service>
257    where
258        R: std::ops::RangeBounds<u64> + Send + 'static,
259    {
260        let IngestionService {
261            config,
262            ingestion_client,
263            streaming_client,
264            subscribers,
265            metrics,
266        } = self;
267
268        if subscribers.is_empty() {
269            return Err(Error::NoSubscribers);
270        }
271
272        Ok(broadcaster(
273            checkpoints,
274            streaming_client,
275            config,
276            ingestion_client,
277            subscribers,
278            metrics,
279        ))
280    }
281}
282
283impl Default for IngestionConfig {
284    fn default() -> Self {
285        Self {
286            ingest_concurrency: IngestConcurrencyConfig::Adaptive {
287                initial: 1,
288                min: 1,
289                max: 500,
290                dead_band: None,
291            },
292            retry_interval_ms: 200,
293            streaming_backoff_initial_batch_size: NonZeroUsize::new(10)
294                .expect("default initial streaming backoff is non-zero"), // 10 checkpoints, ~ 2 seconds
295            streaming_backoff_max_batch_size: 10000, // 10000 checkpoints, ~ 40 minutes
296            streaming_connection_timeout_ms: 5000,   // 5 seconds
297            streaming_statement_timeout_ms: 5000,    // 5 seconds
298        }
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use std::sync::Mutex;
305
306    use axum::http::StatusCode;
307    use sui_futures::task::TaskGuard;
308    use url::Url;
309    use wiremock::MockServer;
310    use wiremock::Request;
311
312    use crate::ingestion::ingestion_client::tests::MockIngestionClient;
313    use crate::ingestion::store_client::tests::respond_with;
314    use crate::ingestion::store_client::tests::respond_with_chain_id;
315    use crate::ingestion::store_client::tests::status;
316    use crate::ingestion::streaming_client::test_utils::MockStreamingClient;
317    use crate::ingestion::test_utils::test_checkpoint_data;
318    use crate::metrics::IngestionMetrics;
319
320    use super::*;
321
    /// Latest checkpoint number reported by the mock ingestion client in these tests. It differs
    /// from the value the mock streaming clients serve (42), so an assertion can tell which
    /// source answered.
    const FALLBACK: u64 = 99;
323
324    fn mock_ingestion_client(latest_checkpoint: u64) -> IngestionClient {
325        let metrics = IngestionMetrics::new(None, &Registry::new());
326        let mock = MockIngestionClient {
327            latest_checkpoint,
328            ..Default::default()
329        };
330        IngestionClient::from_trait(Arc::new(mock), metrics)
331    }
332
333    async fn test_ingestion(uri: String, ingest_concurrency: usize) -> IngestionService {
334        let registry = Registry::new();
335        IngestionService::new(
336            ClientArgs {
337                ingestion: IngestionClientArgs {
338                    remote_store_url: Some(Url::parse(&uri).unwrap()),
339                    ..Default::default()
340                },
341                ..Default::default()
342            },
343            IngestionConfig {
344                ingest_concurrency: IngestConcurrencyConfig::Fixed {
345                    value: ingest_concurrency,
346                },
347                ..Default::default()
348            },
349            None,
350            &registry,
351        )
352        .unwrap()
353    }
354
355    async fn test_subscriber(
356        stop_after: usize,
357        mut rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
358    ) -> TaskGuard<Vec<u64>> {
359        TaskGuard::new(tokio::spawn(async move {
360            let mut seqs = vec![];
361            for _ in 0..stop_after {
362                let Some(checkpoint_envelope) = rx.recv().await else {
363                    break;
364                };
365
366                seqs.push(checkpoint_envelope.checkpoint.summary.sequence_number);
367            }
368
369            seqs
370        }))
371    }
372
373    /// Probe the streaming client (if any) for the latest checkpoint number, falling back to the
374    /// ingestion client on no-streaming-client or streaming-side failure. Mirrors the inline logic in
375    /// [`IngestionService::latest_checkpoint_number`] in a form unit tests can drive directly with
376    /// concrete mock clients (without having to build an `IngestionService`).
377    #[cfg(test)]
378    async fn latest_checkpoint_number<S>(
379        streaming_client: Option<&mut S>,
380        ingestion_client: &IngestionClient,
381    ) -> anyhow::Result<u64>
382    where
383        S: CheckpointStreamingClient + Send + ?Sized,
384    {
385        if let Some(streaming_client) = streaming_client {
386            match streaming_client.latest_checkpoint_number().await {
387                Ok(checkpoint_number) => return Ok(checkpoint_number),
388                Err(e) => {
389                    warn!(
390                        operation = "latest_checkpoint_number",
391                        "Failed to get latest checkpoint number from streaming client: {e}"
392                    );
393                }
394            }
395        }
396
397        ingestion_client.latest_checkpoint_number().await
398    }
399
400    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
401    /// checkpoints).
402    #[tokio::test]
403    async fn fail_on_no_subscribers() {
404        // The mock server will repeatedly return 404, so if the service does try to fetch a
405        // checkpoint, it will be stuck repeatedly retrying.
406        let server = MockServer::start().await;
407        respond_with(&server, status(StatusCode::NOT_FOUND)).await;
408
409        let ingestion_service = test_ingestion(server.uri(), 1).await;
410
411        let res = ingestion_service.run(0..).await;
412        assert!(matches!(res, Err(Error::NoSubscribers)));
413    }
414
415    /// The subscriber has no effective limit, and the mock server will always return checkpoint
416    /// information, but the ingestion service can still be stopped by shutting it down.
417    #[tokio::test]
418    async fn shutdown() {
419        let server = MockServer::start().await;
420        respond_with(
421            &server,
422            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
423        )
424        .await;
425        respond_with_chain_id(&server).await;
426
427        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
428
429        let rx = ingestion_service.subscribe_bounded(1);
430        let subscriber = test_subscriber(usize::MAX, rx).await;
431        let svc = ingestion_service.run(0..).await.unwrap();
432
433        svc.shutdown().await.unwrap();
434        subscriber.await.unwrap();
435    }
436
437    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
438    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
439    #[tokio::test]
440    async fn shutdown_on_subscriber_drop() {
441        let server = MockServer::start().await;
442        respond_with(
443            &server,
444            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
445        )
446        .await;
447        respond_with_chain_id(&server).await;
448
449        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
450
451        let rx = ingestion_service.subscribe_bounded(1);
452        let subscriber = test_subscriber(1, rx).await;
453        let mut svc = ingestion_service.run(0..).await.unwrap();
454
455        drop(subscriber);
456        svc.join().await.unwrap();
457    }
458
459    /// The service will retry fetching a checkpoint that does not exist, in this test, the 4th
460    /// checkpoint will return 404 a couple of times, before eventually succeeding.
461    #[tokio::test]
462    async fn retry_on_not_found() {
463        let server = MockServer::start().await;
464        let times: Mutex<u64> = Mutex::new(0);
465        respond_with(&server, move |_: &Request| {
466            let mut times = times.lock().unwrap();
467            *times += 1;
468            match *times {
469                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
470                4..6 => status(StatusCode::NOT_FOUND),
471                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
472            }
473        })
474        .await;
475        respond_with_chain_id(&server).await;
476
477        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
478
479        let rx = ingestion_service.subscribe_bounded(1);
480        let subscriber = test_subscriber(6, rx).await;
481        let _svc = ingestion_service.run(0..).await.unwrap();
482
483        let seqs = subscriber.await.unwrap();
484        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
485    }
486
487    /// Similar to the previous test, but now it's a transient error that causes the retry.
488    #[tokio::test]
489    async fn retry_on_transient_error() {
490        let server = MockServer::start().await;
491        let times: Mutex<u64> = Mutex::new(0);
492        respond_with(&server, move |_: &Request| {
493            let mut times = times.lock().unwrap();
494            *times += 1;
495            match *times {
496                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
497                4..6 => status(StatusCode::REQUEST_TIMEOUT),
498                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
499            }
500        })
501        .await;
502        respond_with_chain_id(&server).await;
503
504        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
505
506        let rx = ingestion_service.subscribe_bounded(1);
507        let subscriber = test_subscriber(6, rx).await;
508        let _svc = ingestion_service.run(0..).await.unwrap();
509
510        let seqs = subscriber.await.unwrap();
511        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
512    }
513
    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        // Each response carries the running request count as its checkpoint sequence number.
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
        // Channel capacity shared by both subscribers.
        let size = 3;

        // This subscriber will take its sweet time processing checkpoints.
        let mut laggard = ingestion_service.subscribe_bounded(size);
        // Pull exactly one checkpoint from the laggard's channel and return its sequence number.
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointEnvelope>>) -> u64 {
            let checkpoint_envelope = laggard.recv().await.unwrap();
            checkpoint_envelope.checkpoint.summary.sequence_number
        }

        let rx = ingestion_service.subscribe_bounded(size);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull three
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota. Checkpoint 0 is served by the chain_id mock.
        assert_eq!(unblock(&mut laggard).await, 0);
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }
554
555    #[tokio::test]
556    async fn latest_checkpoint_number_no_streaming_client() {
557        let client = mock_ingestion_client(FALLBACK);
558        let mut streaming: Option<MockStreamingClient> = None;
559        let result = latest_checkpoint_number(streaming.as_mut(), &client).await;
560        assert_eq!(result.unwrap(), FALLBACK);
561    }
562
563    #[tokio::test]
564    async fn latest_checkpoint_number_from_stream() {
565        let client = mock_ingestion_client(FALLBACK);
566        let mut streaming = Some(MockStreamingClient::new([42], None));
567        let result = latest_checkpoint_number(streaming.as_mut(), &client).await;
568        assert_eq!(result.unwrap(), 42);
569    }
570
571    #[tokio::test]
572    async fn latest_checkpoint_number_stream_error_falls_back() {
573        let client = mock_ingestion_client(FALLBACK);
574        let mut mock = MockStreamingClient::new(std::iter::empty::<u64>(), None);
575        mock.insert_error();
576        let mut streaming = Some(mock);
577        let result = latest_checkpoint_number(streaming.as_mut(), &client).await;
578        assert_eq!(result.unwrap(), FALLBACK);
579    }
580
581    #[tokio::test]
582    async fn latest_checkpoint_number_empty_stream_falls_back() {
583        let client = mock_ingestion_client(FALLBACK);
584        let mut streaming = Some(MockStreamingClient::new(std::iter::empty::<u64>(), None));
585        let result = latest_checkpoint_number(streaming.as_mut(), &client).await;
586        assert_eq!(result.unwrap(), FALLBACK);
587    }
588
589    #[tokio::test]
590    async fn latest_checkpoint_number_connection_failure_falls_back() {
591        let client = mock_ingestion_client(FALLBACK);
592        let mut streaming = Some(
593            MockStreamingClient::new(std::iter::empty::<u64>(), None).fail_connection_times(1),
594        );
595        let result = latest_checkpoint_number(streaming.as_mut(), &client).await;
596        assert_eq!(result.unwrap(), FALLBACK);
597    }
598
599    #[test]
600    fn reject_zero_initial_streaming_backoff_batch_size() {
601        let mut config = serde_json::to_value(IngestionConfig::default()).unwrap();
602        config["streaming_backoff_initial_batch_size"] = serde_json::json!(0);
603
604        let error = serde_json::from_value::<IngestionConfig>(config).unwrap_err();
605        assert!(error.to_string().contains("nonzero"));
606    }
607
608    /// `with_clients` resolves `latest_checkpoint_number` through the supplied streaming client
609    /// when it is healthy, without going through `ClientArgs`-driven setup.
610    #[tokio::test]
611    async fn with_clients_uses_supplied_streaming_client() {
612        const STREAM_LATEST: u64 = 42;
613
614        let registry = Registry::new();
615        let metrics = IngestionMetrics::new(None, &registry);
616        let mut service = IngestionService::with_clients(
617            IngestionClient::from_trait(
618                Arc::new(MockIngestionClient {
619                    latest_checkpoint: FALLBACK,
620                    ..Default::default()
621                }),
622                metrics.clone(),
623            ),
624            Some(Box::new(MockStreamingClient::new([STREAM_LATEST], None))),
625            IngestionConfig::default(),
626            metrics,
627        );
628
629        let latest = service.latest_checkpoint_number().await.unwrap();
630        assert_eq!(latest, STREAM_LATEST);
631    }
632
633    /// With no streaming client supplied, `with_clients` falls back to the ingestion client for
634    /// the latest-checkpoint probe.
635    #[tokio::test]
636    async fn with_clients_falls_back_to_ingestion_when_no_streaming() {
637        let registry = Registry::new();
638        let metrics = IngestionMetrics::new(None, &registry);
639        let mut service = IngestionService::with_clients(
640            IngestionClient::from_trait(
641                Arc::new(MockIngestionClient {
642                    latest_checkpoint: FALLBACK,
643                    ..Default::default()
644                }),
645                metrics.clone(),
646            ),
647            None,
648            IngestionConfig::default(),
649            metrics,
650        );
651
652        let latest = service.latest_checkpoint_number().await.unwrap();
653        assert_eq!(latest, FALLBACK);
654    }
655}