sui_indexer_alt_framework/pipeline/concurrent/
commit_watermark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::cmp::Ordering;
5use std::collections::BTreeMap;
6use std::collections::btree_map::Entry;
7use std::sync::Arc;
8
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::debug;
12use tracing::error;
13use tracing::info;
14use tracing::warn;
15
16use crate::metrics::CheckpointLagMetricReporter;
17use crate::metrics::IndexerMetrics;
18use crate::pipeline::CommitterConfig;
19use crate::pipeline::WARN_PENDING_WATERMARKS;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::Handler;
22use crate::pipeline::logging::WatermarkLogger;
23use crate::store::CommitterWatermark;
24use crate::store::Connection;
25use crate::store::Store;
26use crate::store::pipeline_task;
27
/// The watermark task is responsible for keeping track of a pipeline's out-of-order commits and
/// updating its row in the `watermarks` table when a continuous run of checkpoints have landed
/// since the last watermark update.
///
/// It receives watermark "parts" that detail the proportion of each checkpoint's data that has been
/// written out by the committer and periodically (on a configurable interval) checks if the
/// watermark for the pipeline can be pushed forward. The watermark can be pushed forward if there
/// is one or more complete (all data for that checkpoint written out) watermarks spanning
/// contiguously from the current high watermark into the future.
///
/// If it detects that more than [WARN_PENDING_WATERMARKS] watermarks have built up, it will issue a
/// warning, as this could be the indication of a memory leak, and the caller probably intended to
/// run the indexer with watermarking disabled (e.g. if they are running a backfill).
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// The task will shutdown if the `rx` channel closes and the watermark cannot be progressed.
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // SAFETY: on indexer instantiation, we've checked that the pipeline name is valid.
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        // To correctly update the watermark, the task tracks the watermark it last tried to write
        // and the watermark parts for any checkpoints that have been written since then
        // ("pre-committed"). After each batch is written, the task will try to progress the
        // watermark as much as possible without going over any holes in the sequence of
        // checkpoints (entirely missing watermarks, or incomplete watermarks).
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // The watermark task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        let mut next_wake = tokio::time::Instant::now();
        // The highest contiguous watermark gathered so far that has not yet been persisted. It
        // is only written to the DB when the timer fires (or on shutdown), and is restored on
        // write failure so it can be retried on a later tick.
        let mut pending_watermark = None;

        loop {
            let mut should_write_db = false;

            tokio::select! {
                () = tokio::time::sleep_until(next_wake) => {
                    // Schedule next wake immediately, so the timer effectively runs in parallel
                    // with the commit logic below.
                    next_wake = config.watermark_interval_with_jitter();
                    should_write_db = true;
                }
                Some(parts) = rx.recv() => {
                    // Merge incoming parts into the pre-committed map, accumulating row counts
                    // for checkpoints that already have an entry.
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }
                }
            }

            // Advance the watermark through contiguous precommitted entries on every
            // iteration, not just when the DB write timer fires. This ensures commit_hi
            // feedback reaches the broadcaster immediately.
            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // Some rows from the next watermark have not landed yet.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Next pending checkpoint is from the future.
                    Ordering::Less => break,

                    // This is the next checkpoint -- include it.
                    Ordering::Equal => {
                        // A later watermark supersedes an earlier one, so overwriting any
                        // previously gathered (but unwritten) watermark is safe.
                        pending_watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Next pending checkpoint is in the past. Out of order watermarks can
                    // be encountered when a pipeline is starting up, because ingestion
                    // must start at the lowest checkpoint across all pipelines, or because
                    // of a backfill, where the initial checkpoint has been overridden.
                    Ordering::Greater => {
                        // Track how many we see to make sure it doesn't grow without bound.
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            // Reflect the gathered (not necessarily yet persisted) watermark in metrics.
            if let Some(ref watermark) = pending_watermark {
                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );
            }

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            // DB writes are deferred to the timer interval to avoid excessive DB load.
            // On failure, the watermark is put back so the next tick retries the write.
            if should_write_db
                && let Some(watermark) = pending_watermark.take()
                && write_watermark::<H>(
                    &store,
                    &pipeline_task,
                    &watermark,
                    &mut logger,
                    &checkpoint_lag_reporter,
                    &metrics,
                )
                .await
                .is_err()
            {
                pending_watermark = Some(watermark);
            }

            // The channel has closed and everything buffered in it has been consumed -- no
            // further progress is possible, so wind down.
            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

        // Best-effort flush of any watermark gathered since the last successful write.
        if let Some(watermark) = pending_watermark
            && write_watermark::<H>(
                &store,
                &pipeline_task,
                &watermark,
                &mut logger,
                &checkpoint_lag_reporter,
                &metrics,
            )
            .await
            .is_err()
        {
            warn!(
                pipeline = H::NAME,
                ?watermark,
                "Failed to write final watermark on shutdown, will not retry",
            );
        }

        info!(pipeline = H::NAME, "Stopping committer watermark task");
        Ok(())
    })
}
236
237/// Write the watermark to DB and update metrics. Returns `Err` on failure so the caller can
238/// preserve the watermark for retry on the next tick.
239async fn write_watermark<H: Handler>(
240    store: &H::Store,
241    pipeline_task: &str,
242    watermark: &CommitterWatermark,
243    logger: &mut WatermarkLogger,
244    checkpoint_lag_reporter: &CheckpointLagMetricReporter,
245    metrics: &IndexerMetrics,
246) -> Result<(), ()> {
247    let Ok(mut conn) = store.connect().await else {
248        warn!(
249            pipeline = H::NAME,
250            "Commit watermark task failed to get connection for DB"
251        );
252        return Err(());
253    };
254
255    let guard = metrics
256        .watermark_commit_latency
257        .with_label_values(&[H::NAME])
258        .start_timer();
259
260    // TODO: If initial_watermark is empty, when we update watermark
261    // for the first time, we should also update the low watermark.
262    match conn
263        .set_committer_watermark(pipeline_task, *watermark)
264        .await
265    {
266        Err(e) => {
267            let elapsed = guard.stop_and_record();
268            error!(
269                pipeline = H::NAME,
270                elapsed_ms = elapsed * 1000.0,
271                ?watermark,
272                "Error updating commit watermark: {e}",
273            );
274            Err(())
275        }
276
277        Ok(true) => {
278            let elapsed = guard.stop_and_record();
279
280            logger.log::<H>(watermark, elapsed);
281
282            checkpoint_lag_reporter.report_lag(
283                watermark.checkpoint_hi_inclusive,
284                watermark.timestamp_ms_hi_inclusive,
285            );
286
287            metrics
288                .watermark_epoch_in_db
289                .with_label_values(&[H::NAME])
290                .set(watermark.epoch_hi_inclusive as i64);
291
292            metrics
293                .watermark_transaction_in_db
294                .with_label_values(&[H::NAME])
295                .set(watermark.tx_hi as i64);
296
297            metrics
298                .watermark_timestamp_in_db_ms
299                .with_label_values(&[H::NAME])
300                .set(watermark.timestamp_ms_hi_inclusive as i64);
301
302            Ok(())
303        }
304        Ok(false) => Ok(()),
305    }
306}
307
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    // Minimal row type for the test pipeline -- carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    // A no-op pipeline used purely to drive the commit watermark task in tests.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    // Handles for interacting with a running commit watermark task: the mock store to inspect,
    // the channel to feed watermark parts into, and the task itself (held so it is not dropped
    // mid-test).
    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        #[allow(unused)]
        commit_watermark: Service,
    }

    // Spawn a commit watermark task for `H` against `store`, starting at `next_checkpoint`.
    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

    // A complete (1 of 1 rows) watermark part for the given checkpoint.
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts in order
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark progression
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts out of order
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark hasn't progressed past 2
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Send checkpoint 3 to fill the gap
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        // Wait for the next polling and processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Verify watermark has progressed to 4
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        // Send watermark part
        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        // Create store with transaction failure configuration
        let store = MockStore::default().with_commit_watermark_failures(1); // Will fail once before succeeding
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed after retry
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 10);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        // Create store with transaction failure configuration
        let store = MockStore::default().with_commit_watermark_failures(1); // Will fail once before succeeding
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test adding complete part
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send the first incomplete watermark part
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the other two parts to complete the watermark
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        // Send the checkpoint 1 watermark
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the checkpoint 0 watermark to fill the gap.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }
}