sui_indexer_alt_framework/pipeline/concurrent/commit_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::sync::Arc;

use sui_futures::service::Service;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::Handler;
use crate::pipeline::logging::WatermarkLogger;
use crate::store::Connection;
use crate::store::Store;
use crate::store::pipeline_task;

/// The watermark task is responsible for keeping track of a pipeline's out-of-order commits and
/// updating its row in the `watermarks` table when a continuous run of checkpoints has landed
/// since the last watermark update.
///
/// It receives watermark "parts" that detail the proportion of each checkpoint's data that has been
/// written out by the committer and periodically (on a configurable interval) checks if the
/// watermark for the pipeline can be pushed forward. The watermark can be pushed forward if there
/// are one or more complete (all data for that checkpoint written out) watermarks spanning
/// contiguously from the current high watermark into the future.
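///
/// For example (illustrative only): if the watermark is currently at checkpoint 4 and complete
/// parts have landed for checkpoints 5, 6, and 8, the next update advances the watermark to 6,
/// while checkpoint 8 stays pre-committed until a complete watermark for checkpoint 7 arrives:
///
/// ```text
/// before: watermark = 4    pre-committed: { 5: complete, 6: complete, 8: complete }
/// after:  watermark = 6    pre-committed: { 8: complete }
/// ```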
///
/// If it detects that more than [WARN_PENDING_WATERMARKS] watermarks have built up, it will issue a
/// warning, as this could be an indication of a memory leak, and the caller probably intended to
/// run the indexer with watermarking disabled (e.g. if they are running a backfill).
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
///
/// The task will shut down if the `rx` channel closes and the watermark cannot be progressed.
pub(super) fn commit_watermark<H: Handler + 'static>(
    mut next_checkpoint: u64,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
    store: H::Store,
    task: Option<String>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    // SAFETY: on indexer instantiation, we've checked that the pipeline name is valid.
    let pipeline_task = pipeline_task::<H::Store>(H::NAME, task.as_deref()).unwrap();
    Service::new().spawn_aborting(async move {
        // To correctly update the watermark, the task tracks the watermark it last tried to write
        // and the watermark parts for any checkpoints that have been written since then
        // ("pre-committed"). After each batch is written, the task will try to progress the
        // watermark as much as possible without going over any holes in the sequence of
        // checkpoints (entirely missing watermarks, or incomplete watermarks).
        let mut precommitted: BTreeMap<u64, WatermarkPart> = BTreeMap::new();

        // The watermark task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("concurrent_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        info!(
            pipeline = H::NAME,
            next_checkpoint, "Starting commit watermark task"
        );

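        // NOTE: `next_wake` starts at `now()`, so the timer branch below is ready immediately
        // on the first iteration of the loop; subsequent wakes are rescheduled with jitter.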
        let mut next_wake = tokio::time::Instant::now();

        loop {
            tokio::select! {
                () = tokio::time::sleep_until(next_wake) => {
                    // Schedule next wake immediately, so the timer effectively runs in parallel
                    // with the commit logic below.
                    next_wake = config.watermark_interval_with_jitter();
                }
                Some(parts) = rx.recv() => {
                    for part in parts {
                        match precommitted.entry(part.checkpoint()) {
                            Entry::Vacant(entry) => {
                                entry.insert(part);
                            }

                            Entry::Occupied(mut entry) => {
                                entry.get_mut().add(part);
                            }
                        }
                    }

                    continue;
                }
            }

            // The presence of a watermark means that we can update the watermark in the DB.
            // However, concurrent pipelines do not need every watermark update to succeed.
            let mut watermark = None;

            if precommitted.len() > WARN_PENDING_WATERMARKS {
                warn!(
                    pipeline = H::NAME,
                    pending = precommitted.len(),
                    "Pipeline has a large number of pending commit watermarks",
                );
            }

            let Ok(mut conn) = store.connect().await else {
                warn!(
                    pipeline = H::NAME,
                    "Commit watermark task failed to get a DB connection"
                );
                continue;
            };

            // Check if the pipeline's watermark needs to be updated.
            let guard = metrics
                .watermark_gather_latency
                .with_label_values(&[H::NAME])
                .start_timer();

            while let Some(pending) = precommitted.first_entry() {
                let part = pending.get();

                // Some rows from the next watermark have not landed yet.
                if !part.is_complete() {
                    break;
                }

                match next_checkpoint.cmp(&part.watermark.checkpoint_hi_inclusive) {
                    // Next pending checkpoint is from the future.
                    Ordering::Less => break,

                    // This is the next checkpoint -- include it.
                    Ordering::Equal => {
                        watermark = Some(pending.remove().watermark);
                        next_checkpoint += 1;
                    }

                    // Next pending checkpoint is in the past. Out-of-order watermarks can
                    // be encountered when a pipeline is starting up, because ingestion
                    // must start at the lowest checkpoint across all pipelines, or because
                    // of a backfill, where the initial checkpoint has been overridden.
                    Ordering::Greater => {
                        // Track how many we see to make sure it doesn't grow without bound.
                        metrics
                            .total_watermarks_out_of_order
                            .with_label_values(&[H::NAME])
                            .inc();

                        pending.remove();
                    }
                }
            }

            let elapsed = guard.stop_and_record();

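            // At this point, `watermark` (if set) holds the watermark of the highest contiguous,
            // complete checkpoint gathered above, and `next_checkpoint` points just past it.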
            if let Some(watermark) = watermark {
                metrics
                    .watermark_epoch
                    .with_label_values(&[H::NAME])
                    .set(watermark.epoch_hi_inclusive as i64);

                metrics
                    .watermark_checkpoint
                    .with_label_values(&[H::NAME])
                    .set(watermark.checkpoint_hi_inclusive as i64);

                metrics
                    .watermark_transaction
                    .with_label_values(&[H::NAME])
                    .set(watermark.tx_hi as i64);

                metrics
                    .watermark_timestamp_ms
                    .with_label_values(&[H::NAME])
                    .set(watermark.timestamp_ms_hi_inclusive as i64);

                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    watermark = watermark.checkpoint_hi_inclusive,
                    timestamp = %watermark.timestamp(),
                    pending = precommitted.len(),
                    "Gathered watermarks",
                );

                let guard = metrics
                    .watermark_commit_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                // TODO: If `initial_watermark` is empty, then when we update the watermark for
                // the first time, we should also update the low watermark.
                match conn
                    .set_committer_watermark(&pipeline_task, watermark)
                    .await
                {
                    // If there's an issue updating the watermark, log it but keep going;
                    // it's OK for the watermark to lag from a correctness perspective.
                    Err(e) => {
                        let elapsed = guard.stop_and_record();
                        error!(
                            pipeline = H::NAME,
                            elapsed_ms = elapsed * 1000.0,
                            ?watermark,
                            "Error updating commit watermark: {e}",
                        );
                    }

                    Ok(true) => {
                        let elapsed = guard.stop_and_record();

                        logger.log::<H>(&watermark, elapsed);

                        checkpoint_lag_reporter.report_lag(
                            watermark.checkpoint_hi_inclusive,
                            watermark.timestamp_ms_hi_inclusive,
                        );

                        metrics
                            .watermark_epoch_in_db
                            .with_label_values(&[H::NAME])
                            .set(watermark.epoch_hi_inclusive as i64);

                        metrics
                            .watermark_transaction_in_db
                            .with_label_values(&[H::NAME])
                            .set(watermark.tx_hi as i64);

                        metrics
                            .watermark_timestamp_in_db_ms
                            .with_label_values(&[H::NAME])
                            .set(watermark.timestamp_ms_hi_inclusive as i64);
                    }

                    Ok(false) => {}
                }
            }

            if rx.is_closed() && rx.is_empty() {
                info!(pipeline = H::NAME, "Committer closed channel");
                break;
            }
        }

        info!(pipeline = H::NAME, "Stopping commit watermark task");
        Ok(())
    })
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::store::CommitterWatermark;

    use super::*;

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: MockStore,
        watermark_tx: mpsc::Sender<Vec<WatermarkPart>>,
        #[allow(unused)]
        commit_watermark: Service,
    }

    fn setup_test<H: Handler<Store = MockStore> + 'static>(
        config: CommitterConfig,
        next_checkpoint: u64,
        store: MockStore,
    ) -> TestSetup {
        let (watermark_tx, watermark_rx) = mpsc::channel(100);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();

        let commit_watermark = commit_watermark::<H>(
            next_checkpoint,
            config,
            watermark_rx,
            store_clone,
            None,
            metrics,
        );

        TestSetup {
            store,
            watermark_tx,
            commit_watermark,
        }
    }

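    /// Creates a watermark part for `checkpoint` that is complete on its own: a single batch
    /// accounting for all of the checkpoint's rows (`batch_rows == total_rows`).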
    fn create_watermark_part_for_checkpoint(checkpoint: u64) -> WatermarkPart {
        WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: checkpoint,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        }
    }

    #[tokio::test]
    async fn test_basic_watermark_progression() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts in order.
        for cp in 1..4 {
            let part = create_watermark_part_for_checkpoint(cp);
            setup.watermark_tx.send(vec![part]).await.unwrap();
        }

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify watermark progression.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 3);
    }

    #[tokio::test]
    async fn test_out_of_order_watermarks() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send watermark parts out of order.
        let parts = vec![
            create_watermark_part_for_checkpoint(4),
            create_watermark_part_for_checkpoint(2),
            create_watermark_part_for_checkpoint(1),
        ];
        setup.watermark_tx.send(parts).await.unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify the watermark hasn't progressed past 2.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 2);

        // Send checkpoint 3 to fill the gap.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(3)])
            .await
            .unwrap();

        // Wait for the next poll and processing.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Verify the watermark has progressed to 4.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    #[tokio::test]
    async fn test_watermark_with_connection_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test connection retry.
            ..Default::default()
        };
        let store = MockStore::default().with_connection_failures(1);
        let setup = setup_test::<DataPipeline>(config, 1, store);

        // Send a watermark part.
        let part = create_watermark_part_for_checkpoint(1);
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify the watermark hasn't progressed.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for the next poll and processing.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify the watermark has progressed.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test commit failure handling.
            ..Default::default()
        };
        // Create a store that fails the watermark commit once before succeeding.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for the initial poll to be over.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Wait for retries to complete.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify the watermark is still none.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());
    }

    #[tokio::test]
    async fn test_committer_retries_on_commit_watermark_failure_advances() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test commit failure handling.
            ..Default::default()
        };
        // Create a store that fails the watermark commit once before succeeding.
        let store = MockStore::default().with_commit_watermark_failures(1);
        let setup = setup_test::<DataPipeline>(config, 10, store);

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for the initial poll to be over.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 1,
        };
        setup.watermark_tx.send(vec![part]).await.unwrap();

        // Wait for retries to complete.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 11);
    }

    #[tokio::test]
    async fn test_incomplete_watermark() {
        let config = CommitterConfig {
            watermark_interval_ms: 1_000, // Long polling interval to test completing the watermark later.
            ..Default::default()
        };
        let setup = setup_test::<DataPipeline>(config, 1, MockStore::default());

        // Send the first incomplete watermark part.
        let part = WatermarkPart {
            watermark: CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
            batch_rows: 1,
            total_rows: 3,
        };
        setup.watermark_tx.send(vec![part.clone()]).await.unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify the watermark hasn't progressed.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the other two parts to complete the watermark.
        setup
            .watermark_tx
            .send(vec![part.clone(), part.clone()])
            .await
            .unwrap();

        // Wait for the next poll and processing.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify the watermark has progressed.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }

    #[tokio::test]
    async fn test_no_initial_watermark() {
        let config = CommitterConfig::default();
        let setup = setup_test::<DataPipeline>(config, 0, MockStore::default());

        // Send the checkpoint 1 watermark.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(1)])
            .await
            .unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify the watermark hasn't progressed.
        let watermark = setup.store.watermark(DataPipeline::NAME);
        assert!(watermark.is_none());

        // Send the checkpoint 0 watermark to fill the gap.
        setup
            .watermark_tx
            .send(vec![create_watermark_part_for_checkpoint(0)])
            .await
            .unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify the watermark has progressed.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
    }
}
591}