sui_indexer_alt_framework_store_traits/
lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use chrono::DateTime;
9use chrono::Utc;
10use scoped_futures::ScopedBoxFuture;
11
12/// Represents a database connection that can be used by the indexer framework to manage watermark
13/// operations, agnostic of the underlying store implementation.
14#[async_trait]
15pub trait Connection: Send {
16    /// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
17    /// Returns the committer watermark `checkpoint_hi_inclusive`.
18    async fn init_watermark(
19        &mut self,
20        pipeline_task: &str,
21        default_next_checkpoint: u64,
22    ) -> anyhow::Result<Option<u64>>;
23
24    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
25    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
26    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
27    /// checkpoint to resume processing from.
28    async fn committer_watermark(
29        &mut self,
30        pipeline_task: &str,
31    ) -> anyhow::Result<Option<CommitterWatermark>>;
32
33    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
34    /// to determine the new `reader_lo` or inclusive lower bound of available data.
35    async fn reader_watermark(
36        &mut self,
37        pipeline: &'static str,
38    ) -> anyhow::Result<Option<ReaderWatermark>>;
39
40    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
41    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
42    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
43    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
44    /// minimizes the possibility for the pruner to delete data still expected by inflight read
45    /// requests.
46    async fn pruner_watermark(
47        &mut self,
48        pipeline: &'static str,
49        delay: Duration,
50    ) -> anyhow::Result<Option<PrunerWatermark>>;
51
52    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
53    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
54    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
55    /// the watermark was actually updated or not.
56    async fn set_committer_watermark(
57        &mut self,
58        pipeline_task: &str,
59        watermark: CommitterWatermark,
60    ) -> anyhow::Result<bool>;
61
62    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
63    /// will reference this as the inclusive lower bound of available data for the corresponding
64    /// pipeline.
65    ///
66    /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
67    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
68    /// store. If this is not possible, then it should come from some other common source of time
69    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
70    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
71    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
72    /// before beginning to prune.
73    ///
74    /// Returns a boolean indicating whether the watermark was actually updated or not.
75    async fn set_reader_watermark(
76        &mut self,
77        pipeline: &'static str,
78        reader_lo: u64,
79    ) -> anyhow::Result<bool>;
80
81    /// Update the pruner watermark, returns true if the watermark was actually updated.
82    async fn set_pruner_watermark(
83        &mut self,
84        pipeline: &'static str,
85        pruner_hi: u64,
86    ) -> anyhow::Result<bool>;
87}
88
89/// A storage-agnostic interface that provides database connections for both watermark management
90/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
91/// watermarks operations through its associated `Connection` type. This store is also passed to the
92/// pipeline handlers to perform arbitrary writes to the store.
93#[async_trait]
94pub trait Store: Send + Sync + 'static + Clone {
95    type Connection<'c>: Connection
96    where
97        Self: 'c;
98
99    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
100    /// committer watermark.
101    const DELIMITER: &'static str = "@";
102
103    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
104}
105
106/// Extends the Store trait with transactional capabilities, to be used within the framework for
107/// atomic or transactional writes.
108#[async_trait]
109pub trait TransactionalStore: Store {
110    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
111    where
112        R: Send + 'a,
113        F: Send + 'a,
114        F: for<'r> FnOnce(
115            &'r mut Self::Connection<'_>,
116        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
117}
118
/// Represents the highest checkpoint for some pipeline that has been processed by the indexer
/// framework. When read from the `Store`, this represents the inclusive upper bound checkpoint of
/// data that has been written to the Store for a pipeline.
///
/// Every field is a plain integer, so the type supports total equality (`Eq`) in addition to
/// `PartialEq`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitterWatermark {
    /// Epoch component of the watermark.
    /// NOTE(review): presumably the epoch of `checkpoint_hi_inclusive` — confirm.
    pub epoch_hi_inclusive: u64,
    /// Inclusive upper bound checkpoint of the data written for the pipeline.
    pub checkpoint_hi_inclusive: u64,
    /// Transaction component of the watermark.
    /// NOTE(review): the name suggests an exclusive transaction bound — confirm.
    pub tx_hi: u64,
    /// Timestamp component of the watermark, in milliseconds; `timestamp()` interprets it as
    /// milliseconds since the Unix epoch.
    pub timestamp_ms_hi_inclusive: u64,
}
129
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
///
/// Derives `PartialEq`/`Eq` like the other watermark types in this module, so values can be
/// compared directly.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}
139
/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
///
/// Every field is a plain integer, so the type supports total equality (`Eq`) in addition to
/// `PartialEq`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    ///
    /// This is calculated by finding the difference between the time when it becomes safe to prune
    /// and the current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// The pruner will wait for this duration before beginning to delete data if it is positive.
    /// When this value is zero or negative, it means the waiting period has already passed and
    /// pruning can begin immediately.
    pub wait_for_ms: i64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so can continue from this
    /// point.
    pub pruner_hi: u64,
}
161
162impl CommitterWatermark {
163    pub fn timestamp(&self) -> DateTime<Utc> {
164        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
165    }
166
167    /// Convenience function for testing, instantiates a CommitterWatermark with the given
168    /// `checkpoint_hi_inclusive` and sets all other values to 0.
169    pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
170        CommitterWatermark {
171            epoch_hi_inclusive: 0,
172            checkpoint_hi_inclusive,
173            tx_hi: 0,
174            timestamp_ms_hi_inclusive: 0,
175        }
176    }
177}
178
179impl PrunerWatermark {
180    /// Returns the duration that the pruner must wait before it can begin pruning data.
181    pub fn wait_for(&self) -> Option<Duration> {
182        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
183    }
184
185    /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
186    /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
187    /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
188    /// watermark as well.
189    pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
190        if self.pruner_hi >= self.reader_lo {
191            return None;
192        }
193
194        let from = self.pruner_hi;
195        let to_exclusive = (from + size).min(self.reader_lo);
196        self.pruner_hi = to_exclusive;
197        Some((from, to_exclusive))
198    }
199}
200
201/// Check that the pipeline name does not contain the store's delimiter, and construct the string
202/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
203/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
204pub fn pipeline_task<S: Store>(
205    pipeline_name: &'static str,
206    task_name: Option<&str>,
207) -> Result<String> {
208    if pipeline_name.contains(S::DELIMITER) {
209        anyhow::bail!(
210            "Pipeline name '{}' contains invalid delimiter '{}'",
211            pipeline_name,
212            S::DELIMITER
213        );
214    }
215
216    Ok(match task_name {
217        Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
218        None => pipeline_name.to_string(),
219    })
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand constructor so each test states only the values it cares about.
    fn pruner_watermark(wait_for_ms: i64, pruner_hi: u64, reader_lo: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // A positive remaining wait (5 seconds) is reported as a duration of that length.
        let five_seconds = Duration::from_millis(5000);
        let remaining = pruner_watermark(5000, 500, 1000).wait_for();

        assert_eq!(remaining, Some(five_seconds));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        // Nothing left to wait for.
        let remaining = pruner_watermark(0, 500, 1000).wait_for();

        assert_eq!(remaining, None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        // The waiting period elapsed in the past.
        let remaining = pruner_watermark(-5000, 500, 1000).wait_for();

        assert_eq!(remaining, None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // `pruner_hi` has caught up with `reader_lo`.
        let mut caught_up = pruner_watermark(0, 1000, 1000);

        assert_eq!(caught_up.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Consecutive chunks are contiguous and advance the watermark.
        let mut stepping = pruner_watermark(0, 100, 1000);

        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is clamped to `reader_lo`.
        let mut oversized = pruner_watermark(0, 500, 1000);

        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}