1use std::sync::Arc;
5
6use async_trait::async_trait;
7use serde::Deserialize;
8use serde::Serialize;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::info;
12
13use crate::config::ConcurrencyConfig;
14use crate::ingestion::ingestion_client::CheckpointEnvelope;
15use crate::metrics::IndexerMetrics;
16use crate::pipeline::CommitterConfig;
17use crate::pipeline::IngestionConfig;
18use crate::pipeline::Processor;
19use crate::pipeline::processor::processor;
20use crate::pipeline::sequential::collector::BatchedRows;
21use crate::pipeline::sequential::collector::collector;
22use crate::pipeline::sequential::committer::committer;
23use crate::store::SequentialStore;
24use crate::store::Store;
25
26mod collector;
27mod committer;
28
29#[async_trait]
45pub trait Handler: Processor {
46 type Store: SequentialStore;
47
48 const MIN_EAGER_ROWS: usize = 50;
50
51 const MAX_PENDING_ROWS: usize = 5000;
57
58 const MAX_BATCH_CHECKPOINTS: usize = 5 * 60;
62
63 type Batch: Default + Send + Sync + 'static;
67
68 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);
78
79 async fn commit<'a>(
82 &self,
83 batch: &Self::Batch,
84 conn: &mut <Self::Store as Store>::Connection<'a>,
85 ) -> anyhow::Result<usize>;
86}
87
88#[derive(Serialize, Deserialize, Debug, Clone, Default)]
90pub struct SequentialConfig {
91 pub committer: CommitterConfig,
93
94 pub ingestion: IngestionConfig,
96
97 pub fanout: Option<ConcurrencyConfig>,
99
100 pub min_eager_rows: Option<usize>,
102
103 pub max_pending_rows: Option<usize>,
105
106 pub max_batch_checkpoints: Option<usize>,
108
109 pub processor_channel_size: Option<usize>,
111
112 pub pipeline_depth: Option<usize>,
115}
116
117pub(crate) fn pipeline<H: Handler>(
135 handler: H,
136 next_checkpoint: u64,
137 config: SequentialConfig,
138 store: H::Store,
139 checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
140 metrics: Arc<IndexerMetrics>,
141) -> Service {
142 info!(
143 pipeline = H::NAME,
144 "Starting pipeline with config: {config:#?}",
145 );
146
147 let concurrency = config
148 .fanout
149 .clone()
150 .unwrap_or(ConcurrencyConfig::Adaptive {
151 initial: 1,
152 min: 1,
153 max: num_cpus::get(),
154 dead_band: None,
155 });
156 let min_eager_rows = config.min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
157 let max_pending_rows = config.max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
158 let max_batch_checkpoints = config
159 .max_batch_checkpoints
160 .unwrap_or(H::MAX_BATCH_CHECKPOINTS);
161
162 let processor_channel_size = config.processor_channel_size.unwrap_or(num_cpus::get() / 2);
163 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
164
165 let pipeline_depth = config
166 .pipeline_depth
167 .unwrap_or_else(|| (num_cpus::get() / 2).max(4));
168 let (collector_tx, committer_rx) = mpsc::channel::<BatchedRows<H>>(pipeline_depth);
169
170 let handler = Arc::new(handler);
171
172 let s_processor = processor(
173 handler.clone(),
174 checkpoint_rx,
175 processor_tx,
176 metrics.clone(),
177 concurrency,
178 store.clone(),
179 );
180
181 let s_collector = collector::<H>(
182 handler.clone(),
183 config,
184 next_checkpoint,
185 collector_rx,
186 metrics.clone(),
187 min_eager_rows,
188 max_pending_rows,
189 max_batch_checkpoints,
190 collector_tx,
191 );
192
193 let s_committer = committer::<H>(handler, store, metrics.clone(), committer_rx);
194
195 s_processor.merge(s_collector).merge(s_committer)
196}
197
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;

    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::IndexedCheckpoint;

    use super::*;

    /// Handler whose rows are plain `u64`s, committed by appending them to the mock store.
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        /// Produces no rows: these tests skip the processor stage and feed already-indexed
        /// checkpoints straight into the collector.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl Handler for TestHandler {
        type Store = FallibleMockStore;
        type Batch = Vec<u64>;

        // Small thresholds so that a handful of checkpoints is enough to exercise them.
        const MAX_BATCH_CHECKPOINTS: usize = 3;
        const MIN_EAGER_ROWS: usize = 4;

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        /// Appends the batch to the mock store's sequential data and reports its length.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // An empty batch never touches the store's lock.
            if batch.is_empty() {
                return Ok(0);
            }

            conn.0
                .sequential_checkpoint_data
                .lock()
                .unwrap()
                .extend(batch.iter().copied());

            Ok(batch.len())
        }
    }

    /// Everything a test needs to drive the collector and committer and inspect the result.
    struct TestSetup {
        store: FallibleMockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        // Never read; stored so the service value lives as long as the setup does.
        // NOTE(review): presumably dropping it would stop the tasks -- confirm.
        #[allow(unused)]
        service: Service,
    }

    /// Wire a collector and a committer around `store`, bypassing the processor stage.
    ///
    /// Thresholds are resolved the same way `pipeline` resolves them: the value from `config`
    /// when present, otherwise the constant from `TestHandler`.
    fn setup_test(
        next_checkpoint: u64,
        config: SequentialConfig,
        store: FallibleMockStore,
    ) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());
        let handler = Arc::new(TestHandler);

        let eager_rows = config.min_eager_rows.unwrap_or(TestHandler::MIN_EAGER_ROWS);
        let pending_rows = config
            .max_pending_rows
            .unwrap_or(TestHandler::MAX_PENDING_ROWS);
        let batch_checkpoints = config
            .max_batch_checkpoints
            .unwrap_or(TestHandler::MAX_BATCH_CHECKPOINTS);
        let depth = match config.pipeline_depth {
            Some(depth) => depth,
            None => (num_cpus::get() / 2).max(4),
        };

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        let (collector_tx, committer_rx) = mpsc::channel::<BatchedRows<TestHandler>>(depth);

        let s_collector = collector(
            handler.clone(),
            config,
            next_checkpoint,
            checkpoint_rx,
            metrics.clone(),
            eager_rows,
            pending_rows,
            batch_checkpoints,
            collector_tx,
        );
        // The committer gets its own clone; the caller's copy stays in the setup for assertions.
        let s_committer = committer(handler, store.clone(), metrics, committer_rx);

        TestSetup {
            store,
            checkpoint_tx,
            service: s_collector.merge(s_committer),
        }
    }

    /// Send one indexed checkpoint into the collector, panicking if the channel is closed.
    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        let indexed = create_checkpoint(checkpoint);
        setup.checkpoint_tx.send(indexed).await.unwrap();
    }

    /// Send several checkpoints, one after another, in iteration order.
    async fn send_all(setup: &mut TestSetup, checkpoints: impl IntoIterator<Item = u64>) {
        for checkpoint in checkpoints {
            send_checkpoint(setup, checkpoint).await;
        }
    }

    /// Give the background tasks `millis` milliseconds to make progress.
    async fn settle(millis: u64) {
        tokio::time::sleep(Duration::from_millis(millis)).await;
    }

    /// Build an indexed checkpoint in which every field is derived from `checkpoint`.
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        // The fourth argument is scaled by 1000 (presumably a millisecond timestamp -- confirm
        // against `IndexedCheckpoint::new`); the only row carried is the number itself.
        let seq = checkpoint;
        IndexedCheckpoint::new(seq, seq, seq, seq * 1000, vec![seq])
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let mut setup = setup_test(
            0,
            SequentialConfig::default(),
            FallibleMockStore::default(),
        );

        // Checkpoints arrive in order.
        send_all(&mut setup, 0..3).await;
        settle(200).await;

        // Rows were written in order...
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // ...and the watermark points at the last checkpoint.
        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
        assert_eq!(watermark.tx_hi, 2);
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        // The pipeline is told to start from checkpoint 5.
        let mut setup = setup_test(
            5,
            SequentialConfig::default(),
            FallibleMockStore::default(),
        );

        // Nothing has been committed yet.
        assert!(setup.store.watermark(TestHandler::NAME).is_none());

        // Checkpoints below the starting point must not produce a watermark.
        send_all(&mut setup, 0..5).await;
        settle(1000).await;
        assert!(setup.store.watermark(TestHandler::NAME).is_none());

        // Checkpoints from the starting point onwards are committed.
        send_all(&mut setup, 5..8).await;
        settle(1000).await;

        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(7));
        assert_eq!(watermark.tx_hi, 7);
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let mut setup = setup_test(
            0,
            SequentialConfig::default(),
            FallibleMockStore::default(),
        );

        // Checkpoints arrive out of order.
        send_all(&mut setup, [1, 0, 2]).await;
        settle(200).await;

        // Rows are nevertheless written in checkpoint order.
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
        assert_eq!(watermark.tx_hi, 2);
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let mut setup = setup_test(
            0,
            SequentialConfig::default(),
            FallibleMockStore::default(),
        );

        // One more checkpoint than `TestHandler::MAX_BATCH_CHECKPOINTS`.
        send_all(&mut setup, 0..4).await;
        settle(200).await;

        // All rows end up written, in order.
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        // A long collect interval, so that a write shortly after the final send cannot be
        // attributed to the periodic tick.
        let committer = CommitterConfig {
            collect_interval_ms: 4_000,
            ..Default::default()
        };
        let config = SequentialConfig {
            committer,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, FallibleMockStore::default());

        // Let the pipeline start up before sending anything.
        settle(200).await;

        // Three rows: one short of `TestHandler::MIN_EAGER_ROWS`.
        send_all(&mut setup, 0..3).await;

        // Checked immediately after sending: nothing has been written yet.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // The fourth row reaches the eager threshold.
        send_checkpoint(&mut setup, 3).await;
        settle(200).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let committer = CommitterConfig {
            collect_interval_ms: 1_000,
            ..Default::default()
        };
        let config = SequentialConfig {
            committer,
            ..Default::default()
        };

        // Presumably makes the first transaction fail before later ones succeed -- see the mock.
        let store = FallibleMockStore::default().with_transaction_failures(1);
        let mut setup = setup_test(10, config, store);

        send_checkpoint(&mut setup, 10).await;

        // Longer than one collect interval, leaving room for another attempt.
        settle(1_500).await;

        assert_eq!(setup.store.get_sequential_data(), vec![10]);
    }

    #[tokio::test]
    async fn pipelined_commit_runs_under_slow_commit() {
        let config = SequentialConfig {
            max_batch_checkpoints: Some(3),
            min_eager_rows: Some(1),
            pipeline_depth: Some(1),
            ..Default::default()
        };

        // Delay argument presumably in milliseconds -- see the mock.
        let store = FallibleMockStore::default().with_commit_delay(700);
        let mut setup = setup_test(0, config, store);

        send_all(&mut setup, 0..6).await;
        settle(2_000).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3, 4, 5]);
        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(5));
    }

    #[tokio::test]
    async fn pipelined_commit_preserves_watermark_ordering() {
        let config = SequentialConfig {
            max_batch_checkpoints: Some(2),
            min_eager_rows: Some(1),
            pipeline_depth: Some(2),
            ..Default::default()
        };

        // Delay argument presumably in milliseconds -- see the mock.
        let store = FallibleMockStore::default().with_commit_delay(100);
        let mut setup = setup_test(0, config, store);

        send_all(&mut setup, 0..6).await;
        settle(1_500).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3, 4, 5]);
        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(5));
        assert_eq!(watermark.tx_hi, 5);
    }
}