sui_indexer_alt_framework_store_traits/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use scoped_futures::ScopedBoxFuture;
7use std::time::Duration;
8
9/// Represents a database connection that can be used by the indexer framework to manage watermark
10/// operations, agnostic of the underlying store implementation.
11#[async_trait]
12pub trait Connection: Send {
13 /// Given a pipeline, return the committer watermark from the `Store`. This is used by the
14 /// indexer on startup to determine which checkpoint to resume processing from.
15 async fn committer_watermark(
16 &mut self,
17 pipeline: &'static str,
18 ) -> anyhow::Result<Option<CommitterWatermark>>;
19
20 /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
21 /// to determine the new `reader_lo` or inclusive lower bound of available data.
22 async fn reader_watermark(
23 &mut self,
24 pipeline: &'static str,
25 ) -> anyhow::Result<Option<ReaderWatermark>>;
26
27 /// Get the bounds for the region that the pruner is allowed to prune, and the time in
28 /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
29 /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
30 /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
31 /// minimizes the possibility for the pruner to delete data still expected by inflight read
32 /// requests.
33 async fn pruner_watermark(
34 &mut self,
35 pipeline: &'static str,
36 delay: Duration,
37 ) -> anyhow::Result<Option<PrunerWatermark>>;
38
39 /// Upsert the high watermark as long as it raises the watermark stored in the database. Returns
40 /// a boolean indicating whether the watermark was actually updated or not.
41 async fn set_committer_watermark(
42 &mut self,
43 pipeline: &'static str,
44 watermark: CommitterWatermark,
45 ) -> anyhow::Result<bool>;
46
47 /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
48 /// will reference this as the inclusive lower bound of available data for the corresponding
49 /// pipeline.
50 ///
51 /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
52 /// the watermark entry to the current time. Ideally, this would be from the perspective of the
53 /// store. If this is not possible, then it should come from some other common source of time
54 /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
55 /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
56 /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
57 /// before beginning to prune.
58 ///
59 /// Returns a boolean indicating whether the watermark was actually updated or not.
60 async fn set_reader_watermark(
61 &mut self,
62 pipeline: &'static str,
63 reader_lo: u64,
64 ) -> anyhow::Result<bool>;
65
66 /// Update the pruner watermark, returns true if the watermark was actually updated
67 async fn set_pruner_watermark(
68 &mut self,
69 pipeline: &'static str,
70 pruner_hi: u64,
71 ) -> anyhow::Result<bool>;
72}
73
74/// A storage-agnostic interface that provides database connections for both watermark management
75/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
76/// watermarks operations through its associated `Connection` type. This store is also passed to the
77/// pipeline handlers to perform arbitrary writes to the store.
78#[async_trait]
79pub trait Store: Send + Sync + 'static + Clone {
80 type Connection<'c>: Connection
81 where
82 Self: 'c;
83
84 async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
85}
86
87/// Extends the Store trait with transactional capabilities, to be used within the framework for
88/// atomic or transactional writes.
89#[async_trait]
90pub trait TransactionalStore: Store {
91 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
92 where
93 R: Send + 'a,
94 F: Send + 'a,
95 F: for<'r> FnOnce(
96 &'r mut Self::Connection<'_>,
97 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
98}
99
/// The highest checkpoint of some pipeline that the indexer framework has processed. When it is
/// read back from the `Store`, it is the inclusive upper bound of the data written for that
/// pipeline.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitterWatermark {
    /// Epoch upper bound (inclusive). NOTE(review): presumably the epoch that contains
    /// `checkpoint_hi_inclusive`; confirm.
    pub epoch_hi_inclusive: u64,
    /// The highest processed checkpoint (inclusive).
    pub checkpoint_hi_inclusive: u64,
    /// Transaction upper bound. NOTE(review): unlike the sibling fields, the name has no
    /// `_inclusive` suffix, so it is presumably exclusive; confirm.
    pub tx_hi: u64,
    /// Timestamp upper bound (inclusive), in milliseconds since the Unix epoch, as interpreted by
    /// [`CommitterWatermark::timestamp`].
    pub timestamp_ms_hi_inclusive: u64,
}
110
/// The inclusive lower bound of the data available in the `Store` for some pipeline.
#[derive(Clone, Copy, Debug, Default)]
pub struct ReaderWatermark {
    /// Inclusive upper-bound checkpoint. The framework derives the new `reader_lo` from this
    /// value.
    pub checkpoint_hi_inclusive: u64,
    /// The current inclusive lower bound of available data. The framework compares against it to
    /// decide whether an update transaction to the database is actually needed.
    pub reader_lo: u64,
}
120
/// The region that a pruner may delete for some pipeline, together with how much longer it must
/// wait before it is allowed to start deleting.
#[derive(Clone, Copy, Debug, Default)]
pub struct PrunerWatermark {
    /// Milliseconds the pruner must still wait before it may begin pruning.
    ///
    /// This is the gap between the moment pruning becomes safe and now:
    /// `(pruner_timestamp + delay) - current_time`. A positive value is the remaining wait. Zero or
    /// a negative value means that moment has already passed, so pruning may start immediately
    /// (see [`PrunerWatermark::wait_for`]).
    pub wait_for_ms: i64,

    /// Checkpoint (exclusive) up to which the pruner is allowed to delete.
    pub reader_lo: u64,

    /// Checkpoint (exclusive) up to which the pruner has already deleted; pruning resumes here.
    pub pruner_hi: u64,
}
142
143impl CommitterWatermark {
144 pub fn timestamp(&self) -> DateTime<Utc> {
145 DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
146 }
147
148 /// Convenience function for testing, instantiates a CommitterWatermark with the given
149 /// `checkpoint_hi_inclusive` and sets all other values to 0.
150 pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
151 CommitterWatermark {
152 epoch_hi_inclusive: 0,
153 checkpoint_hi_inclusive,
154 tx_hi: 0,
155 timestamp_ms_hi_inclusive: 0,
156 }
157 }
158}
159
160impl PrunerWatermark {
161 /// Returns the duration that the pruner must wait before it can begin pruning data.
162 pub fn wait_for(&self) -> Option<Duration> {
163 (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
164 }
165
166 /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
167 /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
168 /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
169 /// watermark as well.
170 pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
171 if self.pruner_hi >= self.reader_lo {
172 return None;
173 }
174
175 let from = self.pruner_hi;
176 let to_exclusive = (from + size).min(self.reader_lo);
177 self.pruner_hi = to_exclusive;
178 Some((from, to_exclusive))
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        let watermark = PrunerWatermark {
            wait_for_ms: 5000, // 5 seconds
            reader_lo: 1000,
            pruner_hi: 500,
        };

        assert_eq!(watermark.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        let watermark = PrunerWatermark {
            wait_for_ms: 0,
            reader_lo: 1000,
            pruner_hi: 500,
        };

        assert_eq!(watermark.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        let watermark = PrunerWatermark {
            wait_for_ms: -5000,
            reader_lo: 1000,
            pruner_hi: 500,
        };

        assert_eq!(watermark.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        let mut watermark = PrunerWatermark {
            wait_for_ms: 0,
            reader_lo: 1000,
            pruner_hi: 1000,
        };

        assert_eq!(watermark.next_chunk(100), None);
    }

    /// A pruner watermark that is somehow ahead of the reader watermark must not produce a chunk,
    /// and must not move `pruner_hi` backwards.
    #[test]
    fn test_pruner_watermark_pruner_ahead_of_reader() {
        let mut watermark = PrunerWatermark {
            wait_for_ms: 0,
            reader_lo: 500,
            pruner_hi: 1000,
        };

        assert_eq!(watermark.next_chunk(100), None);
        assert_eq!(watermark.pruner_hi, 1000);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        let mut watermark = PrunerWatermark {
            wait_for_ms: 0,
            reader_lo: 1000,
            pruner_hi: 100,
        };

        assert_eq!(watermark.next_chunk(100), Some((100, 200)));
        assert_eq!(watermark.pruner_hi, 200);
        assert_eq!(watermark.next_chunk(100), Some((200, 300)));

        // Reset and test oversized chunk
        let mut watermark = PrunerWatermark {
            wait_for_ms: 0,
            reader_lo: 1000,
            pruner_hi: 500,
        };

        // Chunk larger than remaining range
        assert_eq!(watermark.next_chunk(2000), Some((500, 1000)));
        assert_eq!(watermark.pruner_hi, 1000);
        assert_eq!(watermark.next_chunk(2000), None);
    }

    /// Draining the watermark yields contiguous, non-overlapping chunks. Together they cover
    /// exactly `[pruner_hi, reader_lo)`, and the last chunk is truncated at `reader_lo`.
    #[test]
    fn test_pruner_watermark_chunks_cover_range_exactly() {
        let mut watermark = PrunerWatermark {
            wait_for_ms: 0,
            reader_lo: 1000,
            pruner_hi: 0,
        };

        let chunks: Vec<_> = std::iter::from_fn(|| watermark.next_chunk(300)).collect();
        assert_eq!(chunks, vec![(0, 300), (300, 600), (600, 900), (900, 1000)]);
        assert_eq!(watermark.pruner_hi, 1000);
    }

    #[test]
    fn test_committer_watermark_new_for_testing() {
        let watermark = CommitterWatermark::new_for_testing(42);

        assert_eq!(watermark.checkpoint_hi_inclusive, 42);
        assert_eq!(watermark.epoch_hi_inclusive, 0);
        assert_eq!(watermark.tx_hi, 0);
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 0);
    }

    #[test]
    fn test_committer_watermark_timestamp() {
        let watermark = CommitterWatermark {
            timestamp_ms_hi_inclusive: 1_700_000_000_123,
            ..CommitterWatermark::default()
        };
        assert_eq!(watermark.timestamp().timestamp_millis(), 1_700_000_000_123);

        // A zero timestamp maps to the Unix epoch.
        let watermark = CommitterWatermark::new_for_testing(1);
        assert_eq!(watermark.timestamp().timestamp_millis(), 0);
    }
}