use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;

use scoped_futures::ScopedFutureExt;
use sui_futures::service::Service;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::IndexedCheckpoint;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::logging::WatermarkLogger;
use crate::pipeline::sequential::Handler;
use crate::pipeline::sequential::SequentialConfig;
use crate::store::Connection;
use crate::store::TransactionalStore;
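
/// The committer task for a sequential pipeline. It buffers checkpoints received from the
/// processor on `rx`, folds them into batches in checkpoint order starting from
/// `next_checkpoint`, and writes each batch to `store` together with an updated committer
/// watermark in a single transaction. Commits are held back according to `checkpoint_lag`, and
/// after every successful write the new commit_hi (highest committed checkpoint + 1) is reported
/// on `tx`.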
pub(super) fn committer<H>(
    handler: Arc<H>,
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service
where
    H: Handler + Send + Sync + 'static,
    H::Store: TransactionalStore + 'static,
{
    Service::new().spawn_aborting(async move {
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

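        // State of the batch currently being built: how many times it has failed to commit, the
        // accumulated values, and the number of rows and checkpoints folded into it so far.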
        let mut attempt = 0;
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;

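        // Watermark of the latest checkpoint folded into the current batch; written to the store
        // in the same transaction as the batch itself.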
        let mut watermark = None;

        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

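        // Checkpoints received from the processor but not yet batched, keyed by checkpoint
        // sequence number so they can be drained in order.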
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = poll.tick() => {
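                    // On every tick: gather eligible pending checkpoints into the batch and try
                    // to write it (plus the new watermark) to the store.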
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Processor closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

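                    // Fold pending checkpoints into the batch, in checkpoint order, until the
                    // batch is full or the next expected checkpoint is not yet available.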
                    while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
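                            // The next expected checkpoint has not arrived yet; wait for it.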
                            Ordering::Less => break,

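                            // This is the checkpoint we are waiting for; fold it into the batch.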
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                handler.batch(&mut batch, indexed.values.into_iter());
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

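                            // This checkpoint precedes the next expected one (it has already been
                            // handled), so drop it and count the out-of-order watermark.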
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }

                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

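                    // `watermark` is set whenever at least one checkpoint has been batched, so
                    // this should always succeed at this point; bail out defensively if not.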
                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

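                    // Write the batch and the new watermark in a single transaction, so the
                    // store's data and its committer watermark cannot diverge.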
                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;

                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive,
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

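                    // Publish this pipeline's new commit_hi (highest committed checkpoint + 1).
                    // A failed send only means the receiver has shut down, so the error is
                    // ignored.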
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));

                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

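                    // If more checkpoints are already eligible, trigger another gather-and-commit
                    // pass immediately instead of waiting for the next tick.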
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

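                // A newly processed checkpoint has arrived from the processor.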
                Some(indexed) = rx.recv() => {
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

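                    // Only wake the committer early once enough rows have accumulated to make an
                    // eager write worthwhile.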
                    if pending_rows < H::MIN_EAGER_ROWS {
                        continue;
                    }

                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}

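/// Whether the committer can make progress on `pending`: the earliest buffered checkpoint must
/// not be ahead of `next_checkpoint` (i.e. there is no gap at the front of the buffer), and the
/// buffered checkpoints must span at least `checkpoint_lag`. For example, with
/// `next_checkpoint = 5`, `checkpoint_lag = 2`, and pending keys {5, 6, 7}: 5 <= 5 and
/// 5 + 2 <= 7, so this returns true.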
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let Some((&first, _)) = pending.first_key_value() else {
        return false;
    };

    let Some((&last, _)) = pending.last_key_value() else {
        return false;
    };

    first <= next_checkpoint && first + checkpoint_lag <= last
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;

    use super::*;

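    /// Test handler whose batch is a plain `Vec<u64>`: `commit` appends the batch to the mock
    /// store's sequential checkpoint data and reports one affected row per value.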
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;

        const MAX_BATCH_CHECKPOINTS: usize = 3;
        const MIN_EAGER_ROWS: usize = 4;

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        #[allow(unused)]
        committer: Service,
    }

    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let handler = Arc::new(TestHandler);
        let committer = committer(
            handler,
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer,
        }
    }

    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

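    /// Builds an `IndexedCheckpoint` whose watermark fields are all derived from `checkpoint`
    /// (with a timestamp of `checkpoint * 1000` ms) and which carries `checkpoint` itself as its
    /// only row, so tests can predict both the committed data and the resulting watermarks.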
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,
            checkpoint,
            checkpoint,
            checkpoint * 1000,
            vec![checkpoint],
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

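    /// Checkpoints below the initial `next_checkpoint` (5) are dropped as already committed, so
    /// nothing is written until checkpoints 5..8 arrive.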
    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 7);
            assert_eq!(watermark.tx_hi, 7);
        }
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

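    /// With a long collect interval, the committer should still write as soon as
    /// `MIN_EAGER_ROWS` (4) rows are pending, without waiting for the next tick.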
    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 4,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        send_checkpoint(&mut setup, 4).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0]);
    }

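    /// The first transaction attempt fails (`with_transaction_failures(1)`), so nothing is
    /// visible until the committer retries on a later tick.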
    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
        };

        let store = MockStore::default().with_transaction_failures(1);
        let mut setup = setup_test(10, config, store);

        send_checkpoint(&mut setup, 10).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );
    }
}