1use std::cmp::Ordering;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use scoped_futures::ScopedFutureExt;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tokio::time::MissedTickBehavior;
12use tokio::time::interval;
13use tracing::debug;
14use tracing::info;
15use tracing::warn;
16
17use crate::metrics::CheckpointLagMetricReporter;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::IndexedCheckpoint;
20use crate::pipeline::WARN_PENDING_WATERMARKS;
21use crate::pipeline::logging::WatermarkLogger;
22use crate::pipeline::sequential::Handler;
23use crate::pipeline::sequential::SequentialConfig;
24use crate::store::Connection;
25use crate::store::SequentialStore;
26
/// Spawns the sequential committer task for pipeline `H`.
///
/// The committer receives processed checkpoints on `rx` (possibly out of
/// order), buffers them in a `pending` map, and commits them to `store`
/// strictly in checkpoint order, starting at `next_checkpoint`. Data and the
/// committer watermark are written atomically in a single transaction, and
/// after each successful commit the new exclusive watermark ("commit hi") is
/// reported on `tx`.
///
/// Behavior knobs:
/// - `config.checkpoint_lag` delays committing until the newest buffered
///   checkpoint is at least that far ahead of the oldest (see
///   [`can_process_pending`]).
/// - `min_eager_rows` triggers an early (pre-tick) commit attempt once that
///   many rows have accumulated.
/// - `max_batch_checkpoints` caps how many checkpoints are folded into one
///   write transaction.
///
/// The returned [`Service`] aborts the task when dropped. The task exits on
/// its own once `rx` is closed and drained and nothing committable remains.
pub(super) fn committer<H: Handler>(
    handler: Arc<H>,
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    min_eager_rows: usize,
    max_batch_checkpoints: usize,
) -> Service {
    Service::new().spawn_aborting(async move {
        // Periodic trigger for gathering and committing a batch. `Delay`
        // avoids a burst of catch-up ticks after a long commit.
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

        // Consecutive failed commit attempts for the current batch; reset to
        // zero after a successful write.
        let mut attempt = 0;

        // The batch currently being accumulated for the next write, plus how
        // many rows and checkpoints it contains. On commit failure the batch
        // is kept intact and retried on a later tick.
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;

        // Watermark of the highest checkpoint folded into `batch` so far;
        // `None` until the first checkpoint is batched.
        let mut watermark = None;

        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Checkpoints received but not yet batched, keyed by sequence number
        // so they can be drained in order. `pending_rows` counts rows across
        // `pending` AND the in-flight `batch`.
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = poll.tick() => {
                    // Shutdown: upstream is gone, nothing is in flight, and
                    // the lag condition blocks any further progress.
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Process closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Drain in-order checkpoints from `pending` into `batch`,
                    // up to the per-transaction cap.
                    while batch_checkpoints < max_batch_checkpoints {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            // Gap before the first pending checkpoint; wait
                            // for the missing one to arrive.
                            Ordering::Less => break,

                            // The next expected checkpoint: fold it into the
                            // batch and advance the expected sequence number.
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                handler.batch(&mut batch, indexed.values.into_iter());
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

                            // A checkpoint from before `next_checkpoint`
                            // (already handled): count it and drop it.
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }
                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    // Nothing to write this tick. `batch_rows` must also be
                    // zero, because rows only enter the batch alongside a
                    // checkpoint.
                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    // `watermark` is always `Some` once a checkpoint has been
                    // batched, but guard defensively rather than unwrap.
                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Write the watermark and the batch atomically so the
                    // stored data can never get ahead of (or behind) the
                    // recorded watermark.
                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;


                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        // Commit failed: keep `batch` intact and retry on a
                        // subsequent tick.
                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    // Report the new exclusive commit watermark; the receiver
                    // may already have been dropped, which is fine.
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));
                    // Reset batch state for the next gather cycle.
                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    // If more pending data is already committable, tick again
                    // immediately instead of waiting a full interval.
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    // Not enough buffered rows yet to justify an eager
                    // (pre-tick) commit.
                    if pending_rows < min_eager_rows {
                        continue;
                    }

                    // Enough rows buffered: trigger an immediate tick if a
                    // batch is in flight or pending data is committable.
                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}
370
/// Whether the committer can make progress by draining `pending`.
///
/// Returns `true` when the oldest pending checkpoint is at or before
/// `next_checkpoint` (so it is immediately batchable, or stale and droppable)
/// AND the buffered range spans at least `checkpoint_lag` checkpoints, i.e.
/// the newest pending checkpoint is far enough ahead of the oldest. Always
/// `false` when `pending` is empty.
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    // A non-empty map always has both a first and a last key, so one
    // destructuring handles the empty case for both.
    let (Some((&first, _)), Some((&last, _))) =
        (pending.first_key_value(), pending.last_key_value())
    else {
        return false;
    };

    // `saturating_add` guards against `u64` overflow when `checkpoint_lag` is
    // very large (e.g. used as a "never commit" sentinel): a plain `+` would
    // panic in debug builds and wrap in release builds, where wrapping could
    // spuriously report the lag condition as satisfied.
    first <= next_checkpoint && first.saturating_add(checkpoint_lag) <= last
}
391
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;

    use super::*;

    /// Minimal handler: each checkpoint carries its own sequence number as a
    /// single row, so commit order is directly observable in the store.
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        // Unused by committer tests; checkpoints are fed in pre-indexed.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;

        // Small limits so a handful of checkpoints exercises batch splitting
        // and eager commits.
        const MAX_BATCH_CHECKPOINTS: usize = 3;
        const MIN_EAGER_ROWS: usize = 4;

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Append committed rows to the mock store's shared vector so
            // tests can assert on the exact commit order.
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    /// Handles for driving and observing a committer under test.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        // Held (not read) so the spawned committer task is not aborted.
        #[allow(unused)]
        committer: Service,
    }

    /// Spawns a committer over `store` that expects `next_checkpoint` first.
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());

        // Config overrides fall back to the handler's associated constants.
        let min_eager_rows = config
            .min_eager_rows
            .unwrap_or(<TestHandler as super::Handler>::MIN_EAGER_ROWS);
        let max_batch_checkpoints = config
            .max_batch_checkpoints
            .unwrap_or(<TestHandler as super::Handler>::MAX_BATCH_CHECKPOINTS);

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let handler = Arc::new(TestHandler);
        let committer = committer(
            handler,
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
            min_eager_rows,
            max_batch_checkpoints,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer,
        }
    }

    /// Sends checkpoint `checkpoint` into the committer's input channel.
    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

    /// Builds an indexed checkpoint whose epoch / tx / timestamp and single
    /// row are all derived from its sequence number.
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,
            checkpoint,
            checkpoint,
            checkpoint * 1000,
            vec![checkpoint],
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Give the committer time to gather and commit.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    // Checkpoints below the starting `next_checkpoint` are treated as stale
    // and must be dropped without moving the watermark.
    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        // All of these are below the starting point and should be ignored.
        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, Some(7));
            assert_eq!(watermark.tx_hi, 7);
        }
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Out-of-order arrival; the BTreeMap buffer restores order.
        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    // Four checkpoints with MAX_BATCH_CHECKPOINTS = 3 must be written as two
    // transactions (3 + 1), each reporting its own commit_hi.
    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    // With checkpoint_lag = 1 the newest buffered checkpoint is held back
    // until a later one arrives.
    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Checkpoint 2 is withheld until something newer arrives.
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    // With a long collect interval, reaching MIN_EAGER_ROWS (= 4) must
    // trigger a commit before the next tick.
    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Let the initial tick pass before sending anything.
        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Three rows are below the eager threshold; nothing committed yet.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Fourth row hits MIN_EAGER_ROWS and forces an immediate commit.
        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    // Even past the eager-row threshold, the checkpoint-lag condition still
    // gates the eager commit.
    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 4,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Eager threshold reached, but the lag condition blocks the commit.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Checkpoint 4 makes the range [0, 4] satisfy lag 4; only checkpoint
        // 0 becomes committable.
        send_checkpoint(&mut setup, 4).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0]);
    }

    // A failed transaction must keep the batch and retry it on a later tick.
    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
            ..Default::default()
        };

        // First transaction attempt fails; the second succeeds.
        let store = MockStore::default().with_transaction_failures(1);
        let mut setup = setup_test(10, config, store);

        send_checkpoint(&mut setup, 10).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        // First attempt failed; nothing committed yet.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Wait past the next tick for the retry to go through.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );
    }
}