sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs1use std::sync::Arc;
5
6use tokio::{task::JoinHandle, time::interval};
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, info, warn};
9
10use crate::{
11 metrics::IndexerMetrics,
12 store::{Connection, Store},
13};
14
15use super::{Handler, PrunerConfig};
16
17pub(super) fn reader_watermark<H: Handler + 'static>(
29 config: Option<PrunerConfig>,
30 store: H::Store,
31 metrics: Arc<IndexerMetrics>,
32 cancel: CancellationToken,
33) -> JoinHandle<()> {
34 tokio::spawn(async move {
35 let Some(config) = config else {
36 info!(pipeline = H::NAME, "Skipping reader watermark task");
37 return;
38 };
39
40 let mut poll = interval(config.interval());
41
42 loop {
43 tokio::select! {
44 _ = cancel.cancelled() => {
45 info!(pipeline = H::NAME, "Shutdown received");
46 break;
47 }
48
49 _ = poll.tick() => {
50 let Ok(mut conn) = store.connect().await else {
51 warn!(pipeline = H::NAME, "Reader watermark task failed to get connection for DB");
52 continue;
53 };
54
55 let current = match conn.reader_watermark(H::NAME).await {
56 Ok(Some(current)) => current,
57
58 Ok(None) => {
59 warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
60 continue;
61 }
62
63 Err(e) => {
64 warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
65 continue;
66 }
67 };
68
69 let new_reader_lo = (current.checkpoint_hi_inclusive as u64 + 1)
71 .saturating_sub(config.retention);
72
73 if new_reader_lo <= current.reader_lo as u64 {
74 debug!(
75 pipeline = H::NAME,
76 current = current.reader_lo,
77 new = new_reader_lo,
78 "No change to reader watermark",
79 );
80 continue;
81 }
82
83 metrics
84 .watermark_reader_lo
85 .with_label_values(&[H::NAME])
86 .set(new_reader_lo as i64);
87
88 let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
89 warn!(pipeline = H::NAME, "Failed to update reader watermark");
90 continue;
91 };
92
93 if updated {
94 info!(pipeline = H::NAME, new_reader_lo, "Watermark");
95
96 metrics
97 .watermark_reader_lo_in_db
98 .with_label_values(&[H::NAME])
99 .set(new_reader_lo as i64);
100 }
101 }
102 }
103 }
104
105 info!(pipeline = H::NAME, "Stopping reader watermark task");
106 })
107}
108
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::time::Duration;
    use tokio_util::sync::CancellationToken;

    use crate::{metrics::IndexerMetrics, mocks::store::*, pipeline::Processor};

    use super::*;

    /// Retention, in checkpoints, passed to the task under test via `PrunerConfig`.
    const TEST_RETENTION: u64 = 5;

    /// Upper bound on how long a test waits for the mock store to see connection attempts.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Placeholder row type for the test pipeline; it carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline used to drive `reader_watermark` against the mock store.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        /// Produces no values: these tests only exercise watermark handling.
        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;

        /// Commits nothing and reports zero affected rows.
        async fn commit<'a>(
            _values: &[Self::Value],
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Handles a test needs to observe and stop a running reader watermark task.
    struct TestSetup {
        /// Handle on the store given to the task, used to inspect the resulting watermark.
        store: MockStore,
        /// Join handle of the spawned task.
        handle: JoinHandle<()>,
        /// Token that asks the task to shut down.
        cancel: CancellationToken,
    }

    impl TestSetup {
        /// Cancels the task and waits for it to exit.
        ///
        /// The join result is checked rather than discarded so that a panic inside the task
        /// fails the test. Otherwise a test that expects "no update" would still pass if the
        /// task had died before doing any work.
        async fn shutdown(self) {
            self.cancel.cancel();
            self.handle
                .await
                .expect("reader watermark task panicked");
        }
    }

    /// Watermark used as the starting point of every test.
    ///
    /// With `checkpoint_hi_inclusive` at 10 and `TEST_RETENTION` at 5, the task computes a
    /// candidate `reader_lo` of 10 + 1 - 5 = 6.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        }
    }

    /// Builds a mock store seeded with `watermark`, configures the requested number of injected
    /// failures, and spawns the reader watermark task for `DataPipeline` against it.
    ///
    /// NOTE(review): the exact semantics of the failure counters live in `mocks::store`;
    /// presumably the first N matching calls fail -- confirm there.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        // The task takes ownership of its arguments, so hand it clones and keep the originals
        // for the test to inspect and cancel.
        let handle = reader_watermark::<DataPipeline>(
            Some(config),
            store.clone(),
            metrics,
            cancel.clone(),
        );

        TestSetup {
            store,
            handle,
            cancel,
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Leave enough time for at least one tick to be processed.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Expected reader_lo = checkpoint_hi_inclusive (10) + 1 - retention (5) = 6.
        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermarks.reader_lo, 6);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        // Start with a reader_lo (7) that is already above the candidate value (6).
        let watermark = MockWatermark {
            reader_lo: 7,
            ..initial_watermark()
        };
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            watermark,
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Leave enough time for at least one tick to be processed.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // The candidate (6) is not greater than the stored value (7), so nothing is written.
        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermarks.reader_lo, 7,
            "Reader watermark should not be updated when new value is smaller"
        );

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        // A long interval keeps the failed attempt and the retry clearly separated in time.
        let polling_interval_ms = 1_000;
        let connection_failure_attempts = 1;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for the first connection attempt, which is expected to fail.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.reader_lo, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Wait for the second connection attempt, i.e. the retry on the next tick.
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        // Give the task a moment to write the watermark after it has connected.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.reader_lo, 6,
            "Reader watermark should be updated after retry succeeds"
        );

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        // A long interval keeps the failed attempt and the retry clearly separated in time.
        let polling_interval_ms = 1_000;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 1;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Long enough for the first tick to run, short enough that the second has not fired.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermarks.reader_lo, 0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Sleep past the next tick so that the retry has had a chance to run.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        // Expected reader_lo = checkpoint_hi_inclusive (10) + 1 - retention (5) = 6.
        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermarks.reader_lo, 6);

        setup.shutdown().await;
    }
}