sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs1use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::{debug, info, warn};
9
10use crate::{
11 metrics::IndexerMetrics,
12 store::{Connection, Store},
13};
14
15use super::{Handler, PrunerConfig};
16
17pub(super) fn reader_watermark<H: Handler + 'static>(
28 config: Option<PrunerConfig>,
29 store: H::Store,
30 metrics: Arc<IndexerMetrics>,
31) -> Service {
32 Service::new().spawn_aborting(async move {
33 let Some(config) = config else {
34 info!(pipeline = H::NAME, "Skipping reader watermark task");
35 return Ok(());
36 };
37
38 let mut poll = interval(config.interval());
39
40 loop {
41 poll.tick().await;
42
43 let Ok(mut conn) = store.connect().await else {
44 warn!(
45 pipeline = H::NAME,
46 "Reader watermark task failed to get connection for DB"
47 );
48 continue;
49 };
50
51 let current = match conn.reader_watermark(H::NAME).await {
52 Ok(Some(current)) => current,
53
54 Ok(None) => {
55 warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
56 continue;
57 }
58
59 Err(e) => {
60 warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
61 continue;
62 }
63 };
64
65 let new_reader_lo =
67 (current.checkpoint_hi_inclusive as u64 + 1).saturating_sub(config.retention);
68
69 if new_reader_lo <= current.reader_lo as u64 {
70 debug!(
71 pipeline = H::NAME,
72 current = current.reader_lo,
73 new = new_reader_lo,
74 "No change to reader watermark",
75 );
76 continue;
77 }
78
79 metrics
80 .watermark_reader_lo
81 .with_label_values(&[H::NAME])
82 .set(new_reader_lo as i64);
83
84 let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
85 warn!(pipeline = H::NAME, "Failed to update reader watermark");
86 continue;
87 };
88
89 if updated {
90 info!(pipeline = H::NAME, new_reader_lo, "Watermark");
91
92 metrics
93 .watermark_reader_lo_in_db
94 .with_label_values(&[H::NAME])
95 .set(new_reader_lo as i64);
96 }
97 }
98 })
99}
100
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::{
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{Processor, concurrent::BatchStatus},
    };

    use super::*;

    /// Retention handed to the task under test via `PrunerConfig::retention`.
    const TEST_RETENTION: u64 = 5;
    /// Timeout passed to `MockStore::wait_for_connection_attempts`.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Row type of the test pipeline; it carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline whose only purpose is to name a watermark row in the mock store.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        /// Produces no values for any checkpoint.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        /// Moves every pending value into the batch and reports it as still pending.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Writes nothing and reports zero affected rows.
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Everything a test needs to observe the running task.
    struct TestSetup {
        store: MockStore,
        // Never read; presumably held so the spawned service stays alive for the whole test
        // (TODO confirm `Service`'s drop behaviour).
        #[allow(unused)]
        handle: Service,
    }

    /// Watermark shared by the tests: `checkpoint_hi_inclusive` is 10 and `reader_lo` is 0.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        }
    }

    /// Seeds a mock store with `watermark`, configures how many connection attempts and
    /// `set_reader_watermark` calls should fail, and starts the reader watermark task polling
    /// every `interval_ms` milliseconds with `TEST_RETENTION` as its retention.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let pruner_config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let handle = reader_watermark::<DataPipeline>(
            Some(pruner_config),
            store.clone(),
            IndexerMetrics::new(None, &Default::default()),
        );

        TestSetup { store, handle }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let (interval_ms, conn_failures, set_failures) = (100, 0, 0);
        let setup = setup_test(initial_watermark(), interval_ms, conn_failures, set_failures).await;

        // Give the task time to poll and write.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // checkpoint_hi_inclusive (10) + 1 - retention (5) = 6
        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            6
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        // The stored reader_lo (7) is already above the value the task would compute (6).
        let ahead_of_retention = MockWatermark {
            reader_lo: 7,
            ..initial_watermark()
        };
        let (interval_ms, conn_failures, set_failures) = (100, 0, 0);
        let setup = setup_test(ahead_of_retention, interval_ms, conn_failures, set_failures).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        // The first connection attempt fails; the retry happens one polling interval later.
        let (interval_ms, conn_failures, set_failures) = (1_000, 1, 0);
        let setup = setup_test(initial_watermark(), interval_ms, conn_failures, set_failures).await;

        // After the first (failing) attempt nothing may have been written.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        let before_retry = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            before_retry.reader_lo, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Wait for the second attempt, then leave a little time for the write to land.
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        tokio::time::sleep(Duration::from_millis(100)).await;

        let after_retry = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            after_retry.reader_lo, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        // The first `set_reader_watermark` call fails; the retry happens one interval later.
        let (interval_ms, conn_failures, set_failures) = (1_000, 0, 1);
        let setup = setup_test(initial_watermark(), interval_ms, conn_failures, set_failures).await;

        // Long enough for the first tick, well short of the second.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Past the next polling interval, so the retry has run.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            6
        );
    }
}