// sui_indexer_alt_framework/pipeline/concurrent/reader_watermark.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use sui_futures::service::Service;
7use tokio::time::interval;
8use tracing::debug;
9use tracing::info;
10use tracing::warn;
11
12use crate::metrics::IndexerMetrics;
13use crate::pipeline::concurrent::Handler;
14use crate::pipeline::concurrent::PrunerConfig;
15use crate::store::ConcurrentConnection;
16use crate::store::Store;
17
18/// The reader watermark task is responsible for updating the `reader_lo` and `pruner_timestamp`
19/// values for a pipeline's row in the watermark table, based on the pruner configuration, and the
20/// committer's progress.
21///
22/// `reader_lo` is the lowest checkpoint that readers are allowed to read from with a guarantee of
23/// data availability for this pipeline, and `pruner_timestamp` is the timestamp at which this task
24/// last updated that watermark. The timestamp is always fetched from the database (not from the
25/// indexer or the reader), to avoid issues with drift between clocks.
26///
27/// If there is no pruner configuration, this task will immediately exit.
28pub(super) fn reader_watermark<H: Handler>(
29    config: Option<PrunerConfig>,
30    store: H::Store,
31    metrics: Arc<IndexerMetrics>,
32) -> Service {
33    Service::new().spawn_aborting(async move {
34        let Some(config) = config else {
35            info!(pipeline = H::NAME, "Skipping reader watermark task");
36            return Ok(());
37        };
38
39        let mut poll = interval(config.interval());
40
41        loop {
42            poll.tick().await;
43
44            let Ok(mut conn) = store.connect().await else {
45                warn!(
46                    pipeline = H::NAME,
47                    "Reader watermark task failed to get connection for DB"
48                );
49                continue;
50            };
51
52            let current = match conn.reader_watermark(H::NAME).await {
53                Ok(Some(current)) => current,
54
55                Ok(None) => {
56                    info!(pipeline = H::NAME, "No watermark for pipeline, skipping");
57                    continue;
58                }
59
60                Err(e) => {
61                    warn!(pipeline = H::NAME, "Failed to get current watermark: {e}");
62                    continue;
63                }
64            };
65
66            // Calculate the new reader watermark based on the current high watermark.
67            let new_reader_lo =
68                (current.checkpoint_hi_inclusive + 1).saturating_sub(config.retention);
69
70            if new_reader_lo <= current.reader_lo {
71                debug!(
72                    pipeline = H::NAME,
73                    current = current.reader_lo,
74                    new = new_reader_lo,
75                    "No change to reader watermark",
76                );
77                continue;
78            }
79
80            metrics
81                .watermark_reader_lo
82                .with_label_values(&[H::NAME])
83                .set(new_reader_lo as i64);
84
85            let Ok(updated) = conn.set_reader_watermark(H::NAME, new_reader_lo).await else {
86                warn!(pipeline = H::NAME, "Failed to update reader watermark");
87                continue;
88            };
89
90            if updated {
91                info!(pipeline = H::NAME, new_reader_lo, "Watermark");
92
93                metrics
94                    .watermark_reader_lo_in_db
95                    .with_label_values(&[H::NAME])
96                    .set(new_reader_lo as i64);
97            }
98        }
99    })
100}
101
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_field_count::FieldCount;
    use sui_indexer_alt_framework_store_traits::testing::mock_store::MockWatermark;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::time::Duration;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::BatchStatus;

    use super::*;

    // Fixed retention value used across all tests
    const TEST_RETENTION: u64 = 5;
    // Default timeout for test operations
    const TEST_TIMEOUT: Duration = Duration::from_secs(20);

    #[derive(Clone, FieldCount)]
    pub struct StoredData;

    /// Minimal pipeline that exists only to give the reader watermark task a `Handler` (a name
    /// and a store type) to run against. It produces no values and commits nothing.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";
        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(0)
        }
    }

    struct TestSetup {
        store: FallibleMockStore,
        // Never read: held only so the reader watermark service lives as long as the test. The
        // task is spawned with `spawn_aborting`, so dropping this handle presumably aborts it.
        _handle: Service,
    }

    /// The watermark row every test starts from: high watermark at checkpoint 10 and `reader_lo`
    /// at 0. With `TEST_RETENTION = 5` the task should move `reader_lo` to `10 + 1 - 5 = 6`.
    /// Tests that need a different starting point override fields with struct-update syntax.
    fn initial_watermark() -> MockWatermark {
        MockWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: Some(10), // Current high watermark
            tx_hi: 100,
            timestamp_ms_hi_inclusive: 1000,
            reader_lo: 0, // Initial reader_lo
            pruner_timestamp: 0,
            pruner_hi: 0,
            chain_id: None,
        }
    }

    /// Spawns the reader watermark task for `DataPipeline` against a mock store seeded with
    /// `watermark`, polling every `interval_ms`. The mock store is configured to fail the first
    /// `connection_failure_attempts` connections and the first
    /// `set_reader_watermark_failure_attempts` watermark writes.
    async fn setup_test(
        watermark: MockWatermark,
        interval_ms: u64,
        connection_failure_attempts: usize,
        set_reader_watermark_failure_attempts: usize,
    ) -> TestSetup {
        let store = FallibleMockStore::new()
            .with_watermark(DataPipeline::NAME, watermark)
            .with_reader_watermark_failures(set_reader_watermark_failure_attempts)
            .with_connection_failures(connection_failure_attempts);

        let config = PrunerConfig {
            interval_ms,
            delay_ms: 100,
            retention: TEST_RETENTION,
            max_chunk_size: 100,
            prune_concurrency: 1,
        };

        let metrics = IndexerMetrics::new(None, &Default::default());

        let store_clone = store.clone();
        let handle = reader_watermark::<DataPipeline>(Some(config), store_clone, metrics);

        TestSetup {
            store,
            _handle: handle,
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_updates() {
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for a few intervals to allow the task to update the watermark
        tokio::time::sleep(Duration::from_millis(200)).await;

        // new reader_lo = checkpoint_hi_inclusive (10) - retention (5) + 1 = 6
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(watermarks.reader_lo, 6);
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_does_not_update_smaller_reader_lo() {
        let watermark = MockWatermark {
            reader_lo: 7, // Initial reader_lo, already ahead of the computed value
            ..initial_watermark()
        };
        let polling_interval_ms = 100;
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            watermark,
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for a few intervals to allow the task to update the watermark
        tokio::time::sleep(Duration::from_millis(200)).await;

        // new reader_lo = checkpoint_hi_inclusive (10) - retention (5) + 1 = 6,
        // which is smaller than current reader_lo (7). Therefore, the reader_lo was not updated.
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(
                watermarks.reader_lo, 7,
                "Reader watermark should not be updated when new value is smaller"
            );
        }
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_connection_failure() {
        let polling_interval_ms = 1_000; // Long interval for testing retry
        let connection_failure_attempts = 1;
        let set_reader_watermark_failure_attempts = 0;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for first connection attempt (which should fail)
        setup
            .store
            .wait_for_connection_attempts(1, TEST_TIMEOUT)
            .await;

        // Verify state before retry succeeds
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.reader_lo, 0,
            "Reader watermark should not be updated due to DB connection failure"
        );

        // Wait for second connection attempt (which should succeed)
        setup
            .store
            .wait_for_connection_attempts(2, TEST_TIMEOUT)
            .await;

        // Wait a bit more for the watermark update to complete
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Verify state after retry succeeds
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.reader_lo, 6,
            "Reader watermark should be updated after retry succeeds"
        );
    }

    #[tokio::test]
    async fn test_reader_watermark_retry_update_after_set_watermark_failure() {
        let polling_interval_ms = 1_000; // Long interval for testing retry
        let connection_failure_attempts = 0;
        let set_reader_watermark_failure_attempts = 1;
        let setup = setup_test(
            initial_watermark(),
            polling_interval_ms,
            connection_failure_attempts,
            set_reader_watermark_failure_attempts,
        )
        .await;

        // Wait for first failed attempt
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify state before retry succeeds
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(
                watermarks.reader_lo, 0,
                "Reader watermark should not be updated due to set_reader_watermark failure"
            );
        }

        // Wait for next polling for second attempt
        tokio::time::sleep(Duration::from_millis(1200)).await;

        // Verify state after retry succeeds
        {
            let watermarks = setup.store.watermark(DataPipeline::NAME).unwrap();
            assert_eq!(watermarks.reader_lo, 6);
        }
    }
}