sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs1use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::debug;
9use tracing::info;
10use tracing::warn;
11
12use crate::metrics::IndexerMetrics;
13use crate::pipeline::concurrent::Handler;
14use crate::pipeline::concurrent::PrunerConfig;
15use crate::store::Connection;
16use crate::store::Store;
17
18pub(super) fn reader_watermark<H: Handler + 'static>(
29 config: Option<PrunerConfig>,
30 store: H::Store,
31 metrics: Arc<IndexerMetrics>,
32) -> Service {
33 Service::new().spawn_aborting(async move {
34 let Some(config) = config else {
35 info!(pipeline = H::NAME, "Skipping reader watermark task");
36 return Ok(());
37 };
38
39 let mut poll = interval(config.interval());
40
41 loop {
42 poll.tick().await;
43
44 let Ok(mut conn) = store.connect().await else {
45 warn!(
46 pipeline = H::NAME,
47 "Reader watermark task failed to get connection for DB"
48 );
49 continue;
50 };
51
52 let current = match conn.reader_watermark(H::NAME).await {
53 Ok(Some(current)) => current,
54
55 Ok(None) => {
56 warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
57 continue;
58 }
59
60 Err(e) => {
61 warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
62 continue;
63 }
64 };
65
66 let new_reader_lo =
68 (current.checkpoint_hi_inclusive as u64 + 1).saturating_sub(config.retention);
69
70 if new_reader_lo <= current.reader_lo as u64 {
71 debug!(
72 pipeline = H::NAME,
73 current = current.reader_lo,
74 new = new_reader_lo,
75 "No change to reader watermark",
76 );
77 continue;
78 }
79
80 metrics
81 .watermark_reader_lo
82 .with_label_values(&[H::NAME])
83 .set(new_reader_lo as i64);
84
85 let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
86 warn!(pipeline = H::NAME, "Failed to update reader watermark");
87 continue;
88 };
89
90 if updated {
91 info!(pipeline = H::NAME, new_reader_lo, "Watermark");
92
93 metrics
94 .watermark_reader_lo_in_db
95 .with_label_values(&[H::NAME])
96 .set(new_reader_lo as i64);
97 }
98 }
99 })
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Retention placed in the pruner config of every test.
    const TEST_RETENTION: u64 = 5;

    /// Upper bound handed to `wait_for_connection_attempts`.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Row type for the test pipeline; it carries no data.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline used only to give the task a `NAME` and a `Store` type.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        /// Produces no values for any checkpoint.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        /// Moves every pending value into the batch and reports it as still pending.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Pretends to commit; always reports zero rows.
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// What a test gets back from [`setup_test`].
    struct TestSetup {
        /// Store shared with the running task, used to inspect the watermark.
        store: MockStore,
        /// Never read by the tests.
        /// NOTE(review): presumably kept so the spawned `Service` outlives the test body — confirm.
        #[allow(unused)]
        handle: Service,
    }

    /// Builds a mock store seeded with `watermark` and the requested number of injected
    /// failures, then starts the reader watermark task against a clone of it, polling every
    /// `interval_ms` with `TEST_RETENTION`.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let metrics = IndexerMetrics::new(None, &Default::default());
        let handle = reader_watermark::<DataPipeline>(Some(config), store.clone(), metrics);

        TestSetup { store, handle }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let initial = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };

        let test = setup_test(
            initial, /* interval_ms */ 100, /* connection failures */ 0,
            /* set_reader_watermark failures */ 0,
        )
        .await;

        // Give the task a couple of polling intervals to run.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // checkpoint_hi_inclusive (10) + 1 - TEST_RETENTION (5) = 6
        assert_eq!(test.store.watermark(DataPipeline::NAME).unwrap().reader_lo, 6);
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        // The stored reader_lo (7) is already above the value the task would compute (6).
        let initial = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 7,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };

        let test = setup_test(
            initial, /* interval_ms */ 100, /* connection failures */ 0,
            /* set_reader_watermark failures */ 0,
        )
        .await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(
            test.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let initial = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };

        // A long interval keeps the failed attempt and the retry clearly apart.
        let test = setup_test(
            initial, /* interval_ms */ 1_000, /* connection failures */ 1,
            /* set_reader_watermark failures */ 0,
        )
        .await;

        // First attempt: the injected connection failure.
        test.store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        let after_failure = test.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            after_failure.reader_lo, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Second attempt: the retry on the following tick.
        test.store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        // Leave the task a moment to finish the write after it has connected.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let after_retry = test.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            after_retry.reader_lo, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let initial = MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        };

        // A long interval keeps the failed write and the retry clearly apart.
        let test = setup_test(
            initial, /* interval_ms */ 1_000, /* connection failures */ 0,
            /* set_reader_watermark failures */ 1,
        )
        .await;

        // Long enough for the first attempt, well short of the second tick.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(
            test.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Sleep past the next polling interval so the retry has run.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        assert_eq!(test.store.watermark(DataPipeline::NAME).unwrap().reader_lo, 6);
    }
}