sui_indexer_alt_framework/pipeline/concurrent/
commit_watermark.rs1use std::{
5 cmp::Ordering,
6 collections::{BTreeMap, btree_map::Entry},
7 sync::Arc,
8};
9
10use tokio::{
11 sync::mpsc,
12 task::JoinHandle,
13 time::{MissedTickBehavior, interval},
14};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, error, info, warn};
17
18use crate::{
19 metrics::{CheckpointLagMetricReporter, IndexerMetrics},
20 pipeline::{CommitterConfig, WARN_PENDING_WATERMARKS, WatermarkPart, logging::WatermarkLogger},
21 store::{Connection, Store},
22};
23
24use super::Handler;
25
26use crate::store::CommitterWatermark;
27
28fn gather_next_watermark(
32 precommitted: &mut BTreeMap<u64, WatermarkPart>,
33 next_checkpoint: &mut u64,
34 metrics: Option<(&Arc<IndexerMetrics>, &'static str)>,
35) -> Option<CommitterWatermark> {
36 let mut watermark = None;
37
38 while let Some(pending) = precommitted.first_entry() {
39 let part = pending.get();
40
41 if !part.is_complete() {
42 break;
43 }
44
45 match (*next_checkpoint).cmp(&part.watermark.checkpoint_hi_inclusive) {
46 Ordering::Less => break,
48
49 Ordering::Equal => {
51 watermark = Some(pending.remove().watermark);
52 *next_checkpoint += 1;
53 }
54
55 Ordering::Greater => {
60 if let Some((metrics, pipeline_name)) = metrics {
61 metrics
62 .total_watermarks_out_of_order
63 .with_label_values(&[pipeline_name])
64 .inc();
65 }
66 pending.remove();
67 }
68 }
69 }
70
71 watermark
72}
73
74pub(super) fn commit_watermark<H: Handler + 'static>(
95 mut next_checkpoint: u64,
96 config: CommitterConfig,
97 skip_watermark: bool,
98 mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
99 store: H::Store,
100 metrics: Arc<IndexerMetrics>,
101 cancel: CancellationToken,
102) -> JoinHandle<()> {
103 tokio::spawn(async move {
104 if skip_watermark {
105 info!(pipeline = H::NAME, "Skipping commit watermark task");
106 return;
107 }
108
109 let mut poll = interval(config.watermark_interval());
110 poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
111
112 let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();
118
119 let mut logger = WatermarkLogger::new("concurrent_committer");
122
123 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
124 &metrics.watermarked_checkpoint_timestamp_lag,
125 &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
126 &metrics.watermark_checkpoint_in_db,
127 );
128
129 info!(
130 pipeline = H::NAME,
131 next_checkpoint, "Starting commit watermark task"
132 );
133
134 loop {
135 tokio::select! {
136 _ = cancel.cancelled() => {
137 info!(pipeline = H::NAME, "Shutdown received");
138 break;
139 }
140
141 _ = poll.tick() => {
142 if precommitted.len() > WARN_PENDING_WATERMARKS {
143 warn!(
144 pipeline = H::NAME,
145 pending = precommitted.len(),
146 "Pipeline has a large number of pending commit watermarks",
147 );
148 }
149
150 let Ok(mut conn) = store.connect().await else {
151 warn!(pipeline = H::NAME, "Commit watermark task failed to get connection for DB");
152 continue;
153 };
154
155 let guard = metrics
157 .watermark_gather_latency
158 .with_label_values(&[H::NAME])
159 .start_timer();
160
161 let watermark = gather_next_watermark(
164 &mut precommitted,
165 &mut next_checkpoint,
166 Some((&metrics, H::NAME)),
167 );
168
169 let elapsed = guard.stop_and_record();
170
171 if let Some(watermark) = watermark {
172 metrics
173 .watermark_epoch
174 .with_label_values(&[H::NAME])
175 .set(watermark.epoch_hi_inclusive as i64);
176
177 metrics
178 .watermark_checkpoint
179 .with_label_values(&[H::NAME])
180 .set(watermark.checkpoint_hi_inclusive as i64);
181
182 metrics
183 .watermark_transaction
184 .with_label_values(&[H::NAME])
185 .set(watermark.tx_hi as i64);
186
187 metrics
188 .watermark_timestamp_ms
189 .with_label_values(&[H::NAME])
190 .set(watermark.timestamp_ms_hi_inclusive as i64);
191
192 debug!(
193 pipeline = H::NAME,
194 elapsed_ms = elapsed * 1000.0,
195 watermark = watermark.checkpoint_hi_inclusive,
196 timestamp = %watermark.timestamp(),
197 pending = precommitted.len(),
198 "Gathered watermarks",
199 );
200
201 let guard = metrics
202 .watermark_commit_latency
203 .with_label_values(&[H::NAME])
204 .start_timer();
205
206 match conn.set_committer_watermark(
209 H::NAME,
210 watermark,
211 ).await {
212 Err(e) => {
215 let elapsed = guard.stop_and_record();
216 error!(
217 pipeline = H::NAME,
218 elapsed_ms = elapsed * 1000.0,
219 ?watermark,
220 "Error updating commit watermark: {e}",
221 );
222 }
223
224 Ok(true) => {
225 let elapsed = guard.stop_and_record();
226
227 logger.log::<H>(&watermark, elapsed);
228
229 checkpoint_lag_reporter.report_lag(
230 watermark.checkpoint_hi_inclusive,
231 watermark.timestamp_ms_hi_inclusive
232 );
233
234 metrics
235 .watermark_epoch_in_db
236 .with_label_values(&[H::NAME])
237 .set(watermark.epoch_hi_inclusive as i64);
238
239 metrics
240 .watermark_transaction_in_db
241 .with_label_values(&[H::NAME])
242 .set(watermark.tx_hi as i64);
243
244 metrics
245 .watermark_timestamp_in_db_ms
246 .with_label_values(&[H::NAME])
247 .set(watermark.timestamp_ms_hi_inclusive as i64);
248 }
249 Ok(false) => {}
250 }
251 }
252
253 if rx.is_closed() && rx.is_empty() {
254 info!(pipeline = H::NAME, "Committer closed channel");
255 break;
256 }
257 }
258
259 Some(parts) = rx.recv() => {
260 for part in parts {
261 match precommitted.entry(part.checkpoint()) {
262 Entry::Vacant(entry) => {
263 entry.insert(part);
264 }
265
266 Entry::Occupied(mut entry) => {
267 entry.get_mut().add(part);
268 }
269 }
270 }
271 }
272 }
273 }
274
275 info!(
276 pipeline = H::NAME,
277 "Attempting final watermark sync before shutdown"
278 );
279
280 match store.connect().await {
281 Ok(mut conn) => {
282 let watermark = gather_next_watermark(
283 &mut precommitted,
284 &mut next_checkpoint,
285 Some((&metrics, H::NAME)),
286 );
287
288 if let Some(watermark) = watermark {
289 match conn.set_committer_watermark(H::NAME, watermark).await {
290 Ok(true) => {
291 info!(
292 pipeline = H::NAME,
293 checkpoint = watermark.checkpoint_hi_inclusive,
294 "Successfully wrote final watermark on shutdown"
295 );
296 }
297 Ok(false) => {
298 info!(
299 pipeline = H::NAME,
300 checkpoint = watermark.checkpoint_hi_inclusive,
301 "Final watermark did not advance on shutdown"
302 );
303 }
304 Err(e) => {
305 warn!(
306 pipeline = H::NAME,
307 ?watermark,
308 "Failed to write final watermark on shutdown: {e}"
309 );
310 }
311 }
312 } else {
313 info!(pipeline = H::NAME, "No watermark to write on shutdown");
314 }
315 }
316 Err(_) => {
317 warn!(
318 pipeline = H::NAME,
319 "Failed to get connection for final watermark sync"
320 );
321 }
322 }
323
324 info!(pipeline = H::NAME, "Stopping committer watermark task");
325 })
326}
327
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{CommitterConfig, Processor, WatermarkPart},
        store::CommitterWatermark,
    };

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline whose processor and committer do nothing; only its watermark is tested.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;

        async fn commit<'a>(
            _values: &[StoredData],
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Handles to a running commit watermark task under test.
    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        commit_watermark_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    /// Spawn the commit watermark task for `H`, starting at `next_checkpoint`, writing to a clone
    /// of `store`.
    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let store_clone = store.clone();
        let cancel_clone = cancel.clone();

        let commit_watermark_handle = commit_watermark::<H>(
            next_checkpoint,
            config,
            false,
            watermark_rx,
            store_clone,
            metrics,
            cancel_clone,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark_handle,
            cancel,
        }
    }

    /// A watermark part for `checkpoint` whose single batch covers all of its rows
    /// (`batch_rows == total_rows == 1`).
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    /// Build a pending-parts map, keyed by each part's checkpoint.
    fn precommitted_from(parts: Vec<WatermarkPart>) -> BTreeMap<u64, WatermarkPart> {
        parts
            .into_iter()
            .map(|part| (part.watermark.checkpoint_hi_inclusive, part))
            .collect()
    }

    #[test]
    fn test_gather_next_watermark_stops_at_gap() {
        let mut precommitted = precommitted_from(
            vec![1, 2, 4]
                .into_iter()
                .map(create_watermark_part_for_checkpoint)
                .collect(),
        );
        let mut next_checkpoint = 1;

        let watermark = gather_next_watermark(&mut precommitted, &mut next_checkpoint, None)
            .expect("checkpoints 1 and 2 are contiguous and complete");

        assert_eq!(watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(next_checkpoint, 3);
        // Checkpoint 4 stays pending until checkpoint 3 lands.
        assert_eq!(precommitted.keys().copied().collect::<Vec<_>>(), vec![4]);
    }

    #[test]
    fn test_gather_next_watermark_discards_stale_parts() {
        let mut precommitted = precommitted_from(vec![
            create_watermark_part_for_checkpoint(3),
            create_watermark_part_for_checkpoint(5),
        ]);
        let mut next_checkpoint = 5;

        let watermark = gather_next_watermark(&mut precommitted, &mut next_checkpoint, None)
            .expect("checkpoint 5 is the next expected checkpoint");

        assert_eq!(watermark.checkpoint_hi_inclusive, 5);
        assert_eq!(next_checkpoint, 6);
        // The stale part for checkpoint 3 is dropped rather than left to block the queue.
        assert!(precommitted.is_empty());
    }

    #[test]
    fn test_gather_next_watermark_waits_for_incomplete_part() {
        let mut precommitted = precommitted_from(vec![WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        }]);
        let mut next_checkpoint = 1;

        assert!(gather_next_watermark(&mut precommitted, &mut next_checkpoint, None).is_none());
        assert_eq!(next_checkpoint, 1);
        assert_eq!(precommitted.len(), 1);
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send contiguous checkpoints 1, 2, 3 in order.
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Give the task time to receive the parts and tick.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Checkpoint 3 is missing, so the watermark can only reach 2.
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Filling the gap lets the watermark jump straight to 4.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        // The first connection attempt fails.
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // After the first poll the watermark has not been written.
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // The part is kept pending across the failed connection, so the next poll writes it.
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        // The first watermark write fails.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(10)])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // A watermark whose write failed has already been gathered, so it is not written again:
        // with no newer parts arriving, nothing is persisted.
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        // The first watermark write fails.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(10)])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // A newer contiguous part supersedes the watermark whose write failed.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(11)])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Checkpoint 1 is split into three batches of one row each.
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // Only one of three batches has landed: no watermark yet.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Deliver the remaining two batches.
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        // Checkpoint 1 cannot be written before checkpoint 0.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(1200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_final_watermark_sync_on_shutdown() {
        let config = CommitterConfig {
            // Effectively never tick again after the first (immediate) tick, so the watermark
            // can only reach 11 through the shutdown sync.
            watermark_interval_ms: u64::MAX,
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 10, MockStore::default());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(10)])
            .await
            .unwrap();
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(11)])
            .await
            .unwrap();

        // Closing the channel must be enough to make the task exit, without cancellation.
        drop(setup.watermark_tx);

        // Bound the wait so a regression fails the test instead of hanging the suite.
        tokio::time::timeout(
            tokio::time::Duration::from_secs(5),
            setup.commit_watermark_handle,
        )
        .await
        .expect("commit watermark task did not exit after its channel closed")
        .expect("commit watermark task panicked");

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }
}