// sui_indexer_alt_framework/pipeline/concurrent/reader_watermark.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use tokio::{task::JoinHandle, time::interval};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::{
metrics::IndexerMetrics,
store::{Connection, Store},
};
use super::{Handler, PrunerConfig};
/// Spawns the background task that advances `reader_lo` (and, as a side effect in the store,
/// `pruner_timestamp`) for this pipeline's watermark row, driven by the pruner configuration and
/// the committer's progress.
///
/// `reader_lo` is the lowest checkpoint readers may safely query for this pipeline;
/// `pruner_timestamp` records when this task last moved it. The store supplies the timestamp from
/// its own clock so that indexer/reader clock drift cannot skew it.
///
/// With no pruner configuration the task logs and returns immediately. Otherwise it ticks on
/// `config.interval()` until `cancel` fires, and on each tick:
/// - fetches the current watermark (skipping the tick on any connection/fetch failure),
/// - computes the candidate `reader_lo` as `checkpoint_hi_inclusive + 1 - retention`
///   (saturating at zero),
/// - and, only if that candidate moves the watermark forward, records it in metrics and writes it
///   to the store, reporting the in-DB metric once the write is confirmed.
pub(super) fn reader_watermark<H: Handler + 'static>(
    config: Option<PrunerConfig>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // No pruner means there is nothing to bound retention against -- bail out right away.
        let Some(config) = config else {
            info!(pipeline = H::NAME, "Skipping reader watermark task");
            return;
        };

        let mut ticker = interval(config.interval());

        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = ticker.tick() => {
                    // Every failure below is transient from this task's point of view: log it
                    // and wait for the next tick rather than tearing the task down.
                    let mut conn = match store.connect().await {
                        Ok(conn) => conn,
                        Err(_) => {
                            warn!(pipeline = H::NAME, "Reader watermark task failed to get connection for DB");
                            continue;
                        }
                    };

                    let watermark = match conn.reader_watermark(H::NAME).await {
                        Err(e) => {
                            warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
                            continue;
                        }
                        Ok(None) => {
                            warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
                            continue;
                        }
                        Ok(Some(watermark)) => watermark,
                    };

                    // Keep `retention` checkpoints behind the committer's high watermark,
                    // saturating so a young pipeline never produces a negative bound.
                    let next_reader_lo = (watermark.checkpoint_hi_inclusive as u64 + 1)
                        .saturating_sub(config.retention);

                    // The reader watermark only ever moves forward.
                    if next_reader_lo <= watermark.reader_lo as u64 {
                        debug!(
                            pipeline = H::NAME,
                            current = watermark.reader_lo,
                            new = next_reader_lo,
                            "No change to reader watermark",
                        );
                        continue;
                    }

                    // Report the value we are about to write; the `_in_db` gauge below is only
                    // bumped once the store confirms the update.
                    metrics
                        .watermark_reader_lo
                        .with_label_values(&[H::NAME])
                        .set(next_reader_lo as i64);

                    match conn.set_reader_watermark(H::NAME, next_reader_lo).await {
                        Err(_) => {
                            warn!(pipeline = H::NAME, "Failed to update reader watermark");
                        }
                        // The store reported no row changed -- nothing to confirm.
                        Ok(false) => {}
                        Ok(true) => {
                            info!(pipeline = H::NAME, new_reader_lo = next_reader_lo, "Watermark");
                            metrics
                                .watermark_reader_lo_in_db
                                .with_label_values(&[H::NAME])
                                .set(next_reader_lo as i64);
                        }
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping reader watermark task");
    })
}