sui_indexer_alt_framework/ingestion/
mod.rs1#![allow(clippy::disallowed_methods)]
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use prometheus::Registry;
13use serde::Deserialize;
14use serde::Serialize;
15use sui_futures::service::Service;
16use tokio::sync::mpsc;
17
18pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
19use crate::ingestion::broadcaster::broadcaster;
20use crate::ingestion::error::Error;
21use crate::ingestion::error::Result;
22use crate::ingestion::ingestion_client::CheckpointEnvelope;
23use crate::ingestion::ingestion_client::IngestionClient;
24use crate::ingestion::ingestion_client::IngestionClientArgs;
25use crate::ingestion::streaming_client::GrpcStreamingClient;
26use crate::ingestion::streaming_client::StreamingClientArgs;
27use crate::metrics::IngestionMetrics;
28
29mod broadcaster;
30mod byte_count;
31pub(crate) mod decode;
32pub mod error;
33pub mod ingestion_client;
34mod rpc_client;
35pub mod store_client;
36pub mod streaming_client;
37#[cfg(test)]
38mod test_utils;
39
/// Upper bound on the size of a single gRPC message, in bytes (128 MiB).
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 << 20;
41
42#[derive(clap::Args, Clone, Debug, Default)]
45pub struct ClientArgs {
46 #[clap(flatten)]
47 pub ingestion: IngestionClientArgs,
48
49 #[clap(flatten)]
50 pub streaming: StreamingClientArgs,
51}
52
53#[derive(Serialize, Deserialize, Debug, Clone)]
54pub struct IngestionConfig {
55 pub checkpoint_buffer_size: usize,
57
58 pub ingest_concurrency: IngestConcurrencyConfig,
62
63 pub retry_interval_ms: u64,
65
66 pub streaming_backoff_initial_batch_size: usize,
68
69 pub streaming_backoff_max_batch_size: usize,
71
72 pub streaming_connection_timeout_ms: u64,
74
75 pub streaming_statement_timeout_ms: u64,
77}
78
79pub struct IngestionService {
80 config: IngestionConfig,
81 ingestion_client: IngestionClient,
82 streaming_client: Option<GrpcStreamingClient>,
83 commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
84 commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
85 subscribers: Vec<mpsc::Sender<Arc<CheckpointEnvelope>>>,
86 metrics: Arc<IngestionMetrics>,
87}
88
89impl IngestionConfig {
90 pub fn retry_interval(&self) -> Duration {
91 Duration::from_millis(self.retry_interval_ms)
92 }
93
94 pub fn streaming_connection_timeout(&self) -> Duration {
95 Duration::from_millis(self.streaming_connection_timeout_ms)
96 }
97
98 pub fn streaming_statement_timeout(&self) -> Duration {
99 Duration::from_millis(self.streaming_statement_timeout_ms)
100 }
101}
102
103impl IngestionService {
104 pub fn new(
114 args: ClientArgs,
115 config: IngestionConfig,
116 metrics_prefix: Option<&str>,
117 registry: &Registry,
118 ) -> Result<Self> {
119 let metrics = IngestionMetrics::new(metrics_prefix, registry);
120 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
121 let streaming_client = args.streaming.streaming_url.map(|uri| {
122 GrpcStreamingClient::new(
123 uri,
124 config.streaming_connection_timeout(),
125 config.streaming_statement_timeout(),
126 )
127 });
128
129 let subscribers = Vec::new();
130 let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
131 Ok(Self {
132 config,
133 ingestion_client,
134 streaming_client,
135 commit_hi_tx,
136 commit_hi_rx,
137 subscribers,
138 metrics,
139 })
140 }
141
142 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
144 &self.ingestion_client
145 }
146
147 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
149 &self.metrics
150 }
151
152 pub fn subscribe(
162 &mut self,
163 ) -> (
164 mpsc::Receiver<Arc<CheckpointEnvelope>>,
165 mpsc::UnboundedSender<(&'static str, u64)>,
166 ) {
167 let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
168 self.subscribers.push(sender);
169 (receiver, self.commit_hi_tx.clone())
170 }
171
172 pub async fn run<R>(
192 self,
193 checkpoints: R,
194 next_sequential_checkpoint: Option<u64>,
195 ) -> Result<Service>
196 where
197 R: std::ops::RangeBounds<u64> + Send + 'static,
198 {
199 let IngestionService {
200 config,
201 ingestion_client,
202 streaming_client,
203 commit_hi_tx: _,
204 commit_hi_rx,
205 subscribers,
206 metrics,
207 } = self;
208
209 if subscribers.is_empty() {
210 return Err(Error::NoSubscribers);
211 }
212
213 Ok(broadcaster(
214 checkpoints,
215 next_sequential_checkpoint,
216 streaming_client,
217 config,
218 ingestion_client,
219 commit_hi_rx,
220 subscribers,
221 metrics,
222 ))
223 }
224}
225
226impl Default for IngestionConfig {
227 fn default() -> Self {
228 Self {
229 checkpoint_buffer_size: 50,
230 ingest_concurrency: IngestConcurrencyConfig::Adaptive {
231 initial: 1,
232 min: 1,
233 max: 500,
234 dead_band: None,
235 },
236 retry_interval_ms: 200,
237 streaming_backoff_initial_batch_size: 10, streaming_backoff_max_batch_size: 10000, streaming_connection_timeout_ms: 5000, streaming_statement_timeout_ms: 5000, }
242 }
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::respond_with_chain_id;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Builds an `IngestionService` that reads from the remote store at `uri`. It uses the
    /// given per-subscriber buffer size and a fixed ingest concurrency.
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawns a task that collects the sequence numbers of up to `stop_after` checkpoints
    /// from `rx`. It stops early if the channel closes.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint_envelope) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint_envelope.checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// Running the service without any subscriber is rejected.
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// Shutting down the service closes the subscriber's channel, so the subscriber finishes.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The service winds down on its own once its only subscriber goes away.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// `404 Not Found` responses from the store are retried rather than treated as fatal.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// Transient failures (`408 Request Timeout`) from the store are retried.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// A slow subscriber holds back broadcasting, bounded by its buffer. Once it is drained,
    /// the other subscriber receives the full sequence.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber reads nothing until the service is running, so it fills its buffer.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointEnvelope>>) -> u64 {
            let checkpoint_envelope = laggard.recv().await.unwrap();
            checkpoint_envelope.checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // Drain the laggard one checkpoint at a time so broadcasting can make progress.
        assert_eq!(unblock(&mut laggard).await, 0);
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }
}