sui_indexer_alt_framework_store_traits/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use scoped_futures::ScopedBoxFuture;
8use std::time::Duration;
9
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
    /// checkpoint to resume processing from.
    ///
    /// NOTE(review): `Ok(None)` presumably means no committer watermark has been recorded yet for
    /// `pipeline_task` — confirm per implementation.
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a `pipeline`, return its reader watermark from the `Store`. The indexer uses this to
    /// determine the new `reader_lo`, i.e. the inclusive lower bound of available data.
    ///
    /// NOTE(review): `Ok(None)` presumably means no watermark entry exists for `pipeline` —
    /// confirm per implementation.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by inflight read
    /// requests.
    ///
    /// The returned `PrunerWatermark::wait_for_ms` is expected to be
    /// `(pruner_timestamp + delay) - current_time`, and may be zero or negative once the delay has
    /// already elapsed.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
    /// the watermark was actually updated or not.
    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e. `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Update the pruner watermark of `pipeline` to `pruner_hi`, the checkpoint (exclusive) up to
    /// which the pruner has already deleted data.
    ///
    /// Returns `true` if the watermark was actually updated.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}
78
79/// A storage-agnostic interface that provides database connections for both watermark management
80/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
81/// watermarks operations through its associated `Connection` type. This store is also passed to the
82/// pipeline handlers to perform arbitrary writes to the store.
83#[async_trait]
84pub trait Store: Send + Sync + 'static + Clone {
85    type Connection<'c>: Connection
86    where
87        Self: 'c;
88
89    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
90    /// committer watermark.
91    const DELIMITER: &'static str = "@";
92
93    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
94}
95
/// Extends the Store trait with transactional capabilities, to be used within the framework for
/// atomic or transactional writes.
#[async_trait]
pub trait TransactionalStore: Store {
    /// Execute `f` as a transaction against this store and return the value it produces.
    ///
    /// `f` is given a mutable reference to one of this store's connections (`Self::Connection`)
    /// and returns a `ScopedBoxFuture` that may borrow that connection for `'r`. The higher-ranked
    /// bound (`for<'r>`) means `f` must accept a borrow of any lifetime the implementation picks.
    /// Both `f` and its result `R` must be `Send` and outlive `'a`.
    ///
    /// NOTE(review): commit when `f` resolves to `Ok` and roll back on `Err` is the presumed
    /// contract, but it is not stated here — confirm against each implementation.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        R: Send + 'a,
        F: Send + 'a,
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
}
108
/// Represents the highest checkpoint for some pipeline that has been processed by the indexer
/// framework. When read from the `Store`, this represents the inclusive upper bound checkpoint of
/// data that has been written to the Store for a pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitterWatermark {
    /// Inclusive upper bound on the epoch of the processed data.
    pub epoch_hi_inclusive: u64,
    /// Inclusive upper bound on the checkpoint of the processed data.
    pub checkpoint_hi_inclusive: u64,
    /// Upper bound on transactions covered by the processed data.
    /// NOTE(review): unlike the other fields, the name has no `_inclusive` suffix, so this is
    /// presumably an exclusive bound — confirm.
    pub tx_hi: u64,
    /// Timestamp of the highest processed checkpoint, in milliseconds since the Unix epoch.
    pub timestamp_ms_hi_inclusive: u64,
}
119
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReaderWatermark {
    /// Inclusive upper bound checkpoint. Within the framework, this value is used to determine the
    /// new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// The current inclusive lower bound of available data. Within the framework, this value is
    /// used to check whether to actually make an update transaction to the database.
    pub reader_lo: u64,
}
129
/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
///
/// The prunable region is the half-open range `[pruner_hi, reader_lo)`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    ///
    /// This is calculated by finding the difference between the time when it becomes safe to prune
    /// and the current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// The pruner will wait for this duration before beginning to delete data if it is positive.
    /// When this value is zero or negative, it means the waiting period has already passed and
    /// pruning can begin immediately.
    pub wait_for_ms: i64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so it can continue from
    /// this point.
    pub pruner_hi: u64,
}
151
152impl CommitterWatermark {
153    pub fn timestamp(&self) -> DateTime<Utc> {
154        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
155    }
156
157    /// Convenience function for testing, instantiates a CommitterWatermark with the given
158    /// `checkpoint_hi_inclusive` and sets all other values to 0.
159    pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
160        CommitterWatermark {
161            epoch_hi_inclusive: 0,
162            checkpoint_hi_inclusive,
163            tx_hi: 0,
164            timestamp_ms_hi_inclusive: 0,
165        }
166    }
167}
168
169impl PrunerWatermark {
170    /// Returns the duration that the pruner must wait before it can begin pruning data.
171    pub fn wait_for(&self) -> Option<Duration> {
172        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
173    }
174
175    /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
176    /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
177    /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
178    /// watermark as well.
179    pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
180        if self.pruner_hi >= self.reader_lo {
181            return None;
182        }
183
184        let from = self.pruner_hi;
185        let to_exclusive = (from + size).min(self.reader_lo);
186        self.pruner_hi = to_exclusive;
187        Some((from, to_exclusive))
188    }
189}
190
191/// Check that the pipeline name does not contain the store's delimiter, and construct the string
192/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
193/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
194pub fn pipeline_task<S: Store>(
195    pipeline_name: &'static str,
196    task_name: Option<&str>,
197) -> Result<String> {
198    if pipeline_name.contains(S::DELIMITER) {
199        anyhow::bail!(
200            "Pipeline name '{}' contains invalid delimiter '{}'",
201            pipeline_name,
202            S::DELIMITER
203        );
204    }
205
206    Ok(match task_name {
207        Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
208        None => pipeline_name.to_string(),
209    })
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand constructor so each test states only the three watermark values it cares about.
    fn pruner(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // Five seconds still to wait.
        let wm = pruner(5000, 1000, 500);
        assert_eq!(wm.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        assert_eq!(pruner(0, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        assert_eq!(pruner(-5000, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        let mut wm = pruner(0, 1000, 1000);
        assert_eq!(wm.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Fixed-size chunks advance `pruner_hi` by `size` on each call.
        let mut wm = pruner(0, 1000, 100);
        assert_eq!(wm.next_chunk(100), Some((100, 200)));
        assert_eq!(wm.pruner_hi, 200);
        assert_eq!(wm.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is capped at `reader_lo`. After that, there is
        // nothing left to prune.
        let mut wm = pruner(0, 1000, 500);
        assert_eq!(wm.next_chunk(2000), Some((500, 1000)));
        assert_eq!(wm.pruner_hi, 1000);
        assert_eq!(wm.next_chunk(2000), None);
    }
}