sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::debug;
9use tracing::info;
10use tracing::warn;
11
12use crate::metrics::IndexerMetrics;
13use crate::pipeline::concurrent::Handler;
14use crate::pipeline::concurrent::PrunerConfig;
15use crate::store::Connection;
16use crate::store::Store;
17
18/// The reader watermark task is responsible for updating the `reader_lo` and `pruner_timestamp`
19/// values for a pipeline's row in the watermark table, based on the pruner configuration, and the
20/// committer's progress.
21///
22/// `reader_lo` is the lowest checkpoint that readers are allowed to read from with a guarantee of
23/// data availability for this pipeline, and `pruner_timestamp` is the timestamp at which this task
24/// last updated that watermark. The timestamp is always fetched from the database (not from the
25/// indexer or the reader), to avoid issues with drift between clocks.
26///
27/// If there is no pruner configuration, this task will immediately exit.
28pub(super) fn reader_watermark<H: Handler + 'static>(
29    config: Option<PrunerConfig>,
30    store: H::Store,
31    metrics: Arc<IndexerMetrics>,
32) -> Service {
33    Service::new().spawn_aborting(async move {
34        let Some(config) = config else {
35            info!(pipeline = H::NAME, "Skipping reader watermark task");
36            return Ok(());
37        };
38
39        let mut poll = interval(config.interval());
40
41        loop {
42            poll.tick().await;
43
44            let Ok(mut conn) = store.connect().await else {
45                warn!(
46                    pipeline = H::NAME,
47                    "Reader watermark task failed to get connection for DB"
48                );
49                continue;
50            };
51
52            let current = match conn.reader_watermark(H::NAME).await {
53                Ok(Some(current)) => current,
54
55                Ok(None) => {
56                    info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
57                    continue;
58                }
59
60                Err(e) => {
61                    warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
62                    continue;
63                }
64            };
65
66            // Calculate the new reader watermark based on the current high watermark.
67            let new_reader_lo =
68                (current.checkpoint_hi_inclusive + 1).saturating_sub(config.retention);
69
70            if new_reader_lo <= current.reader_lo {
71                debug!(
72                    pipeline = H::NAME,
73                    current = current.reader_lo,
74                    new = new_reader_lo,
75                    "No change to reader watermark",
76                );
77                continue;
78            }
79
80            metrics
81                .watermark_reader_lo
82                .with_label_values(&[H::NAME])
83                .set(new_reader_lo as i64);
84
85            let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
86                warn!(pipeline = H::NAME, "Failed to update reader watermark");
87                continue;
88            };
89
90            if updated {
91                info!(pipeline = H::NAME, new_reader_lo, "Watermark");
92
93                metrics
94                    .watermark_reader_lo_in_db
95                    .with_label_values(&[H::NAME])
96                    .set(new_reader_lo as i64);
97            }
98        }
99    })
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::*;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    /// Retention handed to the pruner configuration in every test.
    const TEST_RETENTION: u64 = 5;
    /// Upper bound on how long a test waits for the mock store to observe an event.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Placeholder row type; the tests never produce any values of it.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline whose only purpose is to supply `NAME` and `Store` to the task under test.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        /// Produces no values for any checkpoint.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        /// Moves every pending value into the batch and reports it as still pending.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Pretends to commit, reporting zero affected rows.
        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Everything a test needs to observe the task: the store it writes to, and the service
    /// running it.
    struct TestSetup {
        store: MockStore,
        // Never read; held so that the service lives as long as the test does.
        // NOTE(review): presumably dropping the `Service` stops the task -- confirm.
        #[allow(unused)]
        handle: Service,
    }

    /// Seeds a mock store with `watermark`, arms it with the requested number of injected
    /// failures, and starts the reader watermark task against it, polling every `interval_ms`.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let metrics = IndexerMetrics::new(None, &Default::default());

        // The task gets its own handle on the store; the test keeps the original to inspect it.
        let handle = reader_watermark::<DataPipeline>(Some(config), store.clone(), metrics);

        TestSetup { store, handle }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let setup = setup_test(
            MockWatermark {
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive: Some(10), // high watermark seen by the task
                tx_hi: 100,
                timestamp_ms_hi_inclusive: 1000,
                reader_lo: 0, // starting point, below the expected result
                pruner_timestamp: 0,
                pruner_hi: 0,
                chain_id: None,
            },
            100, // polling interval (ms)
            0,   // injected connection failures
            0,   // injected `set_reader_watermark` failures
        )
        .await;

        // Give the task a couple of polling intervals to do its work.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Expected: checkpoint_hi_inclusive (10) + 1 - retention (5) = 6.
        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(reader_lo, 6);
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        let setup = setup_test(
            MockWatermark {
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive: Some(10), // high watermark seen by the task
                tx_hi: 100,
                timestamp_ms_hi_inclusive: 1000,
                reader_lo: 7, // already ahead of what the task would compute
                pruner_timestamp: 0,
                pruner_hi: 0,
                chain_id: None,
            },
            100, // polling interval (ms)
            0,   // injected connection failures
            0,   // injected `set_reader_watermark` failures
        )
        .await;

        // Give the task a couple of polling intervals to do its work.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // The task computes 10 + 1 - 5 = 6, which is behind the stored value of 7, so the stored
        // value must be left alone.
        let reader_lo = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            reader_lo, 7,
            "Reader watermark should not be updated when new value is smaller"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let setup = setup_test(
            MockWatermark {
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive: Some(10), // high watermark seen by the task
                tx_hi: 100,
                timestamp_ms_hi_inclusive: 1000,
                reader_lo: 0, // starting point, below the expected result
                pruner_timestamp: 0,
                pruner_hi: 0,
                chain_id: None,
            },
            1_000, // long polling interval (ms), so the retry is clearly a separate tick
            1,     // injected connection failures
            0,     // injected `set_reader_watermark` failures
        )
        .await;
        let store = &setup.store;

        // The first connection attempt is the one set up to fail.
        store.wait_for_connection_attempts(1, TEST_TIMEOUT).await;

        let before_retry = store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            before_retry, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // The second attempt is expected to succeed.
        store.wait_for_connection_attempts(2, TEST_TIMEOUT).await;

        // Connecting is only the first step; leave time for the write to land.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let after_retry = store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            after_retry, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let setup = setup_test(
            MockWatermark {
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive: Some(10), // high watermark seen by the task
                tx_hi: 100,
                timestamp_ms_hi_inclusive: 1000,
                reader_lo: 0, // starting point, below the expected result
                pruner_timestamp: 0,
                pruner_hi: 0,
                chain_id: None,
            },
            1_000, // long polling interval (ms), so the retry is clearly a separate tick
            0,     // injected connection failures
            1,     // injected `set_reader_watermark` failures
        )
        .await;

        // Long enough for the first (failing) write attempt, well short of the second tick.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let before_retry = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(
            before_retry, 0,
            "Reader watermark should not be updated due to set_reader_watermark failure"
        );

        // Sleep past the next tick so that the second attempt has run.
        tokio::time::sleep(Duration::from_millis(1200)).await;

        let after_retry = setup.store.watermark(DataPipeline::NAME).unwrap().reader_lo;
        assert_eq!(after_retry, 6);
    }
}