sui_indexer_alt_framework/pipeline/concurrent/
commit_watermark.rs1use std::{
5 cmp::Ordering,
6 collections::{BTreeMap, btree_map::Entry},
7 sync::Arc,
8};
9
10use sui_futures::service::Service;
11use tokio::{
12 sync::mpsc,
13 time::{MissedTickBehavior, interval},
14};
15use tracing::{debug, error, info, warn};
16
17use crate::{
18 metrics::{CheckpointLagMetricReporter, IndexerMetrics},
19 pipeline::{CommitterConfig, WARN_PENDING_WATERMARKS, WatermarkPart, logging::WatermarkLogger},
20 store::{Connection, Store, pipeline_task},
21};
22
23use super::Handler;
24
25pub(super) fn commit_watermark<H: Handler + 'static>(
44 mut next_checkpoint: u64,
45 config: CommitterConfig,
46 mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
47 store: H::Store,
48 task: Option<String>,
49 metrics: Arc<IndexerMetrics>,
50) -> Service {
51 let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
53 Service::new().spawn_aborting(async move {
54 let mut poll = interval(config.watermark_interval());
55 poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
56
57 let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();
63
64 let mut logger = WatermarkLogger::new("concurrent_committer");
67
68 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
69 &metrics.watermarked_checkpoint_timestamp_lag,
70 &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
71 &metrics.watermark_checkpoint_in_db,
72 );
73
74 info!(
75 pipeline = H::NAME,
76 next_checkpoint, "Starting commit watermark task"
77 );
78
79 loop {
80 tokio::select! {
81 _ = poll.tick() => {}
82 Some(parts) = rx.recv() => {
83 for part in parts {
84 match precommitted.entry(part.checkpoint()) {
85 Entry::Vacant(entry) => {
86 entry.insert(part);
87 }
88
89 Entry::Occupied(mut entry) => {
90 entry.get_mut().add(part);
91 }
92 }
93 }
94
95 continue;
96 }
97 }
98
99 let mut watermark = None;
102
103 if precommitted.len() > WARN_PENDING_WATERMARKS {
104 warn!(
105 pipeline = H::NAME,
106 pending = precommitted.len(),
107 "Pipeline has a large number of pending commit watermarks",
108 );
109 }
110
111 let Ok(mut conn) = store.connect().await else {
112 warn!(
113 pipeline = H::NAME,
114 "Commit watermark task failed to get connection for DB"
115 );
116 continue;
117 };
118
119 let guard = metrics
121 .watermark_gather_latency
122 .with_label_values(&[H::NAME])
123 .start_timer();
124
125 while let Some(pending) = precommitted.first_entry() {
126 let part = pending.get();
127
128 if !part.is_complete() {
130 break;
131 }
132
133 match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
134 Ordering::Less => break,
136
137 Ordering::Equal => {
139 watermark = Some(pending.remove().watermark);
140 next_checkpoint += 1;
141 }
142
143 Ordering::Greater => {
148 metrics
150 .total_watermarks_out_of_order
151 .with_label_values(&[H::NAME])
152 .inc();
153
154 pending.remove();
155 }
156 }
157 }
158
159 let elapsed = guard.stop_and_record();
160
161 if let Some(watermark) = watermark {
162 metrics
163 .watermark_epoch
164 .with_label_values(&[H::NAME])
165 .set(watermark.epoch_hi_inclusive as i64);
166
167 metrics
168 .watermark_checkpoint
169 .with_label_values(&[H::NAME])
170 .set(watermark.checkpoint_hi_inclusive as i64);
171
172 metrics
173 .watermark_transaction
174 .with_label_values(&[H::NAME])
175 .set(watermark.tx_hi as i64);
176
177 metrics
178 .watermark_timestamp_ms
179 .with_label_values(&[H::NAME])
180 .set(watermark.timestamp_ms_hi_inclusive as i64);
181
182 debug!(
183 pipeline = H::NAME,
184 elapsed_ms = elapsed * 1000.0,
185 watermark = watermark.checkpoint_hi_inclusive,
186 timestamp = %watermark.timestamp(),
187 pending = precommitted.len(),
188 "Gathered watermarks",
189 );
190
191 let guard = metrics
192 .watermark_commit_latency
193 .with_label_values(&[H::NAME])
194 .start_timer();
195
196 match conn
199 .set_committer_watermark(&pipeline_task, watermark)
200 .await
201 {
202 Err(e) => {
205 let elapsed = guard.stop_and_record();
206 error!(
207 pipeline = H::NAME,
208 elapsed_ms = elapsed * 1000.0,
209 ?watermark,
210 "Error updating commit watermark: {e}",
211 );
212 }
213
214 Ok(true) => {
215 let elapsed = guard.stop_and_record();
216
217 logger.log::<H>(&watermark, elapsed);
218
219 checkpoint_lag_reporter.report_lag(
220 watermark.checkpoint_hi_inclusive,
221 watermark.timestamp_ms_hi_inclusive,
222 );
223
224 metrics
225 .watermark_epoch_in_db
226 .with_label_values(&[H::NAME])
227 .set(watermark.epoch_hi_inclusive as i64);
228
229 metrics
230 .watermark_transaction_in_db
231 .with_label_values(&[H::NAME])
232 .set(watermark.tx_hi as i64);
233
234 metrics
235 .watermark_timestamp_in_db_ms
236 .with_label_values(&[H::NAME])
237 .set(watermark.timestamp_ms_hi_inclusive as i64);
238 }
239 Ok(false) => {}
240 }
241 }
242
243 if rx.is_closed() && rx.is_empty() {
244 info!(pipeline = H::NAME, "Committer closed channel");
245 break;
246 }
247 }
248
249 info!(pipeline = H::NAME, "Stopping committer watermark task");
250 Ok(())
251 })
252}
253
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{CommitterConfig, Processor, WatermarkPart, concurrent::BatchStatus},
        store::CommitterWatermark,
    };

    use super::*;

    /// Row type of the test pipeline; it carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline backed by `MockStore`: it produces no rows and commits nothing, so the
    /// tests below only observe the watermark task.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values.by_ref());
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Everything a test needs to drive the watermark task and inspect its effect on the store.
    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        // Never read; held so the spawned service lives as long as the test does.
        // NOTE(review): presumably dropping the `Service` would stop the task -- confirm.
        #[allow(unused)]
        commit_watermark: Service,
    }

    /// Starts the watermark task for pipeline `H` against `store`, expecting `next_checkpoint`
    /// to be the first checkpoint folded into the watermark.
    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let service = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store.clone(),
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark: service,
        }
    }

    /// Builds a part that reports its single row for `checkpoint` (one row out of one).
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        let watermark = CommitterWatermark {
            checkpoint_hi_inclusive: checkpoint,
            ..Default::default()
        };

        WatermarkPart {
            watermark,
            batch_rows: 1,
            total_rows: 1,
        }
    }

    /// Configuration with a one second watermark interval, used by tests that assert on what
    /// has (not) happened between two consecutive ticks.
    fn slow_poll_config() -> CommitterConfig {
        CommitterConfig {
            watermark_interval_ms: 1_000,
            ..Default::default()
        }
    }

    /// Delivers `parts` to the watermark task as a single message.
    async fn submit(setup: &TestSetup, parts: Vec<WatermarkPart>) {
        setup.watermark_tx.send(parts).await.unwrap();
    }

    /// Gives the watermark task `ms` milliseconds to make progress.
    async fn pause_ms(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let setup = setup_test::<DataPipeline>(CommitterConfig::default(), 1, MockStore::default());

        // Checkpoints 1, 2 and 3 in order, each as its own message.
        for checkpoint in 1..=3 {
            submit(&setup, vec![create_watermark_part_for_checkpoint(checkpoint)]).await;
        }

        pause_ms(100).await;

        // The watermark covers all three checkpoints.
        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(3));
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let setup = setup_test::<DataPipeline>(CommitterConfig::default(), 1, MockStore::default());

        // Checkpoint 3 is missing, so only 1 and 2 can be folded into the watermark.
        let parts = Vec::from([4, 2, 1].map(create_watermark_part_for_checkpoint));
        submit(&setup, parts).await;

        pause_ms(100).await;

        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(2));

        // Filling the gap lets the watermark run through to checkpoint 4.
        submit(&setup, vec![create_watermark_part_for_checkpoint(3)]).await;

        pause_ms(1_000).await;

        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(4));
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        // NOTE(review): presumably makes the first `connect` call fail and later ones succeed.
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(slow_poll_config(), 1, store);

        submit(&setup, vec![create_watermark_part_for_checkpoint(1)]).await;

        // Shortly after start-up nothing has been written yet.
        pause_ms(200).await;
        assert!(setup.store.watermark(DataPipeline::NAME).is_none());

        // The part was kept, so a later tick with a working connection writes it.
        pause_ms(1_200).await;

        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(1));
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        // NOTE(review): presumably makes the first watermark write fail and later ones succeed.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(slow_poll_config(), 10, store);

        submit(&setup, vec![create_watermark_part_for_checkpoint(10)]).await;

        pause_ms(200).await;
        assert!(setup.store.watermark(DataPipeline::NAME).is_none());

        pause_ms(1_200).await;

        // Despite the test's name, the failed write is not repeated: the part for checkpoint 10
        // was consumed by the failed attempt and no further part arrives, so the store stays
        // without a watermark.
        assert!(setup.store.watermark(DataPipeline::NAME).is_none());
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        // NOTE(review): presumably makes the first watermark write fail and later ones succeed.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(slow_poll_config(), 10, store);

        submit(&setup, vec![create_watermark_part_for_checkpoint(10)]).await;

        pause_ms(200).await;
        assert!(setup.store.watermark(DataPipeline::NAME).is_none());

        // A part for the following checkpoint gives the task something new to write.
        submit(&setup, vec![create_watermark_part_for_checkpoint(11)]).await;

        pause_ms(1_200).await;

        // The store ends up at checkpoint 11 even though the write for 10 never landed.
        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(11));
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let setup = setup_test::<DataPipeline>(slow_poll_config(), 1, MockStore::default());

        // Each message element reports one row of a checkpoint that has three rows in total.
        let one_of_three = WatermarkPart {
            total_rows: 3,
            ..create_watermark_part_for_checkpoint(1)
        };

        submit(&setup, vec![one_of_three.clone()]).await;

        // One row out of three is not enough to move the watermark.
        pause_ms(200).await;
        assert!(setup.store.watermark(DataPipeline::NAME).is_none());

        // The remaining two rows complete the checkpoint.
        submit(&setup, vec![one_of_three.clone(), one_of_three]).await;

        pause_ms(1_200).await;

        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(1));
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let setup = setup_test::<DataPipeline>(CommitterConfig::default(), 0, MockStore::default());

        // The task waits for checkpoint 0, so checkpoint 1 on its own changes nothing.
        submit(&setup, vec![create_watermark_part_for_checkpoint(1)]).await;

        pause_ms(200).await;
        assert!(setup.store.watermark(DataPipeline::NAME).is_none());

        // Once checkpoint 0 arrives, both checkpoints are folded into the watermark.
        submit(&setup, vec![create_watermark_part_for_checkpoint(0)]).await;

        pause_ms(1200).await;

        let committed = setup
            .store
            .watermark(DataPipeline::NAME)
            .map(|w| w.checkpoint_hi_inclusive);
        assert_eq!(committed, Some(1));
    }
}