sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs1use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::debug;
9use tracing::info;
10use tracing::warn;
11
12use crate::metrics::IndexerMetrics;
13use crate::pipeline::concurrent::Handler;
14use crate::pipeline::concurrent::PrunerConfig;
15use crate::store::Connection;
16use crate::store::Store;
17
18pub(super) fn reader_watermark<H: Handler + 'static>(
29 config: Option<PrunerConfig>,
30 store: H::Store,
31 metrics: Arc<IndexerMetrics>,
32) -> Service {
33 Service::new().spawn_aborting(async move {
34 let Some(config) = config else {
35 info!(pipeline = H::NAME, "Skipping reader watermark task");
36 return Ok(());
37 };
38
39 let mut poll = interval(config.interval());
40
41 loop {
42 poll.tick().await;
43
44 let Ok(mut conn) = store.connect().await else {
45 warn!(
46 pipeline = H::NAME,
47 "Reader watermark task failed to get connection for DB"
48 );
49 continue;
50 };
51
52 let current = match conn.reader_watermark(H::NAME).await {
53 Ok(Some(current)) => current,
54
55 Ok(None) => {
56 info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
57 continue;
58 }
59
60 Err(e) => {
61 warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
62 continue;
63 }
64 };
65
66 let new_reader_lo =
68 (current.checkpoint_hi_inclusive + 1).saturating_sub(config.retention);
69
70 if new_reader_lo <= current.reader_lo {
71 debug!(
72 pipeline = H::NAME,
73 current = current.reader_lo,
74 new = new_reader_lo,
75 "No change to reader watermark",
76 );
77 continue;
78 }
79
80 metrics
81 .watermark_reader_lo
82 .with_label_values(&[H::NAME])
83 .set(new_reader_lo as i64);
84
85 let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
86 warn!(pipeline = H::NAME, "Failed to update reader watermark");
87 continue;
88 };
89
90 if updated {
91 info!(pipeline = H::NAME, new_reader_lo, "Watermark");
92
93 metrics
94 .watermark_reader_lo_in_db
95 .with_label_values(&[H::NAME])
96 .set(new_reader_lo as i64);
97 }
98 }
99 })
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Retention (in checkpoints) handed to the task under test.
    const TEST_RETENTION: u64 = 5;
    /// Upper bound on how long a test waits for the mock store to see connection attempts.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Placeholder row type; the tests never produce any values of it.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline used to drive `reader_watermark` against a `MockStore`.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        /// Produces no values, whatever the checkpoint.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        /// Moves every remaining value into the batch and reports it as still pending.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// No-op commit that reports zero rows.
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Everything a test needs to observe the running task.
    struct TestSetup {
        /// Handle onto the same mock store the task was given.
        store: MockStore,
        /// Never read; held so the service lives as long as the test does
        /// (presumably dropping it would stop the task -- confirm against `Service`).
        #[allow(unused)]
        handle: Service,
    }

    /// Watermark shared by every test: high watermark at checkpoint 10, `reader_lo` at 0.
    ///
    /// Tests that need a different starting point override individual fields with struct
    /// update syntax.
    fn starting_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(10),
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
            chain_id: None,
        }
    }

    /// Seeds a `MockStore` with `watermark` and the requested failure counts, then starts the
    /// reader watermark task for `DataPipeline` polling every `interval_ms` milliseconds.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let pruner_config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let metrics = IndexerMetrics::new(None, &Default::default());

        // The task gets its own clone; the test keeps `store` to inspect the outcome.
        let handle = reader_watermark::<DataPipeline>(Some(pruner_config), store.clone(), metrics);

        TestSetup { store, handle }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let setup = setup_test(
            starting_watermark(),
            /* interval_ms */ 100,
            /* connection_failure_attempts */ 0,
            /* set_reader_watermark_failure_attempts */ 0,
        )
        .await;

        // Leave the task enough time to poll.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // checkpoint_hi_inclusive (10) + 1 - TEST_RETENTION (5) = 6.
        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            6
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        // Start above the value the task would compute (6).
        let ahead_of_target = MockWatermark {
            reader_lo: 7,
            ..starting_watermark()
        };

        let setup = setup_test(
            ahead_of_target,
            /* interval_ms */ 100,
            /* connection_failure_attempts */ 0,
            /* set_reader_watermark_failure_attempts */ 0,
        )
        .await;

        // Leave the task enough time to poll.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let setup = setup_test(
            starting_watermark(),
            /* interval_ms */ 1_000,
            /* connection_failure_attempts */ 1,
            /* set_reader_watermark_failure_attempts */ 0,
        )
        .await;

        // The first connection attempt is configured to fail, so nothing may have changed yet.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;
        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Once the second attempt has been made, give the task a moment to finish the update.
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let setup = setup_test(
            starting_watermark(),
            /* interval_ms */ 1_000,
            /* connection_failure_attempts */ 0,
            /* set_reader_watermark_failure_attempts */ 1,
        )
        .await;

        // The first write is configured to fail, so the stored value must still be the original.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Wait past the next poll so the retry has run.
        tokio::time::sleep(Duration::from_millis(1200)).await;
        assert_eq!(
            setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo,
            6
        );
    }
}