sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::sync::Arc;

use sui_futures::service::Service;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::Handler;
use crate::pipeline::logging::WatermarkLogger;
use crate::store::CommitterWatermark;
use crate::store::Connection;
use crate::store::Store;
use crate::store::pipeline_task;

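/// The commit watermark task tracks the highest checkpoint that has been fully committed for the
/// pipeline, starting from `next_checkpoint`.
///
/// It receives watermark parts from the committer on `rx`, merges parts for the same checkpoint,
/// and gathers complete, contiguous checkpoints into a single pending watermark. Whenever the
/// watermark advances, the new checkpoint high (the next checkpoint the pipeline expects) is
/// reported on `commit_hi_tx`, and the pending watermark is periodically written to `store`
/// according to `config`.
///
/// The task stops once `rx` is closed and drained, after a final attempt to write any pending
/// watermark.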
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();

    Service::new().spawn_aborting(async move {
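        // Watermark parts that have been received but not yet gathered into the watermark to
        // write, keyed by checkpoint sequence number. Parts for the same checkpoint are merged as
        // they arrive.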
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        let mut next_wake = tokio::time::Instant::now();
        let mut pending_watermark = None;

        loop {
            let mut should_write_db = false;

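            // Wake up either on the periodic watermark timer (which is what triggers a write to
            // the DB) or when a new batch of watermark parts arrives from the committer.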
            tokio::select! {
                () = tokio::time::sleep_until(next_wake) => {
                    next_wake = config.watermark_interval_with_jitter();
                    should_write_db = true;
                }
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }
                }
            }

            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

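            // Gather complete, contiguous watermark parts from the front of `precommitted`,
            // advancing `next_checkpoint` for every checkpoint folded into the pending watermark.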
            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                if !part.is_complete() {
                    // Parts for this checkpoint are still outstanding.
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    Ordering::Less => break,

                    Ordering::Equal => {
                        // The next checkpoint in sequence: fold it into the pending watermark.
                        pending_watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    Ordering::Greater => {
                        // A checkpoint the watermark has already moved past: drop it and record
                        // it as out of order.
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            if let Some(ref watermark) = pending_watermark {
                let _ = commit_hi_tx.send((H::NAME, next_checkpoint));

                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );
            }

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

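            // Only write to the DB on timer ticks. If the write fails, put the watermark back so
            // that it is retried on the next tick.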
            if should_write_db
                && let Some(watermark) = pending_watermark.take()
                && write_watermark::<H>(
                    &store,
                    &pipeline_task,
                    &watermark,
                    &mut logger,
                    &checkpoint_lag_reporter,
                    &metrics,
                )
                .await
                .is_err()
            {
                pending_watermark = Some(watermark);
            }

            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

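        // Make a final attempt to persist any remaining watermark before shutting down; a failure
        // here is logged but not retried.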
        if let Some(watermark) = pending_watermark
            && write_watermark::<H>(
                &store,
                &pipeline_task,
                &watermark,
                &mut logger,
                &checkpoint_lag_reporter,
                &metrics,
            )
            .await
            .is_err()
        {
            warn!(
                pipeline = H::NAME,
                ?watermark,
                "Failed to write final watermark on shutdown, will not retry",
            );
        }

        info!(pipeline = H::NAME, "Stopping committer watermark task");
        Ok(())
    })
}

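/// Write `watermark` for `pipeline_task` to the store, recording commit latency, watermark
/// metrics and checkpoint lag when the write goes through. Returns `Err(())` if a connection
/// could not be established or the update failed, so that the caller can retry.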
async fn write_watermark<H: Handler>(
    store: &H::Store,
    pipeline_task: &str,
    watermark: &CommitterWatermark,
    logger: &mut WatermarkLogger,
    checkpoint_lag_reporter: &CheckpointLagMetricReporter,
    metrics: &IndexerMetrics,
) -> Result<(), ()> {
    let Ok(mut conn) = store.connect().await else {
        warn!(
            pipeline = H::NAME,
            "Commit watermark task failed to get connection for DB"
        );
        return Err(());
    };

    let guard = metrics
        .watermark_commit_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    match conn
        .set_committer_watermark(pipeline_task, *watermark)
        .await
    {
        Err(e) => {
            let elapsed = guard.stop_and_record();
            error!(
                pipeline = H::NAME,
                elapsed_ms = elapsed * 1000.0,
                ?watermark,
                "Error updating commit watermark: {e}",
            );
            Err(())
        }

        Ok(true) => {
            let elapsed = guard.stop_and_record();

            logger.log::<H>(watermark, elapsed);

            checkpoint_lag_reporter.report_lag(
                watermark.checkpoint_hi_inclusive,
                watermark.timestamp_ms_hi_inclusive,
            );

            metrics
                .watermark_epoch_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.epoch_hi_inclusive as i64);

            metrics
                .watermark_transaction_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.tx_hi as i64);

            metrics
                .watermark_timestamp_in_db_ms
                .with_label_values(&[H::NAME])
                .set(watermark.timestamp_ms_hi_inclusive as i64);

            Ok(())
        }
        Ok(false) => Ok(()),
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        #[allow(unused)]
        commit_watermark: Service,
    }

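    /// Spawn a commit watermark task for `H` backed by the given `MockStore`, starting from
    /// `next_checkpoint`, and return the handles needed to drive it from a test.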
    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, _commit_hi_rx) = mpsc::unbounded_channel();
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            commit_hi_tx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

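    /// A watermark part for `checkpoint` that is already complete (`batch_rows == total_rows`).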
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts out of order, with checkpoint 3 missing.
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        // The watermark can only advance to 2 until the gap at 3 is filled.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_secs(1)).await;

        // With the gap filled, the watermark advances through 3 and on to 4.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 10);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Checkpoint 11 arrives while the failed write for checkpoint 10 is waiting to be
        // retried, so the retried write should carry the higher watermark.
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Only one of the three parts that make up checkpoint 1's watermark is sent.
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // The watermark is still incomplete, so nothing is written.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Once the remaining parts arrive, the watermark is complete and can be written.
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Checkpoint 0 has not been seen yet, so the watermark cannot advance to 1.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(1200)).await;

        // With checkpoint 0 in place, the watermark advances through both checkpoints.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }
}