sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    cmp::Ordering,
    collections::{BTreeMap, btree_map::Entry},
    sync::Arc,
};

use sui_futures::service::Service;
use tokio::{
    sync::mpsc,
    time::{MissedTickBehavior, interval},
};
use tracing::{debug, error, info, warn};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::{CommitterConfig, WARN_PENDING_WATERMARKS, WatermarkPart, logging::WatermarkLogger},
    store::{Connection, Store, pipeline_task},
};

use super::Handler;
/// The watermark task is responsible for keeping track of a pipeline's out-of-order commits and
/// updating its row in the `watermarks` table when a contiguous run of checkpoints has landed
/// since the last watermark update.
///
/// It receives watermark "parts" that detail the proportion of each checkpoint's data that has been
/// written out by the committer and periodically (on a configurable interval) checks whether the
/// watermark for the pipeline can be pushed forward. The watermark can be pushed forward if there
/// are one or more complete (all data for that checkpoint written out) watermarks spanning
/// contiguously from the current high watermark into the future.
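///
/// For example (an illustrative sketch, not drawn from real pipeline data): if the
/// watermark is currently at checkpoint 5 and complete parts have been pre-committed for
/// checkpoints 6, 7, and 9, the watermark advances to 7, while checkpoint 9 stays pending
/// until a complete watermark for checkpoint 8 lands.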
///
/// If it detects that more than [WARN_PENDING_WATERMARKS] watermarks have built up, it will issue a
/// warning, as this could be an indication of a memory leak, and the caller probably intended to
/// run the indexer with watermarking disabled (e.g. if they are running a backfill).
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// The task will shut down if the `rx` channel closes and the watermark cannot be progressed.
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // SAFETY: on indexer instantiation, we've checked that the pipeline name is valid.
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        let mut poll = interval(config.watermark_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // To correctly update the watermark, the task tracks the watermark it last tried to write
        // and the watermark parts for any checkpoints that have been written since then
        // ("pre-committed"). After each batch is written, the task will try to progress the
        // watermark as much as possible without going over any holes in the sequence of
        // checkpoints (entirely missing watermarks, or incomplete watermarks).
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // The watermark task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

        loop {
            tokio::select! {
                // On every tick, fall through and try to advance the watermark below.
                _ = poll.tick() => {}

                // Buffer incoming parts and keep receiving until the next tick, so that
                // watermark updates are batched per polling interval.
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }

                    continue;
                }
            }

            // The presence of a watermark means that we can update the watermark in the DB.
            // However, concurrent pipelines do not need every watermark update to succeed.
            let mut watermark = None;

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            let Ok(mut conn) = store.connect().await else {
                warn!(
                    pipeline = H::NAME,
                    "Commit watermark task failed to get connection for DB"
                );
                continue;
            };

            // Check if the pipeline's watermark needs to be updated
            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // Some rows from the next watermark have not landed yet.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Next pending checkpoint is from the future.
                    Ordering::Less => break,

                    // This is the next checkpoint -- include it.
                    Ordering::Equal => {
                        watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Next pending checkpoint is in the past. Out of order watermarks can
                    // be encountered when a pipeline is starting up, because ingestion
                    // must start at the lowest checkpoint across all pipelines, or because
                    // of a backfill, where the initial checkpoint has been overridden.
                    Ordering::Greater => {
                        // Track how many we see to make sure it doesn't grow without bound.
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

            if let Some(watermark) = watermark {
                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );

                let guard = metrics
                    .watermark_commit_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                // TODO: If `initial_watermark` is empty, the first watermark update should
                // also update the low watermark.
                match conn
                    .set_committer_watermark(&pipeline_task, watermark)
                    .await
                {
                    // If there's an issue updating the watermark, log it but keep going:
                    // it's OK for the watermark to lag from a correctness perspective.
                    Err(e) => {
                        let elapsed = guard.stop_and_record();
                        error!(
                            pipeline = H::NAME,
                            elapsed_ms = elapsed * 1000.0,
                            ?watermark,
                            "Error updating commit watermark: {e}",
                        );
                    }

                    Ok(true) => {
                        let elapsed = guard.stop_and_record();

                        logger.log::<H>(&watermark, elapsed);

                        checkpoint_lag_reporter.report_lag(
                            watermark.checkpoint_hi_inclusive,
                            watermark.timestamp_ms_hi_inclusive,
                        );

                        metrics
                            .watermark_epoch_in_db
                            .with_label_values(&[H::NAME])
                            .set(watermark.epoch_hi_inclusive as i64);

                        metrics
                            .watermark_transaction_in_db
                            .with_label_values(&[H::NAME])
                            .set(watermark.tx_hi as i64);

                        metrics
                            .watermark_timestamp_in_db_ms
                            .with_label_values(&[H::NAME])
                            .set(watermark.timestamp_ms_hi_inclusive as i64);
                    }

                    Ok(false) => {}
                }
            }

            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

        info!(pipeline = H::NAME, "Stopping committer watermark task");
        Ok(())
    })
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{CommitterConfig, Processor, WatermarkPart, concurrent::BatchStatus},
        store::CommitterWatermark,
    };

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        #[allow(unused)]
        commit_watermark: Service,
    }

    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

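    /// Builds a part that is complete on arrival (`batch_rows == total_rows == 1`), so a
    /// single send is enough to mark the checkpoint as fully committed. Tests exercising
    /// partial completion (e.g. `test_incomplete_watermark`) construct parts with
    /// `total_rows > 1` inline instead.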
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts in order
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark progression
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts out of order
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark hasn't progressed past 2
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Send checkpoint 3 to fill the gap
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        // Wait for the next polling and processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Verify watermark has progressed to 4
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        // Send watermark part
        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test commit failure handling
            ..Default::default()
        };
        // Store fails the first watermark commit; subsequent commits would succeed.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for subsequent polls to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify the watermark is still none: the failed update is not retried, because a
        // concurrent pipeline's watermark is allowed to lag.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test commit failure handling
            ..Default::default()
        };
        // Store fails the first watermark commit before succeeding.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for the next poll to pick up the new high watermark
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // The failed update for checkpoint 10 is never retried, but the watermark still
        // advances once checkpoint 11 lands.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test adding complete part
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send the first incomplete watermark part
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the other two parts to complete the watermark
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        // Wait for next polling and processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        // Send the checkpoint 1 watermark
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the checkpoint 0 watermark to fill the gap.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify watermark has progressed
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }
}