sui_indexer_alt_framework/pipeline/concurrent/
commit_watermark.rs1use std::cmp::Ordering;
5use std::collections::BTreeMap;
6use std::collections::btree_map::Entry;
7use std::sync::Arc;
8
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::debug;
12use tracing::error;
13use tracing::info;
14use tracing::warn;
15
16use crate::metrics::CheckpointLagMetricReporter;
17use crate::metrics::IndexerMetrics;
18use crate::pipeline::CommitterConfig;
19use crate::pipeline::WARN_PENDING_WATERMARKS;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::Handler;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::CommitterWatermark;
24use crate::store::Connection;
25use crate::store::Store;
26use crate::store::pipeline_task;
27
28pub(super) fn commit_watermark<H: Handler + 'static>(
47 mut next_checkpoint: u64,
48 config: CommitterConfig,
49 mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
50 store: H::Store,
51 task: Option<String>,
52 metrics: Arc<IndexerMetrics>,
53) -> Service {
54 let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
56 Service::new().spawn_aborting(async move {
57 let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();
63
64 let mut logger = WatermarkLogger::new("concurrent_committer");
67
68 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
69 &metrics.watermarked_checkpoint_timestamp_lag,
70 &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
71 &metrics.watermark_checkpoint_in_db,
72 );
73
74 info!(
75 pipeline = H::NAME,
76 next_checkpoint, "Starting commit watermark task"
77 );
78
79 let mut next_wake = tokio::time::Instant::now();
80 let mut pending_watermark = None;
81
82 loop {
83 let mut should_write_db = false;
84
85 tokio::select! {
86 () = tokio::time::sleep_until(next_wake) => {
87 next_wake = config.watermark_interval_with_jitter();
90 should_write_db = true;
91 }
92 Some(parts) = rx.recv() => {
93 for part in parts {
94 match precommitted.entry(part.checkpoint()) {
95 Entry::Vacant(entry) => {
96 entry.insert(part);
97 }
98
99 Entry::Occupied(mut entry) => {
100 entry.get_mut().add(part);
101 }
102 }
103 }
104 }
105 }
106
107 let guard = metrics
111 .watermark_gather_latency
112 .with_label_values(&[H::NAME])
113 .start_timer();
114
115 while let Some(pending) = precommitted.first_entry() {
116 let part = pending.get();
117
118 if !part.is_complete() {
120 break;
121 }
122
123 match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
124 Ordering::Less => break,
126
127 Ordering::Equal => {
129 pending_watermark = Some(pending.remove().watermark);
130 next_checkpoint += 1;
131 }
132
133 Ordering::Greater => {
138 metrics
140 .total_watermarks_out_of_order
141 .with_label_values(&[H::NAME])
142 .inc();
143
144 pending.remove();
145 }
146 }
147 }
148
149 let elapsed = guard.stop_and_record();
150
151 if let Some(ref watermark) = pending_watermark {
152 metrics
153 .watermark_epoch
154 .with_label_values(&[H::NAME])
155 .set(watermark.epoch_hi_inclusive as i64);
156
157 metrics
158 .watermark_checkpoint
159 .with_label_values(&[H::NAME])
160 .set(watermark.checkpoint_hi_inclusive as i64);
161
162 metrics
163 .watermark_transaction
164 .with_label_values(&[H::NAME])
165 .set(watermark.tx_hi as i64);
166
167 metrics
168 .watermark_timestamp_ms
169 .with_label_values(&[H::NAME])
170 .set(watermark.timestamp_ms_hi_inclusive as i64);
171
172 debug!(
173 pipeline = H::NAME,
174 elapsed_ms = elapsed * 1000.0,
175 watermark = watermark.checkpoint_hi_inclusive,
176 timestamp = %watermark.timestamp(),
177 pending = precommitted.len(),
178 "Gathered watermarks",
179 );
180 }
181
182 if precommitted.len() > WARN_PENDING_WATERMARKS {
183 warn!(
184 pipeline = H::NAME,
185 pending = precommitted.len(),
186 "Pipeline has a large number of pending commit watermarks",
187 );
188 }
189
190 if should_write_db
192 && let Some(watermark) = pending_watermark.take()
193 && write_watermark::<H>(
194 &store,
195 &pipeline_task,
196 &watermark,
197 &mut logger,
198 &checkpoint_lag_reporter,
199 &metrics,
200 )
201 .await
202 .is_err()
203 {
204 pending_watermark = Some(watermark);
205 }
206
207 if rx.is_closed() && rx.is_empty() {
208 info!(pipeline = H::NAME, "Committer closed channel");
209 break;
210 }
211 }
212
213 if let Some(watermark) = pending_watermark
214 && write_watermark::<H>(
215 &store,
216 &pipeline_task,
217 &watermark,
218 &mut logger,
219 &checkpoint_lag_reporter,
220 &metrics,
221 )
222 .await
223 .is_err()
224 {
225 warn!(
226 pipeline = H::NAME,
227 ?watermark,
228 "Failed to write final watermark on shutdown, will not retry",
229 );
230 }
231
232 info!(pipeline = H::NAME, "Stopping committer watermark task");
233 Ok(())
234 })
235}
236
237async fn write_watermark<H: Handler>(
240 store: &H::Store,
241 pipeline_task: &str,
242 watermark: &CommitterWatermark,
243 logger: &mut WatermarkLogger,
244 checkpoint_lag_reporter: &CheckpointLagMetricReporter,
245 metrics: &IndexerMetrics,
246) -> Result<(), ()> {
247 let Ok(mut conn) = store.connect().await else {
248 warn!(
249 pipeline = H::NAME,
250 "Commit watermark task failed to get connection for DB"
251 );
252 return Err(());
253 };
254
255 let guard = metrics
256 .watermark_commit_latency
257 .with_label_values(&[H::NAME])
258 .start_timer();
259
260 match conn
263 .set_committer_watermark(pipeline_task, *watermark)
264 .await
265 {
266 Err(e) => {
267 let elapsed = guard.stop_and_record();
268 error!(
269 pipeline = H::NAME,
270 elapsed_ms = elapsed * 1000.0,
271 ?watermark,
272 "Error updating commit watermark: {e}",
273 );
274 Err(())
275 }
276
277 Ok(true) => {
278 let elapsed = guard.stop_and_record();
279
280 logger.log::<H>(watermark, elapsed);
281
282 checkpoint_lag_reporter.report_lag(
283 watermark.checkpoint_hi_inclusive,
284 watermark.timestamp_ms_hi_inclusive,
285 );
286
287 metrics
288 .watermark_epoch_in_db
289 .with_label_values(&[H::NAME])
290 .set(watermark.epoch_hi_inclusive as i64);
291
292 metrics
293 .watermark_transaction_in_db
294 .with_label_values(&[H::NAME])
295 .set(watermark.tx_hi as i64);
296
297 metrics
298 .watermark_timestamp_in_db_ms
299 .with_label_values(&[H::NAME])
300 .set(watermark.timestamp_ms_hi_inclusive as i64);
301
302 Ok(())
303 }
304 Ok(false) => Ok(()),
305 }
306}
307
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    /// Row type for the test pipeline; it carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Test pipeline whose processor emits no rows and whose commits are no-ops. The tests feed
    /// watermark parts to the watermark task directly.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Handles to a running commit watermark task under test.
    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        // Never read, but kept so the service lives as long as the test does.
        // NOTE(review): assumes dropping `Service` stops its task (cf. `spawn_aborting`); confirm.
        #[allow(unused)]
        commit_watermark: Service,
    }

    /// Starts a commit watermark task for `H` against `store`, expecting `next_checkpoint` next.
    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

    /// Builds a complete (one row out of one) watermark part for `checkpoint`.
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    /// Checkpoints that arrive in order are all folded into the watermark.
    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);
    }

    /// The watermark stops at a gap and resumes once the gap is filled.
    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Checkpoint 3 is missing, so only 1 and 2 can be folded in.
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Filling the gap lets the watermark reach the already-buffered checkpoint 4.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_secs(1)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    /// A failed connection does not lose the watermark; it is written on a later tick.
    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // The first write attempt could not connect.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    /// A failed watermark write is retried with the same watermark.
    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = create_watermark_part_for_checkpoint(10);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 10);
    }

    /// After a failed write, the retry writes the newest gathered watermark.
    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = create_watermark_part_for_checkpoint(10);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Checkpoint 11 arrives before the retry, so the retry should write 11, not 10.
        let part = create_watermark_part_for_checkpoint(11);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }

    /// A checkpoint whose parts have not all arrived blocks the watermark until they do.
    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Each part carries one of the checkpoint's three rows.
        let part = WatermarkPart {
            total_rows: 3,
            ..create_watermark_part_for_checkpoint(1)
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // The remaining two rows complete the checkpoint.
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    /// Starting from checkpoint 0, nothing is written until checkpoint 0 itself arrives.
    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(1200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }
}