sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use tokio::{task::JoinHandle, time::interval};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use crate::{
    metrics::IndexerMetrics,
    store::{Connection, Store},
};

use super::{Handler, PrunerConfig};

/// The reader watermark task is responsible for updating the `reader_lo` and `pruner_timestamp`
/// values for a pipeline's row in the watermark table, based on the pruner configuration, and the
/// committer's progress.
///
/// `reader_lo` is the lowest checkpoint that readers are allowed to read from with a guarantee of
/// data availability for this pipeline, and `pruner_timestamp` is the timestamp at which this task
/// last updated that watermark. The timestamp is always fetched from the database (not from the
/// indexer or the reader), to avoid issues with drift between clocks.
///
/// If there is no pruner configuration, this task will immediately exit. Otherwise, the task exits
/// when the provided cancellation token is triggered.
pub(super) fn reader_watermark<H: Handler + 'static>(
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping reader watermark task");
            return;
        };

        let mut poll = interval(config.interval());

        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = poll.tick() => {
                    let Ok(mut conn) = store.connect().await else {
                        warn!(pipeline = H::NAME, "Reader watermark task failed to get connection for DB");
                        continue;
                    };

                    let current = match conn.reader_watermark(H::NAME).await {
                        Ok(Some(current)) => current,

                        Ok(None) => {
                            warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                            continue;
                        }

                        Err(e) => {
                            warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
                            continue;
                        }
                    };

                    // Calculate the new reader watermark based on the current high watermark.
                    let new_reader_lo = (current.checkpoint_hi_inclusive as u64 + 1)
                        .saturating_sub(config.retention);

                    if new_reader_lo <= current.reader_lo as u64 {
                        debug!(
                            pipeline = H::NAME,
                            current = current.reader_lo,
                            new = new_reader_lo,
                            "No change to reader watermark",
                        );
                        continue;
                    }

                    metrics
                        .watermark_reader_lo
                        .with_label_values(&[H::NAME])
                        .set(new_reader_lo as i64);

                    let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
                        warn!(pipeline = H::NAME, "Failed to update reader watermark");
                        continue;
                    };

                    if updated {
                        info!(pipeline = H::NAME, new_reader_lo, "Watermark");

                        metrics
                            .watermark_reader_lo_in_db
                            .with_label_values(&[H::NAME])
                            .set(new_reader_lo as i64);
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping reader watermark task");
    })
}