sui_indexer_alt_framework_store_traits/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use scoped_futures::ScopedBoxFuture;
7use std::time::Duration;
8
9/// Represents a database connection that can be used by the indexer framework to manage watermark
10/// operations, agnostic of the underlying store implementation.
11#[async_trait]
12pub trait Connection: Send {
13    /// Given a pipeline, return the committer watermark from the `Store`. This is used by the
14    /// indexer on startup to determine which checkpoint to resume processing from.
15    async fn committer_watermark(
16        &mut self,
17        pipeline: &'static str,
18    ) -> anyhow::Result<Option<CommitterWatermark>>;
19
20    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
21    /// to determine the new `reader_lo` or inclusive lower bound of available data.
22    async fn reader_watermark(
23        &mut self,
24        pipeline: &'static str,
25    ) -> anyhow::Result<Option<ReaderWatermark>>;
26
27    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
28    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
29    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
30    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
31    /// minimizes the possibility for the pruner to delete data still expected by inflight read
32    /// requests.
33    async fn pruner_watermark(
34        &mut self,
35        pipeline: &'static str,
36        delay: Duration,
37    ) -> anyhow::Result<Option<PrunerWatermark>>;
38
39    /// Upsert the high watermark as long as it raises the watermark stored in the database. Returns
40    /// a boolean indicating whether the watermark was actually updated or not.
41    async fn set_committer_watermark(
42        &mut self,
43        pipeline: &'static str,
44        watermark: CommitterWatermark,
45    ) -> anyhow::Result<bool>;
46
47    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
48    /// will reference this as the inclusive lower bound of available data for the corresponding
49    /// pipeline.
50    ///
51    /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
52    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
53    /// store. If this is not possible, then it should come from some other common source of time
54    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
55    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
56    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
57    /// before beginning to prune.
58    ///
59    /// Returns a boolean indicating whether the watermark was actually updated or not.
60    async fn set_reader_watermark(
61        &mut self,
62        pipeline: &'static str,
63        reader_lo: u64,
64    ) -> anyhow::Result<bool>;
65
66    /// Update the pruner watermark, returns true if the watermark was actually updated
67    async fn set_pruner_watermark(
68        &mut self,
69        pipeline: &'static str,
70        pruner_hi: u64,
71    ) -> anyhow::Result<bool>;
72}
73
74/// A storage-agnostic interface that provides database connections for both watermark management
75/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
76/// watermarks operations through its associated `Connection` type. This store is also passed to the
77/// pipeline handlers to perform arbitrary writes to the store.
78#[async_trait]
79pub trait Store: Send + Sync + 'static + Clone {
80    type Connection<'c>: Connection
81    where
82        Self: 'c;
83
84    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
85}
86
87/// Extends the Store trait with transactional capabilities, to be used within the framework for
88/// atomic or transactional writes.
89#[async_trait]
90pub trait TransactionalStore: Store {
91    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
92    where
93        R: Send + 'a,
94        F: Send + 'a,
95        F: for<'r> FnOnce(
96            &'r mut Self::Connection<'_>,
97        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
98}
99
/// The highest checkpoint that the indexer framework has processed for a pipeline. When loaded
/// from the `Store`, it is the inclusive upper bound of the checkpoints whose data has been
/// written to the Store for that pipeline.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitterWatermark {
    /// Epoch component of the watermark (an inclusive upper bound, going by the field name).
    pub epoch_hi_inclusive: u64,
    /// Checkpoint component of the watermark: the inclusive upper bound described above.
    pub checkpoint_hi_inclusive: u64,
    /// Transaction component of the watermark.
    // NOTE(review): no `_inclusive` suffix, so presumably an exclusive bound -- confirm.
    pub tx_hi: u64,
    /// Timestamp component of the watermark, read by `timestamp()` as milliseconds since the Unix
    /// epoch.
    pub timestamp_ms_hi_inclusive: u64,
}
110
/// The inclusive lower bound of the data available in the Store for a pipeline.
#[derive(Clone, Copy, Debug, Default)]
pub struct ReaderWatermark {
    /// The framework derives the new `reader_lo` from this value.
    pub checkpoint_hi_inclusive: u64,
    /// The framework compares against this value to decide whether an update transaction to the
    /// database is needed at all.
    pub reader_lo: u64,
}
120
/// Describes what the pruner is allowed to delete: the bounds of the prunable region, plus the
/// time in milliseconds it must wait before it may start deleting.
#[derive(Clone, Copy, Debug, Default)]
pub struct PrunerWatermark {
    /// Milliseconds the pruner still has to wait before it may begin pruning.
    ///
    /// Computed as the gap between the moment pruning becomes safe and now:
    /// `(pruner_timestamp + delay) - current_time`.
    ///
    /// A positive value means the pruner waits that long before deleting data. Zero or a
    /// negative value means the waiting period is already over and pruning may start
    /// immediately.
    pub wait_for_ms: i64,

    /// Exclusive upper bound of the checkpoints the pruner may delete.
    pub reader_lo: u64,

    /// Exclusive upper bound of the checkpoints the pruner has already deleted; pruning resumes
    /// from this checkpoint.
    pub pruner_hi: u64,
}
142
143impl CommitterWatermark {
144    pub fn timestamp(&self) -> DateTime<Utc> {
145        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
146    }
147
148    /// Convenience function for testing, instantiates a CommitterWatermark with the given
149    /// `checkpoint_hi_inclusive` and sets all other values to 0.
150    pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
151        CommitterWatermark {
152            epoch_hi_inclusive: 0,
153            checkpoint_hi_inclusive,
154            tx_hi: 0,
155            timestamp_ms_hi_inclusive: 0,
156        }
157    }
158}
159
160impl PrunerWatermark {
161    /// Returns the duration that the pruner must wait before it can begin pruning data.
162    pub fn wait_for(&self) -> Option<Duration> {
163        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
164    }
165
166    /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
167    /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
168    /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
169    /// watermark as well.
170    pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
171        if self.pruner_hi >= self.reader_lo {
172            return None;
173        }
174
175        let from = self.pruner_hi;
176        let to_exclusive = (from + size).min(self.reader_lo);
177        self.pruner_hi = to_exclusive;
178        Some((from, to_exclusive))
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand constructor so that each test only spells out the values it cares about.
    fn watermark(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // A positive wait (5 seconds) is reported as the matching `Duration`.
        let five_seconds = Some(Duration::from_millis(5000));
        assert_eq!(watermark(5000, 1000, 500).wait_for(), five_seconds);
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        // Zero means the waiting period has just elapsed: nothing to wait for.
        assert_eq!(watermark(0, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        // A negative value means the waiting period is long over.
        assert_eq!(watermark(-5000, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // `pruner_hi` has already caught up with `reader_lo`.
        let mut caught_up = watermark(0, 1000, 1000);
        assert_eq!(caught_up.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Consecutive chunks are contiguous and advance `pruner_hi`.
        let mut stepping = watermark(0, 1000, 100);
        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is clamped to `reader_lo`, after which there
        // is nothing left to prune.
        let mut oversized = watermark(0, 1000, 500);
        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}
255}