sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs1use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::debug;
9use tracing::info;
10use tracing::warn;
11
12use crate::metrics::IndexerMetrics;
13use crate::pipeline::concurrent::Handler;
14use crate::pipeline::concurrent::PrunerConfig;
15use crate::store::ConcurrentConnection;
16use crate::store::Store;
17
18pub(super) fn reader_watermark<H: Handler>(
29 config: Option<PrunerConfig>,
30 store: H::Store,
31 metrics: Arc<IndexerMetrics>,
32) -> Service {
33 Service::new().spawn_aborting(async move {
34 let Some(config) = config else {
35 info!(pipeline = H::NAME, "Skipping reader watermark task");
36 return Ok(());
37 };
38
39 let mut poll = interval(config.interval());
40
41 loop {
42 poll.tick().await;
43
44 let Ok(mut conn) = store.connect().await else {
45 warn!(
46 pipeline = H::NAME,
47 "Reader watermark task failed to get connection for DB"
48 );
49 continue;
50 };
51
52 let current = match conn.reader_watermark(H::NAME).await {
53 Ok(Some(current)) => current,
54
55 Ok(None) => {
56 info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
57 continue;
58 }
59
60 Err(e) => {
61 warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
62 continue;
63 }
64 };
65
66 let new_reader_lo =
68 (current.checkpoint_hi_inclusive + 1).saturating_sub(config.retention);
69
70 if new_reader_lo <= current.reader_lo {
71 debug!(
72 pipeline = H::NAME,
73 current = current.reader_lo,
74 new = new_reader_lo,
75 "No change to reader watermark",
76 );
77 continue;
78 }
79
80 metrics
81 .watermark_reader_lo
82 .with_label_values(&[H::NAME])
83 .set(new_reader_lo as i64);
84
85 let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
86 warn!(pipeline = H::NAME, "Failed to update reader watermark");
87 continue;
88 };
89
90 if updated {
91 info!(pipeline = H::NAME, new_reader_lo, "Watermark");
92
93 metrics
94 .watermark_reader_lo_in_db
95 .with_label_values(&[H::NAME])
96 .set(new_reader_lo as i64);
97 }
98 }
99 })
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_field_count::FieldCount;
    use sui_indexer_alt_framework_store_traits::testing::mock_store::MockWatermark;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Retention passed to the pruner config.
    ///
    /// With the fixture's `checkpoint_hi_inclusive` of 10, the expected `reader_lo` is
    /// `10 + 1 - 5 = 6`.
    const TEST_RETENTION: u64 = 5;

    /// Upper bound on how long a test waits for the background task to make progress.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// How often `wait_for_watermark` re-reads the mock store.
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: FallibleMockStore,
        // Never read, but held for the whole test: the task was spawned with `spawn_aborting`,
        // so presumably dropping this handle would stop it.
        #[allow(unused)]
        handle: Service,
    }

    /// Shared fixture: `checkpoint_hi_inclusive` is 10 and `reader_lo` is 0.
    ///
    /// Tests that need a different `reader_lo` override it with struct-update syntax.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(10),
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
            chain_id: None,
        }
    }

    /// Starts the reader watermark task for `DataPipeline` against a fresh mock store.
    ///
    /// The store is seeded with `watermark` and configured, via `with_connection_failures`
    /// and `with_reader_watermark_failures`, with the given numbers of failing attempts.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = FallibleMockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();
        let handle = reader_watermark::<DataPipeline>(Some(config), store_clone, metrics);

        TestSetup { store, handle }
    }

    /// Polls `store` until `DataPipeline`'s watermark satisfies `done`.
    ///
    /// Panics with `msg` if that does not happen within `TEST_TIMEOUT`. This replaces fixed
    /// sleeps, which are flaky on a loaded machine.
    async fn wait_for_watermark(
        store: &FallibleMockStore,
        msg: &str,
        done: impl Fn(&MockWatermark) -> bool,
    ) {
        let poll = async {
            loop {
                if store
                    .watermark(DataPipeline::NAME)
                    .is_some_and(|w| done(&w))
                {
                    return;
                }
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        };

        if tokio::time::timeout(TEST_TIMEOUT, poll).await.is_err() {
            panic!("{msg}");
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        wait_for_watermark(&setup.store, "Reader watermark was never advanced", |w| {
            w.reader_lo == 6
        })
        .await;

        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermarks.reader_lo, 6);
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        let watermark = MockWatermark {
            reader_lo: 7,
            ..initial_watermark()
        };
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            watermark,
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Make sure the task has actually run at least once before checking that it made no
        // change; otherwise the assertion below could pass trivially.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermarks.reader_lo, 7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let polling_interval_ms = 1_000;
        let connection_failure_attempts = 1;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // First tick: the connection fails, so nothing can have been written yet.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.reader_lo, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Second tick: the connection succeeds and the update should follow.
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        wait_for_watermark(
            &setup.store,
            "Reader watermark should be updated after retry succeeds",
            |w| w.reader_lo == 6,
        )
        .await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.reader_lo, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let polling_interval_ms = 1_000;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 1;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // First tick: wait for the task to start, then give its failing write time to happen.
        // The next tick is a full polling interval away.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermarks.reader_lo, 0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Second tick: the write should succeed.
        wait_for_watermark(
            &setup.store,
            "Reader watermark was not updated after set_reader_watermark retry",
            |w| w.reader_lo == 6,
        )
        .await;

        let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermarks.reader_lo, 6);
    }
}