sui_indexer_alt_framework/pipeline/concurrent/
reader_watermark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use tokio::{task::JoinHandle, time::interval};
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, info, warn};
9
10use crate::{
11    metrics::IndexerMetrics,
12    store::{Connection, Store},
13};
14
15use super::{Handler, PrunerConfig};
16
17/// The reader watermark task is responsible for updating the `reader_lo` and `pruner_timestamp`
18/// values for a pipeline's row in the watermark table, based on the pruner configuration, and the
19/// committer's progress.
20///
21/// `reader_lo` is the lowest checkpoint that readers are allowed to read from with a guarantee of
22/// data availability for this pipeline, and `pruner_timestamp` is the timestamp at which this task
23/// last updated that watermark. The timestamp is always fetched from the database (not from the
24/// indexer or the reader), to avoid issues with drift between clocks.
25///
26/// If there is no pruner configuration, this task will immediately exit. Otherwise, the task exits
27/// when the provided cancellation token is triggered.
28pub(super) fn reader_watermark<H: Handler + 'static>(
29    config: Option<PrunerConfig>,
30    store: H::Store,
31    metrics: Arc<IndexerMetrics>,
32    cancel: CancellationToken,
33) -> JoinHandle<()> {
34    tokio::spawn(async move {
35        let Some(config) = config else {
36            info!(pipeline = H::NAME, "Skipping reader watermark task");
37            return;
38        };
39
40        let mut poll = interval(config.interval());
41
42        loop {
43            tokio::select! {
44                _ = cancel.cancelled() => {
45                    info!(pipeline = H::NAME, "Shutdown received");
46                    break;
47                }
48
49                _ = poll.tick() => {
50                    let Ok(mut conn) = store.connect().await else {
51                        warn!(pipeline = H::NAME, "Reader watermark task failed to get connection for DB");
52                        continue;
53                    };
54
55                    let current = match conn.reader_watermark(H::NAME).await {
56                        Ok(Some(current)) => current,
57
58                        Ok(None) => {
59                            warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
60                            continue;
61                        }
62
63                        Err(e) => {
64                            warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
65                            continue;
66                        }
67                    };
68
69                    // Calculate the new reader watermark based on the current high watermark.
70                    let new_reader_lo = (current.checkpoint_hi_inclusive as u64 + 1)
71                        .saturating_sub(config.retention);
72
73                    if new_reader_lo <= current.reader_lo as u64 {
74                        debug!(
75                            pipeline = H::NAME,
76                            current = current.reader_lo,
77                            new = new_reader_lo,
78                            "No change to reader watermark",
79                        );
80                        continue;
81                    }
82
83                    metrics
84                        .watermark_reader_lo
85                        .with_label_values(&[H::NAME])
86                        .set(new_reader_lo as i64);
87
88                    let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
89                        warn!(pipeline = H::NAME, "Failed to update reader watermark");
90                        continue;
91                    };
92
93                    if updated {
94                        info!(pipeline = H::NAME, new_reader_lo, "Watermark");
95
96                        metrics
97                            .watermark_reader_lo_in_db
98                            .with_label_values(&[H::NAME])
99                            .set(new_reader_lo as i64);
100                    }
101                }
102            }
103        }
104
105        info!(pipeline = H::NAME, "Stopping reader watermark task");
106    })
107}
108
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use std::sync::Arc;
    use sui_pg_db::FieldCount;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::time::Duration;
    use tokio_util::sync::CancellationToken;

    use crate::{metrics::IndexerMetrics, mocks::store::*, pipeline::Processor};

    use super::*;

    /// Fixed retention value used across all tests.
    const TEST_RETENTION: u64 = 5;
    /// Default timeout for test operations.
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    /// Placeholder row type; the tests never produce or commit any values.
    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline whose only purpose is to give the task a name and a store type.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;

        async fn commit<'a>(
            _values: &[Self::Value],
            _conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    /// Everything a test needs to observe and stop a running reader watermark task.
    struct TestSetup {
        store: MockStore,
        handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    impl TestSetup {
        /// Cancels the task and waits for it to finish.
        ///
        /// The join result is checked rather than discarded so that a panic inside the task
        /// fails the test instead of being silently swallowed. This matters most for tests that
        /// assert "nothing changed", which would otherwise pass if the task had died.
        async fn shutdown(self) {
            self.cancel.cancel();
            self.handle
                .await
                .expect("reader watermark task should shut down without panicking");
        }
    }

    /// Watermark every test starts from: the committer is at checkpoint 10 and `reader_lo` is 0.
    ///
    /// With `TEST_RETENTION` (5) the task is expected to move `reader_lo` to 10 + 1 - 5 = 6.
    /// Tests that need a different starting point override individual fields with struct-update
    /// syntax.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: 10, // Current high watermark
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0, // Initial reader_lo
            pruner_timestamp: 0,
            pruner_hi: 0,
        }
    }

    /// Builds a mock store seeded with `watermark` and the requested number of injected
    /// failures, then spawns the reader watermark task against it, polling every `interval_ms`.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = MockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let store_clone = store.clone();
        let cancel_clone = cancel.clone();
        let handle =
            reader_watermark::<DataPipeline>(Some(config), store_clone, metrics, cancel_clone);

        TestSetup {
            store,
            handle,
            cancel,
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for a few intervals to allow the task to update the watermark
        tokio::time::sleep(Duration::from_millis(200)).await;

        // new reader_lo = checkpoint_hi_inclusive (10) - retention (5) + 1 = 6
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(watermarks.reader_lo, 6);
        }

        // Clean up
        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        // Start with a reader_lo that is already ahead of what the task would compute.
        let watermark = MockWatermark {
            reader_lo: 7,
            ..initial_watermark()
        };
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            watermark,
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for a few intervals to allow the task to update the watermark
        tokio::time::sleep(Duration::from_millis(200)).await;

        // new reader_lo = checkpoint_hi_inclusive (10) - retention (5) + 1 = 6,
        // which is smaller than current reader_lo (7). Therefore, the reader_lo was not updated.
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(
                watermarks.reader_lo, 7,
                "Reader watermark should not be updated when new value is smaller"
            );
        }

        // Clean up. Checking the join result here is what distinguishes "the task decided not
        // to update" from "the task panicked before it could update".
        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let polling_interval_ms = 1_000; // Long interval for testing retry
        let connection_failure_attempts = 1;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for first connection attempt (which should fail)
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        // Verify state before retry succeeds
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(
                watermarks.reader_lo, 0,
                "Reader watermark should not be updated due to DB connection failure"
            );
        }

        // Wait for second connection attempt (which should succeed)
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        // Wait a bit more for the watermark update to complete
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify state after retry succeeds
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(
                watermarks.reader_lo, 6,
                "Reader watermark should be updated after retry succeeds"
            );
        }

        // Clean up
        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let polling_interval_ms = 1_000; // Long interval for testing retry
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 1;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for first failed attempt
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify state before retry succeeds
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(
                watermarks.reader_lo, 0,
                "Reader watermark should not be updated due to set_reader_watermark failure"
            );
        }

        // Wait for next polling for second attempt
        tokio::time::sleep(Duration::from_millis(1200)).await;

        // Verify state after retry succeeds
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(watermarks.reader_lo, 6);
        }

        // Clean up
        setup.shutdown().await;
    }
}