// sui_indexer_alt_framework/pipeline/concurrent/reader_watermark.rs

use std::sync::Arc;
use tokio::{task::JoinHandle, time::interval};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::{
metrics::IndexerMetrics,
store::{Connection, Store},
};
use super::{Handler, PrunerConfig};
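
/// Spawns a task that periodically (every `config.interval()`) recomputes the pipeline's reader
/// low watermark as `checkpoint_hi_inclusive + 1 - retention`, and writes it back to the store
/// whenever it has moved forward. If `config` is `None`, pruning is disabled for this pipeline
/// and the task exits immediately. The task shuts down when `cancel` is triggered.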
pub(super) fn reader_watermark<H: Handler + 'static>(
config: Option<PrunerConfig>,
store: H::Store,
metrics: Arc<IndexerMetrics>,
cancel: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let Some(config) = config else {
info!(pipeline = H::NAME, "Skipping reader watermark task");
return;
};
let mut poll = interval(config.interval());
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!(pipeline = H::NAME, "Shutdown received");
break;
}
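                // On every tick, fetch the current watermark, recompute the reader bound, and
                // persist it if it advanced. Transient failures are logged and retried on the
                // next tick.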
_ = poll.tick() => {
let Ok(mut conn) = store.connect().await else {
warn!(pipeline = H::NAME, "Reader watermark task failed to get DB connection");
continue;
};
let current = match conn.reader_watermark(H::NAME).await {
Ok(Some(current)) => current,
Ok(None) => {
warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
continue;
}
Err(e) => {
warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
continue;
}
};
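                    // Retain the latest `config.retention` checkpoints, ending at
                    // `checkpoint_hi_inclusive`; everything below this bound becomes
                    // eligible for pruning.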
let new_reader_lo = (current.checkpoint_hi_inclusive as u64 + 1)
.saturating_sub(config.retention);
if new_reader_lo <= current.reader_lo as u64 {
debug!(
pipeline = H::NAME,
current = current.reader_lo,
new = new_reader_lo,
"No change to reader watermark",
);
continue;
}
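                    // Record the computed watermark before attempting the write, so this metric
                    // reflects the intended value even if the write below fails.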
metrics
.watermark_reader_lo
.with_label_values(&[H::NAME])
.set(new_reader_lo as i64);
let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
warn!(pipeline = H::NAME, "Failed to update reader watermark");
continue;
};
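                    // Only report the DB-side metric when the store actually advanced the
                    // watermark.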
if updated {
info!(pipeline = H::NAME, new_reader_lo, "Updated reader watermark");
metrics
.watermark_reader_lo_in_db
.with_label_values(&[H::NAME])
.set(new_reader_lo as i64);
}
}
}
}
info!(pipeline = H::NAME, "Stopping reader watermark task");
})
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use sui_pg_db::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::{metrics::IndexerMetrics, pipeline::Processor, testing::mock_store::*};
use super::*;
const TEST_RETENTION: u64 = 5;
const TEST_TIMEOUT: Duration = Duration::from_secs(20);
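
    // Minimal no-op pipeline: it processes and commits no values, since these tests only
    // exercise the watermark task itself.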
#[derive(Clone, FieldCount)]
pub struct StoredData;
pub struct DataPipeline;
impl Processor for DataPipeline {
const NAME: &'static str = "data";
type Value = StoredData;
fn process(&self, _checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
Ok(vec![])
}
}
#[async_trait::async_trait]
impl Handler for DataPipeline {
type Store = MockStore;
async fn commit<'a>(
_values: &[Self::Value],
_conn: &mut MockConnection<'a>,
) -> anyhow::Result<usize> {
Ok(0)
}
}
struct TestSetup {
store: MockStore,
handle: JoinHandle<()>,
cancel: CancellationToken,
}
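
    /// Spawns a reader watermark task for `DataPipeline` against a `MockStore` seeded with
    /// `watermark`, polling every `interval_ms`. The mock store fails the first
    /// `connection_failure_attempts` connection attempts and the first
    /// `set_reader_watermark_failure_attempts` watermark writes before succeeding.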
async fn setup_test(
watermark: MockWatermark,
interval_ms: u64,
connection_failure_attempts: usize,
set_reader_watermark_failure_attempts: usize,
) -> TestSetup {
let store = MockStore {
watermarks: Arc::new(Mutex::new(watermark)),
set_reader_watermark_failure_attempts: Arc::new(Mutex::new(
set_reader_watermark_failure_attempts,
)),
connection_failure: Arc::new(Mutex::new(ConnectionFailure {
connection_failure_attempts,
..Default::default()
})),
..Default::default()
};
let config = PrunerConfig {
interval_ms,
delay_ms: 100,
retention: TEST_RETENTION,
max_chunk_size: 100,
prune_concurrency: 1,
};
let metrics = IndexerMetrics::new(&Default::default());
let cancel = CancellationToken::new();
let store_clone = store.clone();
let cancel_clone = cancel.clone();
let handle =
reader_watermark::<DataPipeline>(Some(config), store_clone, metrics, cancel_clone);
TestSetup {
store,
handle,
cancel,
}
}
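
    // With checkpoint_hi_inclusive = 10 and TEST_RETENTION = 5, the task should advance
    // reader_lo to 10 + 1 - 5 = 6.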
#[tokio::test]
async fn test_reader_watermark_updates() {
let watermark = MockWatermark {
epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
pruner_hi: 0,
};
let polling_interval_ms = 100;
let connection_failure_attempts = 0;
let set_reader_watermark_failure_attempts = 0;
let setup = setup_test(
watermark,
polling_interval_ms,
connection_failure_attempts,
set_reader_watermark_failure_attempts,
)
.await;
tokio::time::sleep(Duration::from_millis(200)).await;
{
let watermarks = setup.store.watermarks.lock().unwrap();
assert_eq!(watermarks.reader_lo, 6);
}
setup.cancel.cancel();
let _ = setup.handle.await;
}
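
    // reader_lo starts at 7, above the computed bound of 6, so the task should leave it
    // untouched.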
#[tokio::test]
async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
let watermark = MockWatermark {
epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 7,
            pruner_timestamp: 0,
pruner_hi: 0,
};
let polling_interval_ms = 100;
let connection_failure_attempts = 0;
let set_reader_watermark_failure_attempts = 0;
let setup = setup_test(
watermark,
polling_interval_ms,
connection_failure_attempts,
set_reader_watermark_failure_attempts,
)
.await;
tokio::time::sleep(Duration::from_millis(200)).await;
{
let watermarks = setup.store.watermarks.lock().unwrap();
assert_eq!(
watermarks.reader_lo, 7,
"Reader watermark should not be updated when new value is smaller"
);
}
setup.cancel.cancel();
let _ = setup.handle.await;
}
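
    // The first connection attempt fails, so the first tick is skipped and reader_lo stays at
    // 0; once a connection succeeds on the next tick, reader_lo should move to 6.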
#[tokio::test]
async fn test_reader_watermark_retry_update_after_connection_failure() {
let watermark = MockWatermark {
epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
pruner_hi: 0,
};
        let polling_interval_ms = 1_000;
        let connection_failure_attempts = 1;
let set_reader_watermark_failure_attempts = 0;
let setup = setup_test(
watermark,
polling_interval_ms,
connection_failure_attempts,
set_reader_watermark_failure_attempts,
)
.await;
setup
.store
.wait_for_connection_attempts(1, TEST_TIMEOUT)
.await;
let watermark = setup.store.get_watermark();
assert_eq!(
watermark.reader_lo, 0,
"Reader watermark should not be updated due to DB connection failure"
);
setup
.store
.wait_for_connection_attempts(2, TEST_TIMEOUT)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
let watermark = setup.store.get_watermark();
assert_eq!(
watermark.reader_lo, 6,
"Reader watermark should be updated after retry succeeds"
);
setup.cancel.cancel();
let _ = setup.handle.await;
}
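
    // The first set_reader_watermark call fails, so reader_lo stays at 0 until the next
    // polling tick (1s); after that the retried write lands and reader_lo becomes 6.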
#[tokio::test]
async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
let watermark = MockWatermark {
epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
pruner_hi: 0,
};
        let polling_interval_ms = 1_000;
        let connection_failure_attempts = 0;
let set_reader_watermark_failure_attempts = 1;
let setup = setup_test(
watermark,
polling_interval_ms,
connection_failure_attempts,
set_reader_watermark_failure_attempts,
)
.await;
tokio::time::sleep(Duration::from_millis(200)).await;
{
let watermarks = setup.store.watermarks.lock().unwrap();
assert_eq!(
watermarks.reader_lo, 0,
"Reader watermark should not be updated due to set_reader_watermark failure"
);
}
tokio::time::sleep(Duration::from_millis(1200)).await;
{
let watermarks = setup.store.watermarks.lock().unwrap();
assert_eq!(watermarks.reader_lo, 6);
}
setup.cancel.cancel();
let _ = setup.handle.await;
}
}