#![allow(clippy::disallowed_methods)]
use std::{path::PathBuf, sync::Arc, time::Duration};
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use url::Url;
use crate::ingestion::broadcaster::broadcaster;
use crate::ingestion::client::IngestionClient;
use crate::ingestion::error::{Error, Result};
use crate::ingestion::regulator::regulator;
use crate::metrics::IndexerMetrics;
use crate::types::full_checkpoint_content::CheckpointData;
mod broadcaster;
pub mod client;
pub mod error;
mod local_client;
mod regulator;
mod remote_client;
mod rpc_client;
#[cfg(test)]
mod test_utils;
// Command-line arguments that select where `IngestionService::new` reads checkpoints from.
//
// `IngestionService::new` checks the sources in declaration order: `remote_store_url`, then
// `local_ingestion_path`, then `rpc_api_url`. The first one that is set decides which client is
// built.
//
// This description is deliberately a plain comment rather than a doc comment, so that the clap
// derive cannot fold it into generated command text. The field doc comments below, in contrast,
// are intended to be used by clap as the per-flag `--help` text.
#[derive(clap::Args, Clone, Debug)]
#[group(required = true)]
pub struct ClientArgs {
    /// URL of the remote store to fetch checkpoints from.
    #[clap(long, group = "source")]
    pub remote_store_url: Option<Url>,

    /// Path to a local directory to read checkpoints from.
    #[clap(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    /// URL of the RPC API to fetch checkpoints from.
    #[clap(long, env, group = "source")]
    pub rpc_api_url: Option<Url>,

    /// Optional username passed to the RPC client.
    #[clap(long, env)]
    pub rpc_username: Option<String>,

    /// Optional password passed to the RPC client.
    #[clap(long, env)]
    pub rpc_password: Option<String>,
}
/// Tuning parameters for [`IngestionService`].
///
/// The defaults are provided by the `Default` implementation in this module.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
/// Capacity of the bounded channel created for each subscriber in
/// `IngestionService::subscribe`; the same value is also passed to the regulator in
/// `IngestionService::run`.
pub checkpoint_buffer_size: usize,
/// Capacity of the bounded channel that connects the regulator to the broadcaster.
// NOTE(review): presumably also bounds the number of concurrent fetches inside the
// broadcaster -- confirm against `broadcaster`.
pub ingest_concurrency: usize,
/// Retry interval in milliseconds; exposed as a `Duration` by `retry_interval`.
pub retry_interval_ms: u64,
}
/// Service that owns an [`IngestionClient`] together with the channels used to hand checkpoints
/// to subscribers.
///
/// It is built with `IngestionService::new`, gains subscribers through
/// `IngestionService::subscribe`, and is consumed by `IngestionService::run`.
pub struct IngestionService {
/// Tuning parameters (channel capacities and retry interval).
config: IngestionConfig,
/// Client built from [`ClientArgs`]; handed to the broadcaster in `run`.
client: IngestionClient,
/// Sender half of the unbounded `(&'static str, u64)` channel; `subscribe` hands a clone of it
/// to every subscriber.
// NOTE(review): the tuple presumably carries a subscriber name and the highest checkpoint it
// is ready for -- confirm against the regulator.
ingest_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
/// Receiver half of the same channel; passed to the regulator in `run`.
ingest_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
/// One bounded sender per subscriber, pushed by `subscribe` and passed to the broadcaster in
/// `run`.
subscribers: Vec<mpsc::Sender<Arc<CheckpointData>>>,
/// Cancellation token; clones are passed to the regulator and the broadcaster in `run`.
cancel: CancellationToken,
}
impl IngestionConfig {
    /// Returns the retry interval as a [`Duration`], converted from the `retry_interval_ms`
    /// field (milliseconds).
    pub fn retry_interval(&self) -> Duration {
        let Self {
            retry_interval_ms, ..
        } = self;
        Duration::from_millis(*retry_interval_ms)
    }
}
impl IngestionService {
pub fn new(
args: ClientArgs,
config: IngestionConfig,
metrics: Arc<IndexerMetrics>,
cancel: CancellationToken,
) -> Result<Self> {
let client = if let Some(url) = args.remote_store_url.as_ref() {
IngestionClient::new_remote(url.clone(), metrics.clone())?
} else if let Some(path) = args.local_ingestion_path.as_ref() {
IngestionClient::new_local(path.clone(), metrics.clone())
} else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
IngestionClient::new_rpc(
rpc_api_url.clone(),
args.rpc_username,
args.rpc_password,
metrics.clone(),
)?
} else {
panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
};
let subscribers = Vec::new();
let (ingest_hi_tx, ingest_hi_rx) = mpsc::unbounded_channel();
Ok(Self {
config,
client,
ingest_hi_tx,
ingest_hi_rx,
subscribers,
cancel,
})
}
pub(crate) fn client(&self) -> &IngestionClient {
&self.client
}
pub fn subscribe(
&mut self,
) -> (
mpsc::Receiver<Arc<CheckpointData>>,
mpsc::UnboundedSender<(&'static str, u64)>,
) {
let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
self.subscribers.push(sender);
(receiver, self.ingest_hi_tx.clone())
}
pub async fn run<I>(self, checkpoints: I) -> Result<(JoinHandle<()>, JoinHandle<()>)>
where
I: IntoIterator<Item = u64> + Send + Sync + 'static,
I::IntoIter: Send + Sync + 'static,
{
let IngestionService {
config,
client,
ingest_hi_tx: _,
ingest_hi_rx,
subscribers,
cancel,
} = self;
if subscribers.is_empty() {
return Err(Error::NoSubscribers);
}
let (checkpoint_tx, checkpoint_rx) = mpsc::channel(config.ingest_concurrency);
let regulator = regulator(
checkpoints,
config.checkpoint_buffer_size,
ingest_hi_rx,
checkpoint_tx,
cancel.clone(),
);
let broadcaster = broadcaster(config, client, checkpoint_rx, subscribers, cancel.clone());
Ok((regulator, broadcaster))
}
}
impl Default for IngestionConfig {
fn default() -> Self {
Self {
checkpoint_buffer_size: 5000,
ingest_concurrency: 200,
retry_interval_ms: 200,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use reqwest::StatusCode;
use wiremock::{MockServer, Request};
use crate::ingestion::remote_client::tests::{respond_with, status};
use crate::ingestion::test_utils::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use super::*;
async fn test_ingestion(
uri: String,
checkpoint_buffer_size: usize,
ingest_concurrency: usize,
cancel: CancellationToken,
) -> IngestionService {
IngestionService::new(
ClientArgs {
remote_store_url: Some(Url::parse(&uri).unwrap()),
local_ingestion_path: None,
rpc_api_url: None,
rpc_username: None,
rpc_password: None,
},
IngestionConfig {
checkpoint_buffer_size,
ingest_concurrency,
..Default::default()
},
test_metrics(),
cancel,
)
.unwrap()
}
async fn test_subscriber(
stop_after: usize,
mut rx: mpsc::Receiver<Arc<CheckpointData>>,
cancel: CancellationToken,
) -> JoinHandle<Vec<u64>> {
tokio::spawn(async move {
let mut seqs = vec![];
for _ in 0..stop_after {
tokio::select! {
_ = cancel.cancelled() => break,
Some(checkpoint) = rx.recv() => {
seqs.push(checkpoint.checkpoint_summary.sequence_number);
}
}
}
rx.close();
seqs
})
}
#[tokio::test]
async fn fail_on_no_subscribers() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
respond_with(&server, status(StatusCode::NOT_FOUND)).await;
let cancel = CancellationToken::new();
let ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;
let err = ingestion_service.run(0..).await.unwrap_err();
assert!(matches!(err, Error::NoSubscribers));
}
#[tokio::test]
async fn shutdown_on_cancel() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
respond_with(
&server,
status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
)
.await;
let cancel = CancellationToken::new();
let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;
let (rx, _) = ingestion_service.subscribe();
let subscriber = test_subscriber(usize::MAX, rx, cancel.clone()).await;
let (regulator, broadcaster) = ingestion_service.run(0..).await.unwrap();
cancel.cancel();
subscriber.await.unwrap();
regulator.await.unwrap();
broadcaster.await.unwrap();
}
#[tokio::test]
async fn shutdown_on_subscriber_drop() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
respond_with(
&server,
status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
)
.await;
let cancel = CancellationToken::new();
let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;
let (rx, _) = ingestion_service.subscribe();
let subscriber = test_subscriber(1, rx, cancel.clone()).await;
let (regulator, broadcaster) = ingestion_service.run(0..).await.unwrap();
cancel.cancelled().await;
subscriber.await.unwrap();
regulator.await.unwrap();
broadcaster.await.unwrap();
}
#[tokio::test]
async fn shutdown_on_unexpected_error() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
respond_with(&server, status(StatusCode::IM_A_TEAPOT)).await;
let cancel = CancellationToken::new();
let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;
let (rx, _) = ingestion_service.subscribe();
let subscriber = test_subscriber(usize::MAX, rx, cancel.clone()).await;
let (regulator, broadcaster) = ingestion_service.run(0..).await.unwrap();
cancel.cancelled().await;
subscriber.await.unwrap();
regulator.await.unwrap();
broadcaster.await.unwrap();
}
#[tokio::test]
async fn retry_on_not_found() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
let times: Mutex<u64> = Mutex::new(0);
respond_with(&server, move |_: &Request| {
let mut times = times.lock().unwrap();
*times += 1;
match *times {
1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
4..6 => status(StatusCode::NOT_FOUND),
_ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
}
})
.await;
let cancel = CancellationToken::new();
let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;
let (rx, _) = ingestion_service.subscribe();
let subscriber = test_subscriber(5, rx, cancel.clone()).await;
let (regulator, broadcaster) = ingestion_service.run(0..).await.unwrap();
cancel.cancelled().await;
let seqs = subscriber.await.unwrap();
regulator.await.unwrap();
broadcaster.await.unwrap();
assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
}
#[tokio::test]
async fn retry_on_transient_error() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
let times: Mutex<u64> = Mutex::new(0);
respond_with(&server, move |_: &Request| {
let mut times = times.lock().unwrap();
*times += 1;
match *times {
1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
4..6 => status(StatusCode::REQUEST_TIMEOUT),
_ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
}
})
.await;
let cancel = CancellationToken::new();
let mut ingestion_service = test_ingestion(server.uri(), 1, 1, cancel.clone()).await;
let (rx, _) = ingestion_service.subscribe();
let subscriber = test_subscriber(5, rx, cancel.clone()).await;
let (regulator, broadcaster) = ingestion_service.run(0..).await.unwrap();
cancel.cancelled().await;
let seqs = subscriber.await.unwrap();
regulator.await.unwrap();
broadcaster.await.unwrap();
assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
}
#[tokio::test]
async fn back_pressure_and_buffering() {
telemetry_subscribers::init_for_testing();
let server = MockServer::start().await;
let times: Mutex<u64> = Mutex::new(0);
respond_with(&server, move |_: &Request| {
let mut times = times.lock().unwrap();
*times += 1;
status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
})
.await;
let cancel = CancellationToken::new();
let mut ingestion_service =
test_ingestion(server.uri(), 3, 1, cancel.clone()).await;
let (mut laggard, _) = ingestion_service.subscribe();
async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointData>>) -> u64 {
let checkpoint = laggard.recv().await.unwrap();
checkpoint.checkpoint_summary.sequence_number
}
let (rx, _) = ingestion_service.subscribe();
let subscriber = test_subscriber(5, rx, cancel.clone()).await;
let (regulator, broadcaster) = ingestion_service.run(0..).await.unwrap();
assert_eq!(unblock(&mut laggard).await, 1);
assert_eq!(unblock(&mut laggard).await, 2);
cancel.cancelled().await;
let seqs = subscriber.await.unwrap();
regulator.await.unwrap();
broadcaster.await.unwrap();
assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
}
}