// sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

use std::cmp::Ordering;
5use std::collections::BTreeMap;
6use std::collections::btree_map::Entry;
7use std::sync::Arc;
8
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::debug;
12use tracing::error;
13use tracing::info;
14use tracing::warn;
15
16use crate::metrics::CheckpointLagMetricReporter;
17use crate::metrics::IndexerMetrics;
18use crate::pipeline::CommitterConfig;
19use crate::pipeline::WARN_PENDING_WATERMARKS;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::Handler;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::CommitterWatermark;
24use crate::store::Connection;
25use crate::store::Store;
26use crate::store::pipeline_task;
27
/// Spawn the commit-watermark task for pipeline `H`.
///
/// The committer sends [`WatermarkPart`]s for the batches it has written out.
/// This task re-assembles parts into complete per-checkpoint watermarks,
/// tracks the highest watermark that is contiguous starting from
/// `next_checkpoint`, and periodically persists that watermark to `store`.
///
/// - `next_checkpoint`: the first checkpoint this task expects; the watermark
///   only advances once this checkpoint (and each successor in turn) is
///   complete.
/// - `config`: controls how often the watermark is flushed to the store
///   (with jitter).
/// - `rx`: stream of watermark parts from the committer.
/// - `task`: optional task name, combined with `H::NAME` to form the
///   pipeline/task key the watermark is stored under.
/// - `metrics`: indexer metrics, updated as watermarks are gathered and
///   written.
///
/// The returned [`Service`] owns the spawned task. The task shuts down (after
/// a best-effort final watermark write) once `rx` is closed and drained.
pub(super) fn commit_watermark<H: Handler>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        // Watermark parts received so far, keyed by checkpoint sequence
        // number. Parts for the same checkpoint are merged until the
        // checkpoint is complete (all of its rows accounted for).
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        let mut next_wake = tokio::time::Instant::now();
        // Highest contiguous watermark gathered but not yet (successfully)
        // written to the store.
        let mut pending_watermark = None;

        loop {
            // DB writes only happen on timer ticks, so bursts of incoming
            // parts are coalesced into a single write per interval.
            let mut should_write_db = false;

            tokio::select! {
                () = tokio::time::sleep_until(next_wake) => {
                    next_wake = config.watermark_interval_with_jitter();
                    should_write_db = true;
                }
                // NOTE: this branch is disabled once `rx.recv()` yields
                // `None` (committer gone); the timer branch then drives the
                // loop until the shutdown check at the bottom breaks out.
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                // Merge this part into the checkpoint's
                                // accumulated watermark part.
                                entry.get_mut().add(part);
                            }
                        }
                    }
                }
            }

            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            // Pop complete checkpoints off the front of `precommitted` for as
            // long as they are contiguous with `next_checkpoint`, remembering
            // the highest one as the next watermark to write.
            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // The frontier checkpoint is still missing parts; nothing at
                // or past it can be watermarked yet.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Gap before this checkpoint -- wait for the missing one.
                    Ordering::Less => break,

                    Ordering::Equal => {
                        pending_watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Checkpoint from before the current frontier. This
                    // should not normally happen; count it and drop it.
                    Ordering::Greater => {
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            if let Some(ref watermark) = pending_watermark {
                // Report the gathered (in-memory) watermark. The `_in_db`
                // metric variants are only updated after a successful write
                // (see `write_watermark`).
                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );
            }

            // A large backlog means the frontier checkpoint is incomplete (or
            // missing) while later checkpoints keep accumulating.
            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            // Flush on timer ticks. On failure, put the watermark back so it
            // is retried on the next tick (possibly superseded by a newer one
            // gathered in the meantime).
            if should_write_db
                && let Some(watermark) = pending_watermark.take()
                && write_watermark::<H>(
                    &store,
                    &pipeline_task,
                    &watermark,
                    &mut logger,
                    &checkpoint_lag_reporter,
                    &metrics,
                )
                .await
                .is_err()
            {
                pending_watermark = Some(watermark);
            }

            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

        // Best-effort final flush of any watermark gathered since the last
        // successful write; a failure here is logged but not retried.
        if let Some(watermark) = pending_watermark
            && write_watermark::<H>(
                &store,
                &pipeline_task,
                &watermark,
                &mut logger,
                &checkpoint_lag_reporter,
                &metrics,
            )
            .await
            .is_err()
        {
            warn!(
                pipeline = H::NAME,
                ?watermark,
                "Failed to write final watermark on shutdown, will not retry",
            );
        }

        info!(pipeline = H::NAME, "Stopping committer watermark task");
        Ok(())
    })
}
236
237async fn write_watermark<H: Handler>(
240 store: &H::Store,
241 pipeline_task: &str,
242 watermark: &CommitterWatermark,
243 logger: &mut WatermarkLogger,
244 checkpoint_lag_reporter: &CheckpointLagMetricReporter,
245 metrics: &IndexerMetrics,
246) -> Result<(), ()> {
247 let Ok(mut conn) = store.connect().await else {
248 warn!(
249 pipeline = H::NAME,
250 "Commit watermark task failed to get connection for DB"
251 );
252 return Err(());
253 };
254
255 let guard = metrics
256 .watermark_commit_latency
257 .with_label_values(&[H::NAME])
258 .start_timer();
259
260 match conn
263 .set_committer_watermark(pipeline_task, *watermark)
264 .await
265 {
266 Err(e) => {
267 let elapsed = guard.stop_and_record();
268 error!(
269 pipeline = H::NAME,
270 elapsed_ms = elapsed * 1000.0,
271 ?watermark,
272 "Error updating commit watermark: {e}",
273 );
274 Err(())
275 }
276
277 Ok(true) => {
278 let elapsed = guard.stop_and_record();
279
280 logger.log::<H>(watermark, elapsed);
281
282 checkpoint_lag_reporter.report_lag(
283 watermark.checkpoint_hi_inclusive,
284 watermark.timestamp_ms_hi_inclusive,
285 );
286
287 metrics
288 .watermark_epoch_in_db
289 .with_label_values(&[H::NAME])
290 .set(watermark.epoch_hi_inclusive as i64);
291
292 metrics
293 .watermark_transaction_in_db
294 .with_label_values(&[H::NAME])
295 .set(watermark.tx_hi as i64);
296
297 metrics
298 .watermark_timestamp_in_db_ms
299 .with_label_values(&[H::NAME])
300 .set(watermark.timestamp_ms_hi_inclusive as i64);
301
302 Ok(())
303 }
304 Ok(false) => Ok(()),
305 }
306}
307
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    // Minimal value type for the test pipeline; carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        // Processing is irrelevant to these tests; produce no values.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        // Committing is a no-op; these tests only exercise the watermark task.
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        // Shared handle to the mock store, used to observe the watermark the
        // task wrote.
        store: FallibleMockStore,
        // Feeds watermark parts into the task under test.
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        // Held (not read) so the spawned watermark task stays alive for the
        // duration of the test.
        #[allow(unused)]
        commit_watermark: Service,
    }

    // Spawn a commit-watermark task over a clone of `store` and return the
    // handles needed to drive and observe it.
    fn setup_test<H: Handler<Store = FallibleMockStore>>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: FallibleMockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

    // A complete (batch_rows == total_rows) watermark part for `checkpoint`.
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    // Contiguous checkpoints 1..=3 should advance the watermark to 3.
    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, FallibleMockStore::default());

        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(3));
    }

    // The watermark must stop at a gap (missing checkpoint 3) and only jump
    // forward once the gap is filled.
    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, FallibleMockStore::default());

        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        // 1 and 2 are contiguous; 4 is stranded behind the missing 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Gap filled: the watermark advances through 3 to 4.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }

    // A failed connection attempt must not lose the watermark; it is retried
    // on the next timer tick.
    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = FallibleMockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // First write attempt hit the injected connection failure.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // After the next watermark interval the retry succeeds.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
    }

    // A failed watermark write (connection OK, commit fails) is retried with
    // the same watermark on the next tick.
    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = FallibleMockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;
        // First commit attempt hit the injected failure.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(10));
    }

    // If a newer watermark arrives while a failed write is pending retry, the
    // retry writes the newer (superseding) watermark, not the stale one.
    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = FallibleMockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;
        // First commit attempt hit the injected failure.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // The retry wrote checkpoint 11 directly, skipping the stale 10.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(11));
    }

    // A checkpoint is only watermarked once all of its parts (total_rows)
    // have been received.
    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, FallibleMockStore::default());

        // One of three parts for checkpoint 1: incomplete on its own.
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the remaining two parts to complete the checkpoint.
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
    }

    // With next_checkpoint == 0, checkpoint 1 alone cannot advance the
    // watermark; checkpoint 0 must arrive first.
    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, FallibleMockStore::default());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Checkpoint 0 is still missing, so nothing is written.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(1200)).await;

        // 0 and 1 are now contiguous; the watermark advances to 1.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
    }
}