sui_indexer_alt_framework_store_traits/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::DateTime;
8use chrono::Utc;
9use scoped_futures::ScopedBoxFuture;
10
11/// Represents a database connection that can be used by the indexer framework to manage watermark
12/// operations, agnostic of the underlying store implementation.
13#[async_trait]
14pub trait Connection: Send {
15    /// Returns the `InitWatermark` based on the existing watermark if it exists.
16    /// Otherwise, initializes a new watermark record with `InitWatermark` and returns
17    /// the value passed in.
18    async fn init_watermark(
19        &mut self,
20        pipeline_task: &str,
21        init_watermark: InitWatermark,
22    ) -> anyhow::Result<InitWatermark>;
23
24    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
25    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
26    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
27    /// checkpoint to resume processing from.
28    async fn committer_watermark(
29        &mut self,
30        pipeline_task: &str,
31    ) -> anyhow::Result<Option<CommitterWatermark>>;
32
33    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
34    /// to determine the new `reader_lo` or inclusive lower bound of available data.
35    async fn reader_watermark(
36        &mut self,
37        pipeline: &'static str,
38    ) -> anyhow::Result<Option<ReaderWatermark>>;
39
40    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
41    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
42    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
43    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
44    /// minimizes the possibility for the pruner to delete data still expected by inflight read
45    /// requests.
46    async fn pruner_watermark(
47        &mut self,
48        pipeline: &'static str,
49        delay: Duration,
50    ) -> anyhow::Result<Option<PrunerWatermark>>;
51
52    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
53    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
54    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
55    /// the watermark was actually updated or not.
56    async fn set_committer_watermark(
57        &mut self,
58        pipeline_task: &str,
59        watermark: CommitterWatermark,
60    ) -> anyhow::Result<bool>;
61
62    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
63    /// will reference this as the inclusive lower bound of available data for the corresponding
64    /// pipeline.
65    ///
66    /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
67    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
68    /// store. If this is not possible, then it should come from some other common source of time
69    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
70    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
71    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
72    /// before beginning to prune.
73    ///
74    /// Returns a boolean indicating whether the watermark was actually updated or not.
75    async fn set_reader_watermark(
76        &mut self,
77        pipeline: &'static str,
78        reader_lo: u64,
79    ) -> anyhow::Result<bool>;
80
81    /// Update the pruner watermark, returns true if the watermark was actually updated.
82    async fn set_pruner_watermark(
83        &mut self,
84        pipeline: &'static str,
85        pruner_hi: u64,
86    ) -> anyhow::Result<bool>;
87}
88
89/// A storage-agnostic interface that provides database connections for both watermark management
90/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
91/// watermarks operations through its associated `Connection` type. This store is also passed to the
92/// pipeline handlers to perform arbitrary writes to the store.
93#[async_trait]
94pub trait Store: Send + Sync + 'static + Clone {
95    type Connection<'c>: Connection
96    where
97        Self: 'c;
98
99    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
100    /// committer watermark.
101    const DELIMITER: &'static str = "@";
102
103    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>>;
104}
105
106/// Extends the Store trait with transactional capabilities, to be used within the framework for
107/// atomic or transactional writes.
108#[async_trait]
109pub trait TransactionalStore: Store {
110    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
111    where
112        R: Send + 'a,
113        F: Send + 'a,
114        F: for<'r> FnOnce(
115            &'r mut Self::Connection<'_>,
116        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
117}
118
/// Watermark state passed into, and returned from, watermark initialization.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct InitWatermark {
    /// Highest already-processed checkpoint (inclusive), if there is one.
    ///
    /// The framework derives it as `default_next_checkpoint.checked_sub(1)`, so it is `None`
    /// when starting from checkpoint 0.
    pub checkpoint_hi_inclusive: Option<u64>,
    /// Set by the framework to `default_next_checkpoint`.
    pub reader_lo: u64,
}
127
/// The highest checkpoint of a pipeline that the indexer framework has processed.
///
/// When read back from the `Store`, it marks the inclusive upper bound of the data already
/// written for that pipeline.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct CommitterWatermark {
    pub epoch_hi_inclusive: u64,
    pub checkpoint_hi_inclusive: u64,
    pub tx_hi: u64,
    /// Milliseconds since the Unix epoch (see `CommitterWatermark::timestamp`).
    pub timestamp_ms_hi_inclusive: u64,
}
138
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
///
/// Derives `PartialEq` like the other watermark types in this module, so values can be compared
/// directly (e.g. with `==` or `assert_eq!`).
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}
148
/// Describes what the pruner may delete for a pipeline, and how long it must wait before it
/// starts.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct PrunerWatermark {
    /// Milliseconds the pruner still has to wait before it may start deleting data.
    ///
    /// It is the gap between the moment pruning becomes safe and now:
    /// `(pruner_timestamp + delay) - current_time`.
    ///
    /// A positive value means "wait this long first". Zero or a negative value means the waiting
    /// period is already over, so pruning may start immediately.
    pub wait_for_ms: i64,

    /// Exclusive upper bound of the checkpoints the pruner is allowed to delete.
    pub reader_lo: u64,

    /// Exclusive bound of the checkpoints already deleted; pruning resumes from here.
    pub pruner_hi: u64,
}
170
171impl CommitterWatermark {
172    pub fn timestamp(&self) -> DateTime<Utc> {
173        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
174    }
175
176    /// Convenience function for testing, instantiates a CommitterWatermark with the given
177    /// `checkpoint_hi_inclusive` and sets all other values to 0.
178    pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
179        CommitterWatermark {
180            epoch_hi_inclusive: 0,
181            checkpoint_hi_inclusive,
182            tx_hi: 0,
183            timestamp_ms_hi_inclusive: 0,
184        }
185    }
186}
187
188impl PrunerWatermark {
189    /// Returns the duration that the pruner must wait before it can begin pruning data.
190    pub fn wait_for(&self) -> Option<Duration> {
191        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
192    }
193
194    /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
195    /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
196    /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
197    /// watermark as well.
198    pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
199        if self.pruner_hi >= self.reader_lo {
200            return None;
201        }
202
203        let from = self.pruner_hi;
204        let to_exclusive = (from + size).min(self.reader_lo);
205        self.pruner_hi = to_exclusive;
206        Some((from, to_exclusive))
207    }
208}
209
210/// A utility function for connections that do not have special initialization logic. These
211/// connections delegate initialization to `Connection::committer_watermark`.
212pub async fn init_with_committer_watermark(
213    connection: &mut impl Connection,
214    pipeline_task: &str,
215    init_watermark: InitWatermark,
216) -> anyhow::Result<InitWatermark> {
217    let checkpoint_hi_inclusive = connection
218        .committer_watermark(pipeline_task)
219        .await?
220        .map(|w| w.checkpoint_hi_inclusive);
221    Ok(InitWatermark {
222        checkpoint_hi_inclusive,
223        ..init_watermark
224    })
225}
226
227/// Check that the pipeline name does not contain the store's delimiter, and construct the string
228/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
229/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
230pub fn pipeline_task<S: Store>(
231    pipeline_name: &'static str,
232    task_name: Option<&str>,
233) -> anyhow::Result<String> {
234    if pipeline_name.contains(S::DELIMITER) {
235        anyhow::bail!(
236            "Pipeline name '{}' contains invalid delimiter '{}'",
237            pipeline_name,
238            S::DELIMITER
239        );
240    }
241
242    Ok(match task_name {
243        Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
244        None => pipeline_name.to_string(),
245    })
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand for building a `PrunerWatermark` in the tests below.
    fn watermark(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // Five seconds still left to wait.
        let w = watermark(5000, 1000, 500);
        assert_eq!(w.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        assert_eq!(watermark(0, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        assert_eq!(watermark(-5000, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // `pruner_hi` has already reached `reader_lo`, so there is nothing left to prune.
        let mut w = watermark(0, 1000, 1000);
        assert_eq!(w.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        let mut w = watermark(0, 1000, 100);
        assert_eq!(w.next_chunk(100), Some((100, 200)));
        assert_eq!(w.pruner_hi, 200);
        assert_eq!(w.next_chunk(100), Some((200, 300)));

        // A fresh watermark whose remaining range is smaller than the chunk size. The chunk is
        // clamped to `reader_lo`, after which the range is exhausted.
        let mut w = watermark(0, 1000, 500);
        assert_eq!(w.next_chunk(2000), Some((500, 1000)));
        assert_eq!(w.pruner_hi, 1000);
        assert_eq!(w.next_chunk(2000), None);
    }
}