sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::sync::Arc;

use sui_futures::service::Service;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::Handler;
use crate::pipeline::logging::WatermarkLogger;
use crate::store::CommitterWatermark;
use crate::store::Connection;
use crate::store::Store;
use crate::store::pipeline_task;

/// The watermark task is responsible for keeping track of a pipeline's out-of-order commits and
/// updating its row in the `watermarks` table when a continuous run of checkpoints has landed
/// since the last watermark update.
///
/// It receives watermark "parts" that detail the proportion of each checkpoint's data that has been
/// written out by the committer and periodically (on a configurable interval) checks if the
/// watermark for the pipeline can be pushed forward. The watermark can be pushed forward if there
/// are one or more complete (all data for that checkpoint written out) watermarks spanning
/// contiguously from the current high watermark into the future.
///
/// If it detects that more than [WARN_PENDING_WATERMARKS] watermarks have built up, it will issue a
/// warning, as this could be an indication of a memory leak, and the caller probably intended to
/// run the indexer with watermarking disabled (e.g. if they are running a backfill).
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// The task will shut down if the `rx` channel closes and the watermark can no longer be advanced.
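///
/// For illustration (with hypothetical checkpoint numbers): if the task is waiting for
/// checkpoint 5 and complete parts have landed for checkpoints 5, 6, and 8, the watermark
/// advances through 5 and 6 and then stops, because checkpoint 7 has not landed yet:
///
/// ```text
/// precommitted:    {5: complete, 6: complete, 8: complete}
/// next_checkpoint: 5 -> 7
/// written:         checkpoint_hi_inclusive = 6
/// ```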
pub(super) fn commit_watermark<H: Handler>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // SAFETY: on indexer instantiation, we've checked that the pipeline name is valid.
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        // To correctly update the watermark, the task tracks the watermark it last tried to write
        // and the watermark parts for any checkpoints that have been written since then
        // ("pre-committed"). After each batch is written, the task will try to progress the
        // watermark as much as possible without going over any holes in the sequence of
        // checkpoints (entirely missing watermarks, or incomplete watermarks).
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // The watermark task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        let mut next_wake = tokio::time::Instant::now();
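        // The highest contiguous watermark gathered so far that has not yet been written to the
        // DB. If a write fails, it is put back here so the write can be retried on a later tick.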
        let mut pending_watermark = None;

        loop {
            let mut should_write_db = false;

            tokio::select! {
                () = tokio::time::sleep_until(next_wake) => {
                    // Schedule next wake immediately, so the timer effectively runs in parallel
                    // with the commit logic below.
                    next_wake = config.watermark_interval_with_jitter();
                    should_write_db = true;
                }
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }
                }
            }

            // Advance the watermark through contiguous precommitted entries on every
            // iteration, not just when the DB write timer fires. This ensures commit_hi
            // feedback reaches the broadcaster immediately.
            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // Some rows from the next watermark have not landed yet.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Next pending checkpoint is from the future.
                    Ordering::Less => break,

                    // This is the next checkpoint -- include it.
                    Ordering::Equal => {
                        pending_watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Next pending checkpoint is in the past. Out of order watermarks can
                    // be encountered when a pipeline is starting up, because ingestion
                    // must start at the lowest checkpoint across all pipelines, or because
                    // of a backfill, where the initial checkpoint has been overridden.
                    Ordering::Greater => {
                        // Track how many we see to make sure it doesn't grow without bound.
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            if let Some(ref watermark) = pending_watermark {
                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );
            }

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            // DB writes are deferred to the timer interval to avoid excessive DB load.
            if should_write_db
                && let Some(watermark) = pending_watermark.take()
                && write_watermark::<H>(
                    &store,
                    &pipeline_task,
                    &watermark,
                    &mut logger,
                    &checkpoint_lag_reporter,
                    &metrics,
                )
                .await
                .is_err()
            {
                pending_watermark = Some(watermark);
            }

            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

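        // Best-effort final flush: try to persist any gathered watermark once more before
        // shutting down.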
        if let Some(watermark) = pending_watermark
            && write_watermark::<H>(
                &store,
                &pipeline_task,
                &watermark,
                &mut logger,
                &checkpoint_lag_reporter,
                &metrics,
            )
            .await
            .is_err()
        {
            warn!(
                pipeline = H::NAME,
                ?watermark,
                "Failed to write final watermark on shutdown, will not retry",
            );
        }

        info!(pipeline = H::NAME, "Stopping commit watermark task");
        Ok(())
    })
}

/// Write the watermark to the DB and update metrics. Returns `Err` on failure so the caller can
/// preserve the watermark for retry on the next tick.
async fn write_watermark<H: Handler>(
    store: &H::Store,
    pipeline_task: &str,
    watermark: &CommitterWatermark,
    logger: &mut WatermarkLogger,
    checkpoint_lag_reporter: &CheckpointLagMetricReporter,
    metrics: &IndexerMetrics,
) -> Result<(), ()> {
    let Ok(mut conn) = store.connect().await else {
        warn!(
            pipeline = H::NAME,
            "Commit watermark task failed to get connection for DB"
        );
        return Err(());
    };

    let guard = metrics
        .watermark_commit_latency
        .with_label_values(&[H::NAME])
        .start_timer();

    // TODO: If the initial watermark is empty, the first watermark update should also update the
    // low watermark.
    match conn
        .set_committer_watermark(pipeline_task, *watermark)
        .await
    {
        Err(e) => {
            let elapsed = guard.stop_and_record();
            error!(
                pipeline = H::NAME,
                elapsed_ms = elapsed * 1000.0,
                ?watermark,
                "Error updating commit watermark: {e}",
            );
            Err(())
        }

        Ok(true) => {
            let elapsed = guard.stop_and_record();

            logger.log::<H>(watermark, elapsed);

            checkpoint_lag_reporter.report_lag(
                watermark.checkpoint_hi_inclusive,
                watermark.timestamp_ms_hi_inclusive,
            );

            metrics
                .watermark_epoch_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.epoch_hi_inclusive as i64);

            metrics
                .watermark_transaction_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.tx_hi as i64);

            metrics
                .watermark_timestamp_in_db_ms
                .with_label_values(&[H::NAME])
                .set(watermark.timestamp_ms_hi_inclusive as i64);

            Ok(())
        }
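        // The store reported that no update was made (presumably because the stored watermark is
        // already at least this high); treat it as a successful no-op.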
        Ok(false) => Ok(()),
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: FallibleMockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        #[allow(unused)]
        commit_watermark: Service,
    }

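    /// Spawns a `commit_watermark` task for handler `H` against the given mock store, starting
    /// from `next_checkpoint`, and returns the store, the sender used to feed it watermark
    /// parts, and the running service handle.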
    fn setup_test<H: Handler<Store = FallibleMockStore>>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: FallibleMockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, FallibleMockStore::default());

        // Send watermark parts in order
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark progression
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(3));
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, FallibleMockStore::default());

        // Send watermark parts out of order
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark hasn't progressed past 2
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));

        // Send checkpoint 3 to fill the gap
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        // Wait for the next polling and processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Verify watermark has progressed to 4
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        let store = FallibleMockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        // Send watermark part
        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test watermark commit retry
            ..Default::default()
        };
        // Create store with commit watermark failure configuration
        let store = FallibleMockStore::default().with_commit_watermark_failures(1); // Will fail once before succeeding
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed after retry
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(10));
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test watermark commit retry
            ..Default::default()
        };
        // Create store with commit watermark failure configuration
        let store = FallibleMockStore::default().with_commit_watermark_failures(1); // Will fail once before succeeding
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(11));
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval so the remaining parts can arrive before the next poll
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, FallibleMockStore::default());

        // Send the first incomplete watermark part
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the other two parts to complete the watermark
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, FallibleMockStore::default());

        // Send the checkpoint 1 watermark
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the checkpoint 0 watermark to fill the gap.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
    }
}