sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::sync::Arc;

use sui_futures::service::Service;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::Handler;
use crate::pipeline::logging::WatermarkLogger;
use crate::store::CommitterWatermark;
use crate::store::Connection;
use crate::store::Store;
use crate::store::pipeline_task;

/// The watermark task is responsible for keeping track of a pipeline's out-of-order commits and
/// updating its row in the `watermarks` table when a continuous run of checkpoints has landed
/// since the last watermark update.
///
/// It receives watermark "parts" that detail the proportion of each checkpoint's data that has
/// been written out by the committer, and periodically (on a configurable interval) checks
/// whether the watermark for the pipeline can be pushed forward. It can be pushed forward if
/// there are one or more complete (all data for that checkpoint written out) watermarks spanning
/// contiguously from the current high watermark into the future.
37///
38/// If it detects that more than [WARN_PENDING_WATERMARKS] watermarks have built up, it will issue a
39/// warning, as this could be the indication of a memory leak, and the caller probably intended to
40/// run the indexer with watermarking disabled (e.g. if they are running a backfill).
41///
42/// The task regularly traces its progress, outputting at a higher log level every
43/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
44///
45/// The task will shutdown if the `rx` channel closes and the watermark cannot be progressed.
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // SAFETY: on indexer instantiation, we've checked that the pipeline name is valid.
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        // To correctly update the watermark, the task tracks the watermark it last tried to write
        // and the watermark parts for any checkpoints that have been written since then
        // ("pre-committed"). After each batch is written, the task will try to progress the
        // watermark as much as possible without going over any holes in the sequence of
        // checkpoints (entirely missing watermarks, or incomplete watermarks).
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // The watermark task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        let mut next_wake = tokio::time::Instant::now();
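        // The highest contiguous watermark gathered so far that has not yet been written to the
        // DB (restored on a failed write so it can be retried on the next tick).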
        let mut pending_watermark = None;

        loop {
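            // Set when the watermark interval timer fires; DB writes happen only on those ticks.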
            let mut should_write_db = false;

            tokio::select! {
                () = tokio::time::sleep_until(next_wake) => {
                    // Schedule next wake immediately, so the timer effectively runs in parallel
                    // with the commit logic below.
                    next_wake = config.watermark_interval_with_jitter();
                    should_write_db = true;
                }
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }
                }
            }

            // Advance the watermark through contiguous precommitted entries on every
            // iteration, not just when the DB write timer fires. This ensures commit_hi
            // feedback reaches the broadcaster immediately.
            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // Some rows from the next watermark have not landed yet.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Next pending checkpoint is from the future.
                    Ordering::Less => break,

                    // This is the next checkpoint -- include it.
                    Ordering::Equal => {
                        pending_watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Next pending checkpoint is in the past. Out-of-order watermarks can
                    // be encountered when a pipeline is starting up, because ingestion
                    // must start at the lowest checkpoint across all pipelines, or because
                    // of a backfill, where the initial checkpoint has been overridden.
                    Ordering::Greater => {
                        // Track how many we see to make sure it doesn't grow without bound.
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            if let Some(ref watermark) = pending_watermark {
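                // Feed progress back to the broadcaster: `next_checkpoint` is one past the
                // highest checkpoint that has been contiguously committed.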
                let _ = commit_hi_tx.send((H::NAME, next_checkpoint));

                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );
            }

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            // DB writes are deferred to the timer interval to avoid excessive DB load.
            if should_write_db
                && let Some(watermark) = pending_watermark.take()
                && write_watermark::<H>(
                    &store,
                    &pipeline_task,
                    &watermark,
                    &mut logger,
                    &checkpoint_lag_reporter,
                    &metrics,
                )
                .await
                .is_err()
            {
                pending_watermark = Some(watermark);
            }

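            // The committer will not send any more parts and everything received has been
            // drained, so the task can wind down.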
            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

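        // Make a best-effort attempt to flush any remaining gathered watermark before shutting
        // down.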
        if let Some(watermark) = pending_watermark
            && write_watermark::<H>(
                &store,
                &pipeline_task,
                &watermark,
                &mut logger,
                &checkpoint_lag_reporter,
                &metrics,
            )
            .await
            .is_err()
        {
            warn!(
                pipeline = H::NAME,
                ?watermark,
                "Failed to write final watermark on shutdown, will not retry",
            );
        }

        info!(pipeline = H::NAME, "Stopping commit watermark task");
        Ok(())
    })
}

/// Write the watermark to DB and update metrics. Returns `Err` on failure so the caller can
/// preserve the watermark for retry on the next tick.
async fn write_watermark<H: Handler>(
    store: &H::Store,
    pipeline_task: &str,
    watermark: &CommitterWatermark,
    logger: &mut WatermarkLogger,
    checkpoint_lag_reporter: &CheckpointLagMetricReporter,
    metrics: &IndexerMetrics,
) -> Result<(), ()> {
    let Ok(mut conn) = store.connect().await else {
        warn!(
            pipeline = H::NAME,
            "Commit watermark task failed to get connection for DB"
        );
        return Err(());
    };

    let guard = metrics
        .watermark_commit_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    // TODO: If `initial_watermark` is empty, then when we update the watermark for the first
    // time, we should also update the low watermark.
    match conn
        .set_committer_watermark(pipeline_task, *watermark)
        .await
    {
        Err(e) => {
            let elapsed = guard.stop_and_record();
            error!(
                pipeline = H::NAME,
                elapsed_ms = elapsed * 1000.0,
                ?watermark,
                "Error updating commit watermark: {e}",
            );
            Err(())
        }

        Ok(true) => {
            let elapsed = guard.stop_and_record();

            logger.log::<H>(watermark, elapsed);

            checkpoint_lag_reporter.report_lag(
                watermark.checkpoint_hi_inclusive,
                watermark.timestamp_ms_hi_inclusive,
            );

            metrics
                .watermark_epoch_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.epoch_hi_inclusive as i64);

            metrics
                .watermark_transaction_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.tx_hi as i64);

            metrics
                .watermark_timestamp_in_db_ms
                .with_label_values(&[H::NAME])
                .set(watermark.timestamp_ms_hi_inclusive as i64);

            Ok(())
        }
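
        // The store reported that no row was updated (presumably because its watermark is
        // already at least as recent); treat this as a success with nothing to report.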
        Ok(false) => Ok(()),
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

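    /// Handles a test needs: the mock store to inspect, the sender used to feed watermark parts,
    /// and the running watermark service (held so it is not dropped while the test runs).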
    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        #[allow(unused)]
        commit_watermark: Service,
    }

    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, _commit_hi_rx) = mpsc::unbounded_channel();
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            commit_hi_tx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

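    /// Creates a watermark part for `checkpoint` that is already complete: its single batch
    /// accounts for all of the checkpoint's rows (`batch_rows == total_rows`).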
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts in order
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark progression
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts out of order
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark hasn't progressed past 2
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Send checkpoint 3 to fill the gap
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        // Wait for the next polling and processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Verify watermark has progressed to 4
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        // Send watermark part
        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test the commit retry
            ..Default::default()
        };
        // Create store with commit watermark failure configuration
        let store = MockStore::default().with_commit_watermark_failures(1); // Will fail once before succeeding
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for the initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed after retry
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 10);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test the commit retry
            ..Default::default()
        };
        // Create store with commit watermark failure configuration
        let store = MockStore::default().with_commit_watermark_failures(1); // Will fail once before succeeding
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for the initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify the retried write includes the checkpoint that landed in the meantime
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test adding the remaining parts
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send the first incomplete watermark part
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the other two parts to complete the watermark
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        // Send the checkpoint 1 watermark
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the checkpoint 0 watermark to fill the gap.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }
}