// sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs
use std::cmp::Ordering;
5use std::collections::BTreeMap;
6use std::collections::btree_map::Entry;
7use std::sync::Arc;
8
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::debug;
12use tracing::error;
13use tracing::info;
14use tracing::warn;
15
16use crate::metrics::CheckpointLagMetricReporter;
17use crate::metrics::IndexerMetrics;
18use crate::pipeline::CommitterConfig;
19use crate::pipeline::WARN_PENDING_WATERMARKS;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::Handler;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::Connection;
24use crate::store::Store;
25use crate::store::pipeline_task;
26
/// Spawn the commit-watermark task for pipeline `H`.
///
/// The task receives [`WatermarkPart`]s from the committer over `rx`, gathers them per
/// checkpoint in a `BTreeMap`, and — on a periodic timer derived from `config` — writes the
/// highest *contiguous* committed checkpoint to `store` as the pipeline's watermark.
///
/// - `next_checkpoint`: the next checkpoint the watermark is waiting on; everything below it
///   is treated as already watermarked (such parts are dropped and counted as out-of-order).
/// - `task`: optional task name qualifying `H::NAME` when resolving the watermark row.
/// - The task shuts down cleanly once `rx` is closed and fully drained.
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // Resolve the (pipeline, task) identifier once, up front. NOTE(review): the unwrap
    // assumes the pipeline/task combination is always valid for this store — confirm.
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        // Watermark parts received so far, keyed by checkpoint sequence number. A
        // checkpoint's entry accumulates parts until `is_complete()` holds.
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // Rate-limited logger for successful watermark updates.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        // Reports checkpoint timestamp lag for watermarks that actually landed in the DB.
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        let mut next_wake = tokio::time::Instant::now();

        loop {
            tokio::select! {
                // Timer tick: fall through to gather and (maybe) write the watermark.
                () = tokio::time::sleep_until(next_wake) => {
                    next_wake = config.watermark_interval_with_jitter();
                }
                // New parts arrived: merge them into the pending map and go back to
                // waiting — the DB write only ever happens on a timer tick. Note this
                // arm is disabled once the channel is closed and empty.
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }

                    continue;
                }
            }

            // Highest contiguous watermark gathered this tick, if any.
            let mut watermark = None;

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            // Connection failures are treated as transient: skip this tick and retry
            // on the next one (nothing gathered yet, so nothing is lost).
            let Ok(mut conn) = store.connect().await else {
                warn!(
                    pipeline = H::NAME,
                    "Commit watermark task failed to get connection for DB"
                );
                continue;
            };

            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            // Walk pending checkpoints in ascending order, folding each complete,
            // in-sequence checkpoint into `watermark`. Stop at the first incomplete
            // checkpoint or gap in the sequence.
            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // Lowest pending checkpoint is still missing parts — nothing after it
                // can be watermarked either.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Gap before this checkpoint: wait for the missing ones.
                    Ordering::Less => break,

                    Ordering::Equal => {
                        watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Checkpoint at or below one we already passed — drop it and count
                    // it as out-of-order.
                    Ordering::Greater => {
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            if let Some(watermark) = watermark {
                // Record the gathered (not-yet-persisted) watermark in metrics.
                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );

                let guard = metrics
                    .watermark_commit_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                match conn
                    .set_committer_watermark(&pipeline_task, watermark)
                    .await
                {
                    // Write failed. The gathered watermark is dropped (next_checkpoint
                    // has already advanced), so it is only superseded — not retried —
                    // once a later checkpoint completes and is gathered.
                    Err(e) => {
                        let elapsed = guard.stop_and_record();
                        error!(
                            pipeline = H::NAME,
                            elapsed_ms = elapsed * 1000.0,
                            ?watermark,
                            "Error updating commit watermark: {e}",
                        );
                    }

                    // Write landed: log it and mirror the persisted values into the
                    // `*_in_db` metrics.
                    Ok(true) => {
                        let elapsed = guard.stop_and_record();

                        logger.log::<H>(&watermark, elapsed);

                        checkpoint_lag_reporter.report_lag(
                            watermark.checkpoint_hi_inclusive,
                            watermark.timestamp_ms_hi_inclusive,
                        );

                        metrics
                            .watermark_epoch_in_db
                            .with_label_values(&[H::NAME])
                            .set(watermark.epoch_hi_inclusive as i64);

                        metrics
                            .watermark_transaction_in_db
                            .with_label_values(&[H::NAME])
                            .set(watermark.tx_hi as i64);

                        metrics
                            .watermark_timestamp_in_db_ms
                            .with_label_values(&[H::NAME])
                            .set(watermark.timestamp_ms_hi_inclusive as i64);
                    }
                    // Store reported that no update was made. NOTE(review): presumably
                    // the stored watermark was already at least this new — confirm.
                    Ok(false) => {}
                }
            }

            // Upstream committer is done and everything has been drained — shut down.
            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

        info!(pipeline = H::NAME, "Stopping committer watermark task");
        Ok(())
    })
}
258
259#[cfg(test)]
260mod tests {
261 use std::sync::Arc;
262 use std::time::Duration;
263
264 use async_trait::async_trait;
265 use sui_types::full_checkpoint_content::Checkpoint;
266 use tokio::sync::mpsc;
267
268 use crate::FieldCount;
269 use crate::metrics::IndexerMetrics;
270 use crate::mocks::store::*;
271 use crate::pipeline::CommitterConfig;
272 use crate::pipeline::Processor;
273 use crate::pipeline::WatermarkPart;
274 use crate::pipeline::concurrent::BatchStatus;
275 use crate::store::CommitterWatermark;
276
277 use super::*;
278
    /// Minimal row type for the test pipeline; carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;
281
    /// Test pipeline whose processor yields no values; used to drive the watermark task.
    pub struct DataPipeline;
283
284 #[async_trait]
285 impl Processor for DataPipeline {
286 const NAME: &'static str = "data";
287 type Value = StoredData;
288
289 async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
290 Ok(vec![])
291 }
292 }
293
294 #[async_trait]
295 impl Handler for DataPipeline {
296 type Store = MockStore;
297 type Batch = Vec<Self::Value>;
298
299 fn batch(
300 &self,
301 batch: &mut Self::Batch,
302 values: &mut std::vec::IntoIter<Self::Value>,
303 ) -> BatchStatus {
304 batch.extend(values);
305 BatchStatus::Pending
306 }
307
308 async fn commit<'a>(
309 &self,
310 _batch: &Self::Batch,
311 _conn: &mut MockConnection<'a>,
312 ) -> anyhow::Result<usize> {
313 Ok(0)
314 }
315 }
316
    /// Everything a test needs to drive and observe the commit-watermark task.
    struct TestSetup {
        // Shared mock store the task writes watermarks to.
        store: MockStore,
        // Sender feeding watermark parts to the task under test.
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        // Handle kept alive so the spawned task is not aborted mid-test.
        #[allow(unused)]
        commit_watermark: Service,
    }
323
324 fn setup_test<H: Handler<Store = MockStore> + 'static>(
325 config: CommitterConfig,
326 next_checkpoint: u64,
327 store: MockStore,
328 ) -> TestSetup {
329 let (watermark_tx, watermark_rx) = mpsc::channel(100);
330 let metrics = IndexerMetrics::new(None, &Default::default());
331
332 let store_clone = store.clone();
333
334 let commit_watermark = commit_watermark::<H>(
335 next_checkpoint,
336 config,
337 watermark_rx,
338 store_clone,
339 None,
340 metrics,
341 );
342
343 TestSetup {
344 store,
345 watermark_tx,
346 commit_watermark,
347 }
348 }
349
350 fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
351 WatermarkPart {
352 watermark: CommitterWatermark {
353 checkpoint_hi_inclusive: checkpoint,
354 ..Default::default()
355 },
356 batch_rows: 1,
357 total_rows: 1,
358 }
359 }
360
361 #[tokio::test]
362 async fn test_basic_watermark_progression() {
363 let config = CommitterConfig::default();
364 let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());
365
366 for cp in 1..4 {
368 let part = create_watermark_part_for_checkpoint(cp);
369 setup.watermark_tx.send(vec![part]).await.unwrap();
370 }
371
372 tokio::time::sleep(Duration::from_millis(100)).await;
374
375 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
377 assert_eq!(watermark.checkpoint_hi_inclusive, 3);
378 }
379
380 #[tokio::test]
381 async fn test_out_of_order_watermarks() {
382 let config = CommitterConfig::default();
383 let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());
384
385 let parts = vec![
387 create_watermark_part_for_checkpoint(4),
388 create_watermark_part_for_checkpoint(2),
389 create_watermark_part_for_checkpoint(1),
390 ];
391 setup.watermark_tx.send(parts).await.unwrap();
392
393 tokio::time::sleep(Duration::from_millis(100)).await;
395
396 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
398 assert_eq!(watermark.checkpoint_hi_inclusive, 2);
399
400 setup
402 .watermark_tx
403 .send(vec![create_watermark_part_for_checkpoint(3)])
404 .await
405 .unwrap();
406
407 tokio::time::sleep(Duration::from_secs(1)).await;
409
410 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
412 assert_eq!(watermark.checkpoint_hi_inclusive, 4);
413 }
414
415 #[tokio::test]
416 async fn test_watermark_with_connection_failure() {
417 let config = CommitterConfig {
418 watermark_interval_ms: 1_000, ..Default::default()
420 };
421 let store = MockStore::default().with_connection_failures(1);
422 let setup = setup_test::<DataPipeline>(config, 1, store);
423
424 let part = create_watermark_part_for_checkpoint(1);
426 setup.watermark_tx.send(vec![part]).await.unwrap();
427
428 tokio::time::sleep(Duration::from_millis(200)).await;
430
431 let watermark = setup.store.watermark(DataPipeline::NAME);
433 assert!(watermark.is_none());
434
435 tokio::time::sleep(Duration::from_millis(1_200)).await;
437
438 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
440 assert_eq!(watermark.checkpoint_hi_inclusive, 1);
441 }
442
443 #[tokio::test]
444 async fn test_committer_retries_on_commit_watermark_failure() {
445 let config = CommitterConfig {
446 watermark_interval_ms: 1_000, ..Default::default()
448 };
449 let store = MockStore::default().with_commit_watermark_failures(1); let setup = setup_test::<DataPipeline>(config, 10, store);
452
453 let part = WatermarkPart {
454 watermark: CommitterWatermark {
455 checkpoint_hi_inclusive: 10,
456 ..Default::default()
457 },
458 batch_rows: 1,
459 total_rows: 1,
460 };
461 setup.watermark_tx.send(vec![part]).await.unwrap();
462
463 tokio::time::sleep(Duration::from_millis(200)).await;
465 let watermark = setup.store.watermark(DataPipeline::NAME);
466 assert!(watermark.is_none());
467
468 tokio::time::sleep(Duration::from_millis(1_200)).await;
470
471 let watermark = setup.store.watermark(DataPipeline::NAME);
473 assert!(watermark.is_none());
474 }
475
476 #[tokio::test]
477 async fn test_committer_retries_on_commit_watermark_failure_advances() {
478 let config = CommitterConfig {
479 watermark_interval_ms: 1_000, ..Default::default() };
482 let store = MockStore::default().with_commit_watermark_failures(1); let setup = setup_test::<DataPipeline>(config, 10, store);
484
485 let part = WatermarkPart {
486 watermark: CommitterWatermark {
487 checkpoint_hi_inclusive: 10,
488 ..Default::default()
489 },
490 batch_rows: 1,
491 total_rows: 1,
492 };
493 setup.watermark_tx.send(vec![part]).await.unwrap();
494
495 tokio::time::sleep(Duration::from_millis(200)).await;
497 let watermark = setup.store.watermark(DataPipeline::NAME);
498 assert!(watermark.is_none());
499
500 let part = WatermarkPart {
501 watermark: CommitterWatermark {
502 checkpoint_hi_inclusive: 11,
503 ..Default::default()
504 },
505 batch_rows: 1,
506 total_rows: 1,
507 };
508 setup.watermark_tx.send(vec![part]).await.unwrap();
509
510 tokio::time::sleep(Duration::from_millis(1_200)).await;
512
513 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
514 assert_eq!(watermark.checkpoint_hi_inclusive, 11);
515 }
516
517 #[tokio::test]
518 async fn test_incomplete_watermark() {
519 let config = CommitterConfig {
520 watermark_interval_ms: 1_000, ..Default::default()
522 };
523 let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());
524
525 let part = WatermarkPart {
527 watermark: CommitterWatermark {
528 checkpoint_hi_inclusive: 1,
529 ..Default::default()
530 },
531 batch_rows: 1,
532 total_rows: 3,
533 };
534 setup.watermark_tx.send(vec![part.clone()]).await.unwrap();
535
536 tokio::time::sleep(Duration::from_millis(200)).await;
538
539 let watermark = setup.store.watermark(DataPipeline::NAME);
541 assert!(watermark.is_none());
542
543 setup
545 .watermark_tx
546 .send(vec![part.clone(), part.clone()])
547 .await
548 .unwrap();
549
550 tokio::time::sleep(Duration::from_millis(1_200)).await;
552
553 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
555 assert_eq!(watermark.checkpoint_hi_inclusive, 1);
556 }
557
558 #[tokio::test]
559 async fn test_no_initial_watermark() {
560 let config = CommitterConfig::default();
561 let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());
562
563 setup
565 .watermark_tx
566 .send(vec![create_watermark_part_for_checkpoint(1)])
567 .await
568 .unwrap();
569
570 tokio::time::sleep(Duration::from_millis(200)).await;
572
573 let watermark = setup.store.watermark(DataPipeline::NAME);
575 assert!(watermark.is_none());
576
577 setup
579 .watermark_tx
580 .send(vec![create_watermark_part_for_checkpoint(0)])
581 .await
582 .unwrap();
583
584 tokio::time::sleep(Duration::from_millis(1200)).await;
586
587 let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
589 assert_eq!(watermark.checkpoint_hi_inclusive, 1);
590 }
591}