1use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
5
6use scoped_futures::ScopedFutureExt;
7use tokio::{
8 sync::mpsc,
9 task::JoinHandle,
10 time::{MissedTickBehavior, interval},
11};
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, info, warn};
14
15use crate::{
16 metrics::{CheckpointLagMetricReporter, IndexerMetrics},
17 pipeline::{IndexedCheckpoint, WARN_PENDING_WATERMARKS, logging::WatermarkLogger},
18 store::{Connection, TransactionalStore},
19};
20
21use super::{Handler, SequentialConfig};
22
/// The committer task for a sequential pipeline.
///
/// Receives [`IndexedCheckpoint`]s from the processor on `rx`, buffers them in
/// checkpoint order, and writes them to `store` in contiguous batches starting
/// from `next_checkpoint`. Each batch (up to `H::MAX_BATCH_CHECKPOINTS`
/// checkpoints) is committed in a single transaction together with the
/// committer watermark update, so committed data and the watermark cannot
/// diverge.
///
/// `config.checkpoint_lag` holds back commits until the buffered range of
/// checkpoints is at least that wide (see [`can_process_pending`]).
///
/// After every successful commit, `(H::NAME, watermark + 1)` — the next
/// checkpoint expected after the committed high watermark — is sent on `tx`
/// to report progress downstream.
///
/// The task stops when `cancel` fires, or when `rx` is closed and drained and
/// nothing buffered can be committed. Failed commits are retried on the next
/// poll tick with the batch kept intact.
pub(super) fn committer<H>(
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()>
where
    H: Handler + Send + Sync + 'static,
    H::Store: TransactionalStore + 'static,
{
    tokio::spawn(async move {
        // Timer that drives gathering and committing. `Delay` avoids a burst
        // of catch-up ticks if a commit takes longer than the interval.
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

        // Consecutive failed commit attempts for the current batch; reset to
        // zero after a successful write.
        let mut attempt = 0;

        // The batch currently being accumulated, how many rows and checkpoints
        // it holds, and the watermark of the last checkpoint folded into it.
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;
        let mut watermark = None;

        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Checkpoints received but not yet folded into a batch, keyed (and
        // therefore ordered) by checkpoint sequence number, along with the
        // total number of rows they contain.
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = poll.tick() => {
                    // The channel is closed and drained, there is no batch in
                    // flight, and nothing buffered can be committed: done.
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Process closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Gather as many contiguous checkpoints into the batch as
                    // allowed, starting from `next_checkpoint`.
                    while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            // The checkpoint we need next has not arrived yet
                            // (gap in the sequence) — stop gathering.
                            Ordering::Less => break,

                            // Exactly the next checkpoint: fold it into the
                            // batch and advance the expected sequence number.
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                H::batch(&mut batch, indexed.values);
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

                            // A checkpoint strictly below `next_checkpoint`:
                            // it was already processed, so drop it and record
                            // the anomaly.
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }
                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    // Nothing to commit this tick. An empty batch must also be
                    // empty of rows — anything else is a bookkeeping bug.
                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    // Defensive: with `batch_checkpoints > 0` the gather loop
                    // has set a watermark, but bail rather than panic if not.
                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    // Report the watermark we are about to write (pre-commit).
                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Write the watermark and the batch atomically, in a
                    // single transaction.
                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            H::commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;


                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        // Keep the batch intact and retry on the next tick.
                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    // Report the watermark now known to be in the store.
                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    // Report progress; ignore errors — the receiver may have
                    // shut down.
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));

                    // Reset batch state for the next round. `take` swaps in a
                    // fresh default batch.
                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    // If more buffered work is already committable, skip the
                    // wait and tick again immediately.
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    // Not enough rows buffered to justify an eager commit yet.
                    if pending_rows < H::MIN_EAGER_ROWS {
                        continue;
                    }

                    // Enough buffered rows: trigger an immediate tick if a
                    // batch is already in flight or progress is now possible.
                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
    })
}
373
/// Returns whether the committer can make progress on `pending` right now.
///
/// Progress is possible when both hold:
/// - the lowest buffered checkpoint is at or below `next_checkpoint`, so it
///   can either be folded into the batch or discarded as already processed;
/// - the spread between the lowest and highest buffered checkpoints is at
///   least `checkpoint_lag`, so committing stays behind the configured lag.
///
/// Returns `false` for an empty buffer.
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let (Some((&first, _)), Some((&last, _))) =
        (pending.first_key_value(), pending.last_key_value())
    else {
        return false;
    };

    // `last - first` cannot underflow because `BTreeMap` keys are ordered, so
    // `first <= last`. Comparing the spread against the lag (instead of
    // computing `first + checkpoint_lag`) avoids u64 overflow for very large
    // lag values, which would panic in debug builds and silently wrap in
    // release builds.
    first <= next_checkpoint && last - first >= checkpoint_lag
}
394
#[cfg(test)]
mod tests {
    use crate::{
        mocks::store::{MockConnection, MockStore},
        pipeline::{CommitterConfig, Processor},
    };

    use super::*;
    use async_trait::async_trait;
    use prometheus::Registry;
    use std::{sync::Arc, time::Duration};
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    // Minimal handler under test: each checkpoint contributes u64 rows that
    // are appended to the mock store's sequential data on commit.
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;

        // Small limits so batching and eager-commit behavior are easy to
        // exercise: at most 3 checkpoints per batch, eager commit at 4 rows.
        const MAX_BATCH_CHECKPOINTS: usize = 3;
        const MIN_EAGER_ROWS: usize = 4;

        fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    // Everything a test needs to drive and observe one committer task.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        committer_handle: JoinHandle<()>,
    }

    // Spawn a committer over `store`, starting at `next_checkpoint`, wired to
    // fresh channels and metrics.
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());
        let cancel = CancellationToken::new();

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let committer_handle = committer(
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
            cancel,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer_handle,
        }
    }

    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

    // Build a one-row checkpoint whose watermark fields are all derived from
    // `checkpoint` (timestamp scaled by 1000) — argument order follows
    // `IndexedCheckpoint::new`; see its definition for field meanings.
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,
            checkpoint,
            checkpoint,
            checkpoint * 1000,
            vec![checkpoint],
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            // No lag — checkpoints are committable as soon as contiguous.
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Give the committer time to gather and commit.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");

        // Closing the channel lets the committer shut down cleanly.
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        // No watermark exists until something is committed.
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        // Checkpoints below the initial `next_checkpoint` must be discarded.
        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Only checkpoints from 5 onwards were committed.
        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 7);
            assert_eq!(watermark.tx_hi, 7);
        }

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Arrival order differs from sequence order; the committer must still
        // commit in order.
        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // 4 checkpoints with MAX_BATCH_CHECKPOINTS = 3 forces two batches.
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            // Hold back the most recent checkpoint by one.
            checkpoint_lag: 1,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        // With lag 1 and checkpoints 0..=2 buffered, only 0 and 1 commit.
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        // Checkpoint 3 arriving releases checkpoint 2.
        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                // Interval long enough that only the eager path (triggered at
                // MIN_EAGER_ROWS buffered rows) can commit within the test.
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Let the interval's immediate first tick pass before sending data.
        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // 3 rows < MIN_EAGER_ROWS (4): nothing committed yet.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // The 4th row crosses the eager threshold.
        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                // Long interval so commits only happen via eager triggering.
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            // Lag wide enough to block the first eager attempt.
            checkpoint_lag: 4,
        };
        let mut setup = setup_test(0, config, MockStore::default());

        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Spread 0..=3 is 3 < lag 4: the eager trigger cannot fire.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Checkpoint 4 widens the spread to the lag, releasing checkpoint 0.
        send_checkpoint(&mut setup, 4).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0]);

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                // Short interval so the retry happens quickly.
                collect_interval_ms: 1_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
        };

        // The first transaction attempt fails; the second should succeed.
        let store = MockStore::default().with_transaction_failures(1);
        let mut setup = setup_test(10, config, store);

        send_checkpoint(&mut setup, 10).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        // First attempt failed: nothing committed yet.
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Wait past the next tick for the retry to land.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );

        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }
}