sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    cmp::Ordering,
    collections::{BTreeMap, btree_map::Entry},
    sync::Arc,
};

use tokio::{
    sync::mpsc,
    task::JoinHandle,
    time::{MissedTickBehavior, interval},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::{CommitterConfig, WARN_PENDING_WATERMARKS, WatermarkPart, logging::WatermarkLogger},
    store::{CommitterWatermark, Connection, Store},
};

use super::Handler;

/// Gathers the next contiguous watermark from pending checkpoints.
///
/// Returns the highest contiguous watermark that can be committed, or `None` if no complete,
/// contiguous watermarks are available from `next_checkpoint` forward.
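///
/// # Example
///
/// A minimal sketch (not a doctest) of the gathering logic; a part counts as complete once
/// all of its checkpoint's rows have been reported:
///
/// ```ignore
/// // Complete parts are pending for checkpoints 1, 2, and 4 -- checkpoint 3 is missing.
/// let mut next_checkpoint = 1;
/// let watermark = gather_next_watermark(&mut precommitted, &mut next_checkpoint, None);
///
/// // The contiguous run ends at checkpoint 2, so that is the watermark gathered;
/// // checkpoint 4 stays pending until 3 arrives.
/// assert_eq!(watermark.unwrap().checkpoint_hi_inclusive, 2);
/// assert_eq!(next_checkpoint, 3);
/// ```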
fn gather_next_watermark(
    precommitted: &mut BTreeMap<u64, WatermarkPart>,
    next_checkpoint: &mut u64,
    metrics: Option<(&Arc<IndexerMetrics>, &'static str)>,
) -> Option<CommitterWatermark> {
    let mut watermark = None;

    while let Some(pending) = precommitted.first_entry() {
        let part = pending.get();

        if !part.is_complete() {
            break;
        }

        match (*next_checkpoint).cmp(&part.watermark.checkpoint_hi_inclusive) {
            // Next pending checkpoint is from the future.
            Ordering::Less => break,

            // This is the next checkpoint -- include it.
            Ordering::Equal => {
                watermark = Some(pending.remove().watermark);
                *next_checkpoint += 1;
            }

            // Next pending checkpoint is in the past. Out-of-order watermarks can be
            // encountered when a pipeline is starting up, because ingestion must start at
            // the lowest checkpoint across all pipelines, or because of a backfill, where
            // the initial checkpoint has been overridden.
            Ordering::Greater => {
                if let Some((metrics, pipeline_name)) = metrics {
                    metrics
                        .total_watermarks_out_of_order
                        .with_label_values(&[pipeline_name])
                        .inc();
                }
                pending.remove();
            }
        }
    }

    watermark
}

/// The watermark task is responsible for keeping track of a pipeline's out-of-order commits and
/// updating its row in the `watermarks` table when a contiguous run of checkpoints has landed
/// since the last watermark update.
///
/// It receives watermark "parts" that detail the proportion of each checkpoint's data that has
/// been written out by the committer and periodically (on a configurable interval) checks whether
/// the watermark for the pipeline can be pushed forward. The watermark can be pushed forward if
/// there are one or more complete (all data for that checkpoint written out) watermarks spanning
/// contiguously from the current high watermark into the future.
///
/// If it detects that more than [WARN_PENDING_WATERMARKS] watermarks have built up, it will issue
/// a warning, as this could be an indication of a memory leak, and the caller probably intended
/// to run the indexer with watermarking disabled (e.g. if they are running a backfill).
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// The task will shut down if the `cancel` token is signalled, or if the `rx` channel closes and
/// the watermark cannot be progressed. If `skip_watermark` is set, the task will shut down
/// immediately.
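///
/// # Example
///
/// A hedged sketch of how a supervisor might spawn this task; `MyHandler`, `store`,
/// `metrics`, and `cancel` are illustrative names, not items defined in this module:
///
/// ```ignore
/// let (tx, rx) = mpsc::channel(100);
/// let handle = commit_watermark::<MyHandler>(
///     0,                          // next checkpoint the watermark expects
///     CommitterConfig::default(),
///     false,                      // watermarking enabled
///     rx,
///     store,
///     metrics,
///     cancel,
/// );
/// // The committer sends `Vec<WatermarkPart>` batches down `tx` as it writes data out.
/// ```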
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    skip_watermark: bool,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        if skip_watermark {
            info!(pipeline = H::NAME, "Skipping commit watermark task");
            return;
        }

        let mut poll = interval(config.watermark_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // To correctly update the watermark, the task tracks the watermark it last tried to write
        // and the watermark parts for any checkpoints that have been written since then
        // ("pre-committed"). After each batch is written, the task will try to progress the
        // watermark as much as possible without going over any holes in the sequence of
        // checkpoints (entirely missing watermarks, or incomplete watermarks).
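        //
        // For example, with parts pending for checkpoints {5, 6, 8}, the watermark can
        // advance through 6, but no further until checkpoint 7 lands.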
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // The watermark task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = poll.tick() => {
                    if precommitted.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = precommitted.len(),
                            "Pipeline has a large number of pending commit watermarks",
                        );
                    }

                    let Ok(mut conn) = store.connect().await else {
                        warn!(pipeline = H::NAME, "Commit watermark task failed to get DB connection");
                        continue;
                    };

                    // Check if the pipeline's watermark needs to be updated.
                    let guard = metrics
                        .watermark_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // The presence of a watermark means that we can update the watermark in
                    // the DB. However, concurrent pipelines do not need every watermark
                    // update to succeed.
                    let watermark = gather_next_watermark(
                        &mut precommitted,
                        &mut next_checkpoint,
                        Some((&metrics, H::NAME)),
                    );

                    let elapsed = guard.stop_and_record();

                    if let Some(watermark) = watermark {
                        metrics
                            .watermark_epoch
                            .with_label_values(&[H::NAME])
                            .set(watermark.epoch_hi_inclusive as i64);

                        metrics
                            .watermark_checkpoint
                            .with_label_values(&[H::NAME])
                            .set(watermark.checkpoint_hi_inclusive as i64);

                        metrics
                            .watermark_transaction
                            .with_label_values(&[H::NAME])
                            .set(watermark.tx_hi as i64);

                        metrics
                            .watermark_timestamp_ms
                            .with_label_values(&[H::NAME])
                            .set(watermark.timestamp_ms_hi_inclusive as i64);

                        debug!(
                            pipeline = H::NAME,
                            elapsed_ms = elapsed * 1000.0,
                            watermark = watermark.checkpoint_hi_inclusive,
                            timestamp = %watermark.timestamp(),
                            pending = precommitted.len(),
                            "Gathered watermarks",
                        );

                        let guard = metrics
                            .watermark_commit_latency
                            .with_label_values(&[H::NAME])
                            .start_timer();

                        // TODO: If initial_watermark is empty, when we update the watermark
                        // for the first time, we should also update the low watermark.
                        match conn.set_committer_watermark(
                            H::NAME,
                            watermark,
                        ).await {
                            // If there's an issue updating the watermark, log it but keep
                            // going; it's OK for the watermark to lag from a correctness
                            // perspective.
                            Err(e) => {
                                let elapsed = guard.stop_and_record();
                                error!(
                                    pipeline = H::NAME,
                                    elapsed_ms = elapsed * 1000.0,
                                    ?watermark,
                                    "Error updating commit watermark: {e}",
                                );
                            }

                            Ok(true) => {
                                let elapsed = guard.stop_and_record();

                                logger.log::<H>(&watermark, elapsed);

                                checkpoint_lag_reporter.report_lag(
                                    watermark.checkpoint_hi_inclusive,
                                    watermark.timestamp_ms_hi_inclusive
                                );

                                metrics
                                    .watermark_epoch_in_db
                                    .with_label_values(&[H::NAME])
                                    .set(watermark.epoch_hi_inclusive as i64);

                                metrics
                                    .watermark_transaction_in_db
                                    .with_label_values(&[H::NAME])
                                    .set(watermark.tx_hi as i64);

                                metrics
                                    .watermark_timestamp_in_db_ms
                                    .with_label_values(&[H::NAME])
                                    .set(watermark.timestamp_ms_hi_inclusive as i64);
                            }

                            // The watermark did not advance, so there is nothing to record.
                            Ok(false) => {}
                        }
                    }

                    if rx.is_closed() && rx.is_empty() {
                        info!(pipeline = H::NAME, "Committer closed channel");
                        break;
                    }
                }

                parts = rx.recv() => {
                    // The channel closing is the committer's signal that no more parts are
                    // coming -- fall through to the final watermark sync below. `recv()`
                    // only returns `None` once the channel is both closed and drained, so no
                    // buffered parts are lost. (Matching `Some(parts) = rx.recv()` directly
                    // would disable this branch on closure and could leave the task waiting
                    // on a poll tick that may be arbitrarily far away.)
                    let Some(parts) = parts else {
                        info!(pipeline = H::NAME, "Committer closed channel");
                        break;
                    };

                    // A batch may carry parts for several checkpoints, and a checkpoint's
                    // parts may arrive across several batches, so parts for the same
                    // checkpoint are merged into a single pending entry.
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }
                }
            }
        }

        info!(
            pipeline = H::NAME,
            "Attempting final watermark sync before shutdown"
        );

        match store.connect().await {
            Ok(mut conn) => {
                let watermark = gather_next_watermark(
                    &mut precommitted,
                    &mut next_checkpoint,
                    Some((&metrics, H::NAME)),
                );

                if let Some(watermark) = watermark {
                    match conn.set_committer_watermark(H::NAME, watermark).await {
                        Ok(true) => {
                            info!(
                                pipeline = H::NAME,
                                checkpoint = watermark.checkpoint_hi_inclusive,
                                "Successfully wrote final watermark on shutdown"
                            );
                        }
                        Ok(false) => {
                            info!(
                                pipeline = H::NAME,
                                checkpoint = watermark.checkpoint_hi_inclusive,
                                "Final watermark did not advance on shutdown"
                            );
                        }
                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                ?watermark,
                                "Failed to write final watermark on shutdown: {e}"
                            );
                        }
                    }
                } else {
                    info!(pipeline = H::NAME, "No watermark to write on shutdown");
                }
            }
            Err(_) => {
                warn!(
                    pipeline = H::NAME,
                    "Failed to get connection for final watermark sync"
                );
            }
        }

        info!(pipeline = H::NAME, "Stopping committer watermark task");
    })
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{CommitterConfig, Processor, WatermarkPart},
        store::CommitterWatermark,
    };

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;

        async fn commit<'a>(
            _values: &[StoredData],
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        commit_watermark_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let store_clone = store.clone();
        let cancel_clone = cancel.clone();

        let commit_watermark_handle = commit_watermark::<H>(
            next_checkpoint,
            config,
            false,
            watermark_rx,
            store_clone,
            metrics,
            cancel_clone,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark_handle,
            cancel,
        }
    }

    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }
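
    // A direct unit test for the pure gathering logic, added as a minimal sketch on top of
    // the helper above: parts for checkpoints 1, 2, and 4 are complete, but 3 is missing,
    // so the gathered watermark should stop at checkpoint 2.
    #[test]
    fn test_gather_next_watermark_stops_at_gap() {
        let mut precommitted = BTreeMap::new();
        for cp in [1, 2, 4] {
            precommitted.insert(cp, create_watermark_part_for_checkpoint(cp));
        }

        let mut next_checkpoint = 1;
        let watermark = gather_next_watermark(&mut precommitted, &mut next_checkpoint, None);

        assert_eq!(watermark.unwrap().checkpoint_hi_inclusive, 2);
        assert_eq!(next_checkpoint, 3);
        // Checkpoint 4 remains pending until checkpoint 3 arrives.
        assert_eq!(precommitted.len(), 1);
    }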

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts in order
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Wait for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Verify watermark progression
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts out of order
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        // Wait for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Verify watermark hasn't progressed past 2
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Send checkpoint 3 to fill the gap
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        // Wait for the next polling and processing
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        // Verify watermark has progressed to 4
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        // Send watermark part
        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for next polling and processing
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        // Create a store that fails the first commit watermark attempt before succeeding.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for the next poll to run
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        // Verify the watermark is still none: the gathered watermark was consumed by the
        // failed attempt, and with nothing new pending there is nothing left to write.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        // Create a store that fails the first commit watermark attempt before succeeding.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for the next poll, where the write should succeed
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test adding complete part
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send the first incomplete watermark part
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the other two parts to complete the watermark
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        // Wait for next polling and processing
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        // Send the checkpoint 1 watermark
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the checkpoint 0 watermark to fill the gap
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(tokio::time::Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Clean up
        setup.cancel.cancel();
        let _ = setup.commit_watermark_handle.await;
    }

    #[tokio::test]
    async fn test_final_watermark_sync_on_shutdown() {
        let config = CommitterConfig {
            // Set to u64::MAX to ensure watermark isn't updated until shutdown.
            watermark_interval_ms: u64::MAX,
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 10, MockStore::default());

        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(10)])
            .await
            .unwrap();
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(11)])
            .await
            .unwrap();

        drop(setup.watermark_tx);

        let _ = setup.commit_watermark_handle.await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }
}