// sui_indexer_alt_framework/pipeline/concurrent/reader_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

4use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::debug;
9use tracing::info;
10use tracing::warn;
11
12use crate::metrics::IndexerMetrics;
13use crate::pipeline::concurrent::Handler;
14use crate::pipeline::concurrent::PrunerConfig;
15use crate::store::Connection;
16use crate::store::Store;
17
18/// The reader watermark task is responsible for updating the `reader_lo` and `pruner_timestamp`
19/// values for a pipeline's row in the watermark table, based on the pruner configuration, and the
20/// committer's progress.
21///
22/// `reader_lo` is the lowest checkpoint that readers are allowed to read from with a guarantee of
23/// data availability for this pipeline, and `pruner_timestamp` is the timestamp at which this task
24/// last updated that watermark. The timestamp is always fetched from the database (not from the
25/// indexer or the reader), to avoid issues with drift between clocks.
26///
27/// If there is no pruner configuration, this task will immediately exit.
28pub(super) fn reader_watermark<H: Handler + 'static>(
29    config: Option<PrunerConfig>,
30    store: H::Store,
31    metrics: Arc<IndexerMetrics>,
32) -> Service {
33    Service::new().spawn_aborting(async move {
34        let Some(config) = config else {
35            info!(pipeline = H::NAME, "Skipping reader watermark task");
36            return Ok(());
37        };
38
39        let mut poll = interval(config.interval());
40
41        loop {
42            poll.tick().await;
43
44            let Ok(mut conn) = store.connect().await else {
45                warn!(
46                    pipeline = H::NAME,
47                    "Reader watermark task failed to get connection for DB"
48                );
49                continue;
50            };
51
52            let current = match conn.reader_watermark(H::NAME).await {
53                Ok(Some(current)) => current,
54
55                Ok(None) => {
56                    warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
57                    continue;
58                }
59
60                Err(e) => {
61                    warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
62                    continue;
63                }
64            };
65
66            // Calculate the new reader watermark based on the current high watermark.
67            let new_reader_lo =
68                (current.checkpoint_hi_inclusive as u64 + 1).saturating_sub(config.retention);
69
70            if new_reader_lo <= current.reader_lo as u64 {
71                debug!(
72                    pipeline = H::NAME,
73                    current = current.reader_lo,
74                    new = new_reader_lo,
75                    "No change to reader watermark",
76                );
77                continue;
78            }
79
80            metrics
81                .watermark_reader_lo
82                .with_label_values(&[H::NAME])
83                .set(new_reader_lo as i64);
84
85            let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
86                warn!(pipeline = H::NAME, "Failed to update reader watermark");
87                continue;
88            };
89
90            if updated {
91                info!(pipeline = H::NAME, new_reader_lo, "Watermark");
92
93                metrics
94                    .watermark_reader_lo_in_db
95                    .with_label_values(&[H::NAME])
96                    .set(new_reader_lo as i64);
97            }
98        }
99    })
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Pruner retention (in checkpoints) shared by every test in this module.
    const TEST_RETENTION: u64 = 5;

    /// Upper bound on how long a test waits for the mock store to observe connection attempts.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Placeholder value type: the reader watermark task never inspects pipeline values.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline that exists only to supply a `Handler` (its `NAME` and `Store`) to the
    /// reader watermark task under test.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            pending: &mut Self::Batch,
            incoming: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            pending.extend(incoming);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Handles a test needs in order to drive and observe the reader watermark task.
    struct TestSetup {
        /// The mock store the task reads its watermark from and writes `reader_lo` back to.
        store: MockStore,
        /// The running task. Never read, but kept alive for the duration of the test
        /// (presumably dropping the `Service` tears the task down — confirm in `sui_futures`).
        _service: Service,
    }

    /// Watermark every test starts from: the committer has reached checkpoint 10, and both
    /// `reader_lo` and `pruner_hi` are still 0. Tests override individual fields with
    /// struct-update syntax.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        }
    }

    /// Seeds a mock store with `watermark` for [`DataPipeline`], makes it fail the first
    /// `connection_failure_attempts` connections and the first
    /// `set_reader_watermark_failure_attempts` watermark writes, then starts the reader watermark
    /// task against it with a pruner that polls every `interval_ms` milliseconds.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let pruner = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let service = reader_watermark::<DataPipeline>(
            Some(pruner),
            store.clone(),
            IndexerMetrics::new(None, &Default::default()),
        );

        TestSetup {
            store,
            _service: service,
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let setup = setup_test(
            initial_watermark(),
            100, // interval_ms
            0,   // connection failures
            0,   // set_reader_watermark failures
        )
        .await;

        // Leave room for a couple of 100ms polling intervals.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // checkpoint_hi_inclusive (10) + 1 - retention (5) = 6
        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(reader_lo, 6);
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        let starting = MockWatermark {
            reader_lo: 7,
            ..initial_watermark()
        };
        let setup = setup_test(
            starting, 100, // interval_ms
            0,   // connection failures
            0,   // set_reader_watermark failures
        )
        .await;

        // Leave room for a couple of 100ms polling intervals.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // The candidate 10 + 1 - 5 = 6 is below the stored reader_lo of 7, so nothing changes.
        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            reader_lo, 7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        // A long interval separates the failing attempt from the retry.
        let setup = setup_test(
            initial_watermark(),
            1_000, // interval_ms
            1,     // connection failures
            0,     // set_reader_watermark failures
        )
        .await;

        // The first connection attempt is the one configured to fail.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            reader_lo, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // The retry on the following tick gets a connection.
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        // Give the successful attempt time to write the new watermark.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            reader_lo, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        // A long interval separates the failing attempt from the retry.
        let setup = setup_test(
            initial_watermark(),
            1_000, // interval_ms
            0,     // connection failures
            1,     // set_reader_watermark failures
        )
        .await;

        // Let the first (failing) write attempt happen.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            reader_lo, 0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Wait past the next polling tick, where the write is retried and succeeds.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(reader_lo, 6);
    }
}