sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::{debug, info, warn};
9
10use crate::{
11    metrics::IndexerMetrics,
12    store::{Connection, Store},
13};
14
15use super::{Handler, PrunerConfig};
16
17/// The reader watermark task is responsible for updating the `reader_lo` and `pruner_timestamp`
18/// values for a pipeline's row in the watermark table, based on the pruner configuration, and the
19/// committer's progress.
20///
21/// `reader_lo` is the lowest checkpoint that readers are allowed to read from with a guarantee of
22/// data availability for this pipeline, and `pruner_timestamp` is the timestamp at which this task
23/// last updated that watermark. The timestamp is always fetched from the database (not from the
24/// indexer or the reader), to avoid issues with drift between clocks.
25///
26/// If there is no pruner configuration, this task will immediately exit.
27pub(super) fn reader_watermark<H: Handler + 'static>(
28    config: Option<PrunerConfig>,
29    store: H::Store,
30    metrics: Arc<IndexerMetrics>,
31) -> Service {
32    Service::new().spawn_aborting(async move {
33        let Some(config) = config else {
34            info!(pipeline = H::NAME, "Skipping reader watermark task");
35            return Ok(());
36        };
37
38        let mut poll = interval(config.interval());
39
40        loop {
41            poll.tick().await;
42
43            let Ok(mut conn) = store.connect().await else {
44                warn!(
45                    pipeline = H::NAME,
46                    "Reader watermark task failed to get connection for DB"
47                );
48                continue;
49            };
50
51            let current = match conn.reader_watermark(H::NAME).await {
52                Ok(Some(current)) => current,
53
54                Ok(None) => {
55                    warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
56                    continue;
57                }
58
59                Err(e) => {
60                    warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
61                    continue;
62                }
63            };
64
65            // Calculate the new reader watermark based on the current high watermark.
66            let new_reader_lo =
67                (current.checkpoint_hi_inclusive as u64 + 1).saturating_sub(config.retention);
68
69            if new_reader_lo <= current.reader_lo as u64 {
70                debug!(
71                    pipeline = H::NAME,
72                    current = current.reader_lo,
73                    new = new_reader_lo,
74                    "No change to reader watermark",
75                );
76                continue;
77            }
78
79            metrics
80                .watermark_reader_lo
81                .with_label_values(&[H::NAME])
82                .set(new_reader_lo as i64);
83
84            let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
85                warn!(pipeline = H::NAME, "Failed to update reader watermark");
86                continue;
87            };
88
89            if updated {
90                info!(pipeline = H::NAME, new_reader_lo, "Watermark");
91
92                metrics
93                    .watermark_reader_lo_in_db
94                    .with_label_values(&[H::NAME])
95                    .set(new_reader_lo as i64);
96            }
97        }
98    })
99}
100
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::{
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{Processor, concurrent::BatchStatus},
    };

    use super::*;

    /// Retention (in checkpoints) that every test configures the pruner with.
    const TEST_RETENTION: u64 = 5;
    /// Upper bound on how long a test waits for the mock store to observe an event.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Placeholder row type; the tests never produce or inspect any values.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Pipeline whose only purpose is to supply `NAME` and `Store` to `reader_watermark`.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        /// Produces no values, whatever the checkpoint contains.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        /// Moves every pending value into the batch and reports it as still pending.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Writes nothing and reports zero affected rows.
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Everything a test needs to keep hold of while the task under test is running.
    struct TestSetup {
        /// Handle onto the mock store shared with the task, used to observe its effects.
        store: MockStore,
        /// Never read; held for the lifetime of the test.
        /// NOTE(review): presumably dropping the `Service` would stop the task -- confirm.
        #[allow(unused)]
        handle: Service,
    }

    /// Starting point shared by the tests: the committer has reached checkpoint 10 and
    /// `reader_lo` is still 0.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10,
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0,
            pruner_timestamp: 0,
            pruner_hi: 0,
        }
    }

    /// Builds a mock store seeded with `watermark` and the requested number of injected
    /// failures, then starts the reader watermark task for `DataPipeline` against it.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let pruner = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let handle = reader_watermark::<DataPipeline>(
            Some(pruner),
            store.clone(),
            IndexerMetrics::new(None, &Default::default()),
        );

        TestSetup { store, handle }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let setup = setup_test(
            initial_watermark(),
            100, // polling interval (ms)
            0,   // injected connection failures
            0,   // injected set_reader_watermark failures
        )
        .await;

        // Give the task a couple of polling intervals to act.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // checkpoint_hi_inclusive (10) + 1 - retention (5) = 6
        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(reader_lo, 6);
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        // Start with a reader_lo that is already ahead of what the task would compute.
        let ahead = MockWatermark {
            reader_lo: 7,
            ..initial_watermark()
        };
        let setup = setup_test(
            ahead, //
            100,   // polling interval (ms)
            0,     // injected connection failures
            0,     // injected set_reader_watermark failures
        )
        .await;

        // Give the task a couple of polling intervals to act.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // The task computes 10 + 1 - 5 = 6, which is behind the stored reader_lo (7), so the
        // stored value must be left alone.
        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            reader_lo, 7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let setup = setup_test(
            initial_watermark(),
            1_000, // polling interval (ms): long, so the retry is clearly a separate tick
            1,     // injected connection failures
            0,     // injected set_reader_watermark failures
        )
        .await;

        // First connection attempt: expected to fail.
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        let before_retry = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            before_retry, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Second connection attempt: expected to succeed.
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        // Leave a little time for the write that follows the successful connection.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let after_retry = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            after_retry, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let setup = setup_test(
            initial_watermark(),
            1_000, // polling interval (ms): long, so the retry is clearly a separate tick
            0,     // injected connection failures
            1,     // injected set_reader_watermark failures
        )
        .await;

        // Long enough for the first (failing) attempt, well short of the second tick.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let before_retry = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            before_retry, 0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Sleep past the next tick so the second attempt has happened.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        let after_retry = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(after_retry, 6);
    }
}