use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;

use scoped_futures::ScopedFutureExt;
use sui_futures::service::Service;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::IndexedCheckpoint;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::logging::WatermarkLogger;
use crate::pipeline::sequential::Handler;
use crate::pipeline::sequential::SequentialConfig;
use crate::store::Connection;
use crate::store::TransactionalStore;

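/// The sequential committer task.
///
/// It buffers checkpoints received on `rx` (potentially out of order) and, on each poll tick,
/// gathers the longest run of consecutive checkpoints starting at `next_checkpoint` into a
/// single batch, capped at `max_batch_checkpoints`. The batch and its watermark are then
/// written to `store` in one transaction, so data and watermark always advance together.
///
/// Commits are held back until the pending buffer spans at least `checkpoint_lag` checkpoints,
/// and are triggered eagerly (before the next tick) once at least `min_eager_rows` rows are
/// buffered. After every successful commit the new exclusive checkpoint watermark
/// (`checkpoint_hi_inclusive + 1`) is reported on `tx`; failed commits keep the gathered batch
/// and are retried on a later tick.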
pub(super) fn committer<H>(
    handler: Arc<H>,
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    min_eager_rows: usize,
    max_batch_checkpoints: usize,
) -> Service
where
    H: Handler + Send + Sync + 'static,
    H::Store: TransactionalStore + 'static,
{
    Service::new().spawn_aborting(async move {
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

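        // State for the batch currently being gathered: the number of consecutive attempts to
        // commit it, its contents, and its row and checkpoint counts.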
        let mut attempt = 0;
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;

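        // Watermark of the last checkpoint gathered into the current batch. It is written to the
        // store in the same transaction as the batch itself.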
        let mut watermark = None;

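        // Tracks and periodically logs this pipeline's watermark progress.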
        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

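        // Checkpoints waiting to be gathered into a batch, keyed by checkpoint sequence number,
        // along with the total number of rows they hold.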
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = poll.tick() => {
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Processor closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    while batch_checkpoints < max_batch_checkpoints {
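                        // Move as many consecutive checkpoints as possible from `pending` into the
                        // batch, starting at `next_checkpoint`, as long as the lag requirement is
                        // still satisfied.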
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            Ordering::Less => break,

                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                handler.batch(&mut batch, indexed.values.into_iter());
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

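                            // The buffered checkpoint is below the next expected checkpoint, so it
                            // has already been committed or superseded; record it and drop its rows.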
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }

                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    if batch_checkpoints == 0 {
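                        // Nothing was gathered on this tick, so there is nothing to commit yet.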
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;

                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive,
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));
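                    // The batch was committed successfully; drop its contents and reset the
                    // accumulators so the next gather starts fresh.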
                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
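                        // More checkpoints are already eligible, so poll again immediately rather
                        // than waiting for the next tick.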
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
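                    // Buffer the newly received checkpoint and account for its rows.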
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    if pending_rows < min_eager_rows {
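                        // Not enough rows buffered yet to justify an eager commit; wait for the
                        // next tick.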
                        continue;
                    }

                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}

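/// Returns true if the committer can make progress on its `pending` buffer: the earliest
/// buffered checkpoint must be at or below `next_checkpoint` (no gap in the sequence), and the
/// buffer must span at least `checkpoint_lag` checkpoints so that committed data stays
/// sufficiently far behind the latest checkpoint received.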
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let Some((&first, _)) = pending.first_key_value() else {
        return false;
    };

    let Some((&last, _)) = pending.last_key_value() else {
        return false;
    };

    first <= next_checkpoint && first + checkpoint_lag <= last
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;

    use super::*;

    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;
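        // Keep the batching limits small so the tests below can exercise eager commits and the
        // max-batch cap with just a handful of checkpoints.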
        const MAX_BATCH_CHECKPOINTS: usize = 3;
        const MIN_EAGER_ROWS: usize = 4;

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        #[allow(unused)]
        committer: Service,
    }

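    // Spawns a committer over `store`, starting from `next_checkpoint`, and returns the handles
    // needed to drive and observe it in tests.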
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());

        let min_eager_rows = config
            .min_eager_rows
            .unwrap_or(<TestHandler as super::Handler>::MIN_EAGER_ROWS);
        let max_batch_checkpoints = config
            .max_batch_checkpoints
            .unwrap_or(<TestHandler as super::Handler>::MAX_BATCH_CHECKPOINTS);

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let handler = Arc::new(TestHandler);
        let committer = committer(
            handler,
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
            min_eager_rows,
            max_batch_checkpoints,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer,
        }
    }

    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

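    // Builds an `IndexedCheckpoint` whose watermark fields and single row value are all derived
    // from the checkpoint sequence number (with the timestamp in milliseconds).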
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,
            checkpoint,
            checkpoint,
            checkpoint * 1000,
            vec![checkpoint],
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

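        // Give the committer time to poll, gather, and commit the batch.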
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

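        // Checkpoints below the starting `next_checkpoint` of 5 are discarded, so nothing has
        // been committed yet.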
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 7);
            assert_eq!(watermark.tx_hi, 7);
        }
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

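        // With a checkpoint lag of 1 and checkpoints 0..=2 received, only 0 and 1 are eligible;
        // checkpoint 2 has to wait for checkpoint 3 to arrive.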
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(1000)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        send_checkpoint(&mut setup, 3).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000,
                ..Default::default()
            },
            checkpoint_lag: 4,
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        tokio::time::sleep(Duration::from_millis(200)).await;

        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        send_checkpoint(&mut setup, 4).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![0]);
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000,
                ..Default::default()
            },
            checkpoint_lag: 0,
            ..Default::default()
        };

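        // Fail the first transaction so the committer has to retry the same batch on a later tick.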
        let store = MockStore::default().with_transaction_failures(1);
        let mut setup = setup_test(10, config, store);

        send_checkpoint(&mut setup, 10).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );
    }
}