1#![allow(clippy::disallowed_methods)]
8
9use std::{sync::Arc, time::Duration};
10
11use prometheus::Registry;
12use serde::{Deserialize, Serialize};
13use tokio::{sync::mpsc, task::JoinHandle};
14use tokio_util::sync::CancellationToken;
15
16use crate::ingestion::broadcaster::broadcaster;
17use crate::ingestion::error::{Error, Result};
18use crate::ingestion::ingestion_client::{IngestionClient, IngestionClientArgs};
19use crate::ingestion::streaming_client::{GrpcStreamingClient, StreamingClientArgs};
20use crate::metrics::IngestionMetrics;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23mod broadcaster;
24pub mod error;
25pub mod ingestion_client;
26mod local_client;
27pub mod remote_client;
28mod rpc_client;
29pub mod streaming_client;
30#[cfg(test)]
31mod test_utils;
32
/// Maximum gRPC message size, in bytes: 128 MiB (`128 << 20` == 134_217_728).
///
/// NOTE(review): presumably applied as the send/receive limit of the gRPC-based
/// clients in this module tree — confirm at the use sites.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 << 20;
34
35#[derive(clap::Args, Clone, Debug, Default)]
38pub struct ClientArgs {
39 #[clap(flatten)]
40 pub ingestion: IngestionClientArgs,
41
42 #[clap(flatten)]
43 pub streaming: StreamingClientArgs,
44}
45
46#[derive(Serialize, Deserialize, Debug, Clone)]
47pub struct IngestionConfig {
48 pub checkpoint_buffer_size: usize,
50
51 pub ingest_concurrency: usize,
53
54 pub retry_interval_ms: u64,
56
57 pub streaming_backoff_initial_batch_size: usize,
59
60 pub streaming_backoff_max_batch_size: usize,
62}
63
64pub struct IngestionService {
65 config: IngestionConfig,
66 ingestion_client: IngestionClient,
67 streaming_client: Option<GrpcStreamingClient>,
68 commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
69 commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
70 subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
71 metrics: Arc<IngestionMetrics>,
72 cancel: CancellationToken,
73}
74
75impl IngestionConfig {
76 pub fn retry_interval(&self) -> Duration {
77 Duration::from_millis(self.retry_interval_ms)
78 }
79}
80
81impl IngestionService {
82 pub fn new(
92 args: ClientArgs,
93 config: IngestionConfig,
94 metrics_prefix: Option<&str>,
95 registry: &Registry,
96 cancel: CancellationToken,
97 ) -> Result<Self> {
98 let metrics = IngestionMetrics::new(metrics_prefix, registry);
99 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
100 let streaming_client = args.streaming.streaming_url.map(GrpcStreamingClient::new);
101
102 let subscribers = Vec::new();
103 let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
104 Ok(Self {
105 config,
106 ingestion_client,
107 streaming_client,
108 commit_hi_tx,
109 commit_hi_rx,
110 subscribers,
111 metrics,
112 cancel,
113 })
114 }
115
116 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
118 &self.ingestion_client
119 }
120
121 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
123 &self.metrics
124 }
125
126 pub fn subscribe(
136 &mut self,
137 ) -> (
138 mpsc::Receiver<Arc<Checkpoint>>,
139 mpsc::UnboundedSender<(&'static str, u64)>,
140 ) {
141 let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
142 self.subscribers.push(sender);
143 (receiver, self.commit_hi_tx.clone())
144 }
145
146 pub async fn run<R>(
161 self,
162 checkpoints: R,
163 initial_commit_hi: Option<u64>,
164 ) -> Result<JoinHandle<()>>
165 where
166 R: std::ops::RangeBounds<u64> + Send + 'static,
167 {
168 let IngestionService {
169 config,
170 ingestion_client,
171 streaming_client,
172 commit_hi_tx: _,
173 commit_hi_rx,
174 subscribers,
175 metrics,
176 cancel,
177 } = self;
178
179 if subscribers.is_empty() {
180 return Err(Error::NoSubscribers);
181 }
182
183 let broadcaster = broadcaster(
184 checkpoints,
185 initial_commit_hi,
186 streaming_client,
187 config,
188 ingestion_client,
189 commit_hi_rx,
190 subscribers,
191 metrics,
192 cancel.clone(),
193 );
194
195 Ok(broadcaster)
196 }
197}
198
199impl Default for IngestionConfig {
200 fn default() -> Self {
201 Self {
202 checkpoint_buffer_size: 5000,
203 ingest_concurrency: 200,
204 retry_interval_ms: 200,
205 streaming_backoff_initial_batch_size: 10, streaming_backoff_max_batch_size: 10000, }
208 }
209}
210
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use reqwest::StatusCode;
    use url::Url;
    use wiremock::{MockServer, Request};

    use crate::ingestion::remote_client::tests::{respond_with, status};
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Builds a service that reads from the remote store at `uri`. It uses the given
    /// subscriber buffer size and ingestion concurrency; every other setting keeps its
    /// default.
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
        cancel: CancellationToken,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency,
                ..Default::default()
            },
            None,
            &registry,
            cancel,
        )
        .unwrap()
    }

    /// Spawns a subscriber that records the sequence numbers of up to `stop_after`
    /// checkpoints, stopping early if `cancel` fires. It then closes its receiver and
    /// returns what it recorded.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
        cancel: CancellationToken,
    ) -> JoinHandle<Vec<u64>> {
        tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                tokio::select! {
                    _ = cancel.cancelled() => break,
                    Some(checkpoint) = rx.recv() => {
                        seqs.push(checkpoint.summary.sequence_number);
                    }
                }
            }

            rx.close();
            seqs
        })
    }

    /// `run` refuses to start when nobody has subscribed.
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let cancel = CancellationToken::new();
        let ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let err = ingestion_service.run(0.., None).await.unwrap_err();
        assert!(matches!(err, Error::NoSubscribers));
    }

    /// Cancelling the shared token stops both the subscriber and the broadcaster task.
    #[tokio::test]
    async fn shutdown_on_cancel() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancel();
        subscriber.await.unwrap();
        broadcaster.await.unwrap();
    }

    /// A subscriber that stops after one checkpoint and closes its receiver causes the
    /// shared token to be cancelled, and the broadcaster then exits.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancelled().await;
        subscriber.await.unwrap();
        broadcaster.await.unwrap();
    }

    /// 404 responses are retried rather than surfaced.
    ///
    /// The mock serves checkpoints numbered by request count, and requests 4 and 5
    /// return 404. The subscriber therefore sees 1, 2, 3, 6 and 7.
    #[tokio::test]
    async fn retry_on_not_found() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancelled().await;
        let seqs = subscriber.await.unwrap();
        broadcaster.await.unwrap();

        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Transient errors (408 Request Timeout) are retried, just like 404s.
    #[tokio::test]
    async fn retry_on_transient_error() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        cancel.cancelled().await;
        let seqs = subscriber.await.unwrap();
        broadcaster.await.unwrap();

        assert_eq!(seqs, vec![1, 2, 3, 4, 5]
            .into_iter()
            .take(0)
            .chain([1, 2, 3, 6, 7])
            .collect::<Vec<u64>>());
    }

    /// A slow subscriber limits how far the fast subscriber can get.
    ///
    /// Each subscriber channel holds 3 checkpoints, and the laggard only reads when
    /// `unblock` is called.
    /// NOTE(review): the expected 5 checkpoints presumably come from the 3 buffered
    /// slots plus the 2 freed by `unblock` — confirm against the broadcaster.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        telemetry_subscribers::init_for_testing();

        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        let cancel = CancellationToken::new();
        let mut ingestion_service = test_ingestion(server.uri(), 3, 1, cancel.clone()).await;

        let (mut laggard, _) = ingestion_service.subscribe();
        // Reads exactly one checkpoint from the laggard, freeing one slot in its buffer.
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx, cancel.clone()).await;
        let broadcaster = ingestion_service.run(0.., None).await.unwrap();

        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        cancel.cancelled().await;
        let seqs = subscriber.await.unwrap();
        broadcaster.await.unwrap();

        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}