sui_indexer_alt_framework_store_traits/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use scoped_futures::ScopedBoxFuture;
8use std::time::Duration;
9
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
///
/// Every method borrows the connection mutably, is asynchronous, and reports failures as an
/// `anyhow` error.
#[async_trait]
pub trait Connection: Send {
    /// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
    /// Returns the committer watermark `checkpoint_hi_inclusive`.
    ///
    /// NOTE(review): the meaning of a `None` return is not spelled out here; presumably it means
    /// that no checkpoint has been committed for `pipeline_task` yet. Confirm against the store
    /// implementations.
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        default_next_checkpoint: u64,
    ) -> anyhow::Result<Option<u64>>;

    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
    /// checkpoint to resume processing from.
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
    /// to determine the new `reader_lo`, i.e. the inclusive lower bound of available data.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by inflight read
    /// requests.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
    /// the watermark was actually updated or not.
    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e. `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Update the `pruner_hi` of the watermark entry for `pipeline`.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}
86
/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermark operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
    /// The connection type handed out by `connect`. It is parameterized by the lifetime `'c` so
    /// that a connection may borrow from the store it was obtained from.
    type Connection<'c>: Connection
    where
        Self: 'c;

    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
    /// committer watermark. Defaults to `"@"`; implementations may override it.
    const DELIMITER: &'static str = "@";

    /// Obtain a connection to the store. The returned connection may borrow from `self` for `'c`.
    /// Fails with an `anyhow` error if a connection cannot be provided.
    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}
103
/// Extends the Store trait with transactional capabilities, to be used within the framework for
/// atomic or transactional writes.
#[async_trait]
pub trait TransactionalStore: Store {
    /// Run the closure `f` against a mutable connection of this store and return the value it
    /// produces.
    ///
    /// `f` receives `&mut Self::Connection` and returns a boxed future scoped to that borrow, so
    /// the future may use the connection but cannot outlive it.
    ///
    /// NOTE(review): presumably implementations commit when `f` resolves to `Ok` and roll back
    /// when it resolves to `Err`; this trait does not state that contract, so confirm against the
    /// concrete store.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        R: Send + 'a,
        F: Send + 'a,
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
}
116
/// Represents the highest checkpoint for some pipeline that has been processed by the indexer
/// framework. When read from the `Store`, this represents the inclusive upper bound checkpoint of
/// data that has been written to the Store for a pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct CommitterWatermark {
    /// NOTE(review): presumably the highest epoch (inclusive) covered by the written data; not
    /// defined in this file, confirm with the framework.
    pub epoch_hi_inclusive: u64,
    /// The highest checkpoint (inclusive) whose data has been written for the pipeline.
    pub checkpoint_hi_inclusive: u64,
    /// NOTE(review): presumably an exclusive upper bound on transaction sequence numbers (the
    /// name carries no `_inclusive` suffix); not defined in this file, confirm with the framework.
    pub tx_hi: u64,
    /// A timestamp in milliseconds since the Unix epoch; `timestamp()` converts it to a UTC
    /// datetime. Presumably the timestamp of `checkpoint_hi_inclusive` - confirm.
    pub timestamp_ms_hi_inclusive: u64,
}
127
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
///
/// Derives `PartialEq` like the other watermark types in this file, so that values can be
/// compared directly (for example with `assert_eq!`).
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}
137
/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
///
/// The prunable region is the half-open checkpoint range `[pruner_hi, reader_lo)`.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    ///
    /// This is calculated by finding the difference between the time when it becomes safe to prune
    /// and the current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// The pruner will wait for this duration before beginning to delete data if it is positive.
    /// When this value is zero or negative, it means the waiting period has already passed and
    /// pruning can begin immediately. `wait_for()` exposes this as an `Option<Duration>`.
    pub wait_for_ms: i64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so can continue from this
    /// point. In other words, `pruner_hi` itself is the first checkpoint that has not been pruned
    /// yet; `next_chunk()` advances it as chunks are handed out.
    pub pruner_hi: u64,
}
159
160impl CommitterWatermark {
161    pub fn timestamp(&self) -> DateTime<Utc> {
162        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
163    }
164
165    /// Convenience function for testing, instantiates a CommitterWatermark with the given
166    /// `checkpoint_hi_inclusive` and sets all other values to 0.
167    pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
168        CommitterWatermark {
169            epoch_hi_inclusive: 0,
170            checkpoint_hi_inclusive,
171            tx_hi: 0,
172            timestamp_ms_hi_inclusive: 0,
173        }
174    }
175}
176
177impl PrunerWatermark {
178    /// Returns the duration that the pruner must wait before it can begin pruning data.
179    pub fn wait_for(&self) -> Option<Duration> {
180        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
181    }
182
183    /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
184    /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
185    /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
186    /// watermark as well.
187    pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
188        if self.pruner_hi >= self.reader_lo {
189            return None;
190        }
191
192        let from = self.pruner_hi;
193        let to_exclusive = (from + size).min(self.reader_lo);
194        self.pruner_hi = to_exclusive;
195        Some((from, to_exclusive))
196    }
197}
198
199/// Check that the pipeline name does not contain the store's delimiter, and construct the string
200/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
201/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
202pub fn pipeline_task<S: Store>(
203    pipeline_name: &'static str,
204    task_name: Option<&str>,
205) -> Result<String> {
206    if pipeline_name.contains(S::DELIMITER) {
207        anyhow::bail!(
208            "Pipeline name '{}' contains invalid delimiter '{}'",
209            pipeline_name,
210            S::DELIMITER
211        );
212    }
213
214    Ok(match task_name {
215        Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
216        None => pipeline_name.to_string(),
217    })
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand for building a `PrunerWatermark` from its three fields.
    fn pruner(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // A positive wait (5 seconds) is surfaced as a `Duration`.
        let expected = Some(Duration::from_millis(5000));
        assert_eq!(pruner(5000, 1000, 500).wait_for(), expected);
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        // Nothing left to wait for.
        assert_eq!(pruner(0, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        // The waiting period has already elapsed.
        assert_eq!(pruner(-5000, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // `pruner_hi` has caught up with `reader_lo`, so there is nothing to hand out.
        let mut caught_up = pruner(0, 1000, 1000);
        assert_eq!(caught_up.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Consecutive chunks are contiguous and advance the watermark.
        let mut stepping = pruner(0, 1000, 100);
        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is clamped to `reader_lo`.
        let mut oversized = pruner(0, 1000, 500);
        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}