sui_indexer_alt_framework_store_traits/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::DateTime;
8use chrono::Utc;
9use scoped_futures::ScopedBoxFuture;
10
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
///
/// Watermarks are addressed in one of two ways:
///
/// - by `pipeline_task`: either a pipeline name, or a pipeline with an associated task formatted
///   as `{pipeline}{Store::DELIMITER}{task}` (used by the committer and initialization methods);
/// - by `pipeline`: a bare pipeline name (used by the reader and pruner methods).
///
/// Every method is fallible and reports failures through `anyhow::Result`.
#[async_trait]
pub trait Connection: Send {
    /// Returns the `InitWatermark` based on the existing watermark if it exists.
    /// Otherwise, initializes a new watermark record with `InitWatermark` and returns
    /// the value passed in.
    ///
    /// `pipeline_task` is the key of the watermark record, and `init_watermark` holds the values
    /// to use when no record exists yet.
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        init_watermark: InitWatermark,
    ) -> anyhow::Result<InitWatermark>;

    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
    /// checkpoint to resume processing from.
    ///
    /// NOTE(review): `None` presumably means that no watermark has been recorded for
    /// `pipeline_task` yet -- confirm against the concrete store implementations.
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
    /// to determine the new `reader_lo` or inclusive lower bound of available data.
    ///
    /// NOTE(review): `None` presumably means that no watermark entry exists for `pipeline` --
    /// confirm against the concrete store implementations.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by inflight read
    /// requests.
    ///
    /// `delay` is the grace period added to the stored `pruner_timestamp`; the remaining wait is
    /// reported through `PrunerWatermark::wait_for_ms`.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
    /// the watermark was actually updated or not.
    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e. `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Update the pruner watermark of `pipeline` to `pruner_hi`. Returns `true` if the watermark
    /// was actually updated.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}
88
/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermark operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
    /// The connection type handed out by `connect`. It is parameterized by the lifetime `'c` of
    /// the borrow of the store it was obtained from, so a connection may hold a reference into
    /// the store.
    type Connection<'c>: Connection
    where
        Self: 'c;

    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
    /// committer watermark. Defaults to `"@"`; implementations may override it.
    const DELIMITER: &'static str = "@";

    /// Obtain a connection to the store, borrowing the store for the lifetime of the returned
    /// connection. Returns an error if a connection cannot be provided.
    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>>;
}
105
/// Extends the Store trait with transactional capabilities, to be used within the framework for
/// atomic or transactional writes.
#[async_trait]
pub trait TransactionalStore: Store {
    /// Run the closure `f` against a mutable connection of this store and return the value that
    /// its future resolves to.
    ///
    /// `f` receives `&mut Self::Connection` and returns a `ScopedBoxFuture`, which lets the future
    /// borrow the connection for `'r` while capturing data that lives for `'a`. Both `f` and its
    /// result `R` must be `Send`.
    ///
    /// NOTE(review): implementations are presumably expected to commit when `f` resolves to `Ok`
    /// and roll back when it resolves to `Err` -- this trait does not spell that out, so confirm
    /// against the concrete store.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        R: Send + 'a,
        F: Send + 'a,
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
}
118
/// Used during watermark initialization to set and return state.
///
/// A value of this type is passed into `Connection::init_watermark` as the state to initialize
/// with, and a value of the same type is returned to describe the resulting state.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct InitWatermark {
    /// Calculated by the framework as `default_next_checkpoint.checked_sub(1)`, which makes it
    /// `None` when `default_next_checkpoint` is zero.
    pub checkpoint_hi_inclusive: Option<u64>,
    /// Calculated by the framework as `default_next_checkpoint`.
    pub reader_lo: u64,
}
127
/// Represents the highest checkpoint for some pipeline that has been processed by the indexer
/// framework. When read from the `Store`, this represents the inclusive upper bound checkpoint of
/// data that has been written to the Store for a pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct CommitterWatermark {
    /// NOTE(review): presumably the epoch that the highest processed checkpoint belongs to
    /// (inclusive) -- confirm against the framework.
    pub epoch_hi_inclusive: u64,
    /// The highest checkpoint that has been processed (inclusive).
    pub checkpoint_hi_inclusive: u64,
    /// NOTE(review): presumably an upper bound on processed transactions; the name carries no
    /// `_inclusive` suffix, so it may be exclusive -- confirm against the framework.
    pub tx_hi: u64,
    /// Timestamp in milliseconds; `CommitterWatermark::timestamp` interprets it as milliseconds
    /// since the Unix epoch. NOTE(review): presumably the timestamp of the highest processed
    /// checkpoint -- confirm.
    pub timestamp_ms_hi_inclusive: u64,
}
138
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
///
/// Derives `PartialEq` so that values can be compared (and used with `assert_eq!`) in the same way
/// as `InitWatermark`, `CommitterWatermark` and `PrunerWatermark`.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}
148
/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
///
/// The prunable region is `pruner_hi..reader_lo`: `pruner_hi` is its inclusive start and
/// `reader_lo` its exclusive end. The region is empty when `pruner_hi >= reader_lo`.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    ///
    /// This is calculated by finding the difference between the time when it becomes safe to prune
    /// and the current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// The pruner will wait for this duration before beginning to delete data if it is positive.
    /// When this value is zero or negative, it means the waiting period has already passed and
    /// pruning can begin immediately.
    pub wait_for_ms: i64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so can continue from this
    /// point.
    pub pruner_hi: u64,
}
170
171impl CommitterWatermark {
172 pub fn timestamp(&self) -> DateTime<Utc> {
173 DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
174 }
175
176 /// Convenience function for testing, instantiates a CommitterWatermark with the given
177 /// `checkpoint_hi_inclusive` and sets all other values to 0.
178 pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
179 CommitterWatermark {
180 epoch_hi_inclusive: 0,
181 checkpoint_hi_inclusive,
182 tx_hi: 0,
183 timestamp_ms_hi_inclusive: 0,
184 }
185 }
186}
187
188impl PrunerWatermark {
189 /// Returns the duration that the pruner must wait before it can begin pruning data.
190 pub fn wait_for(&self) -> Option<Duration> {
191 (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
192 }
193
194 /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
195 /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
196 /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
197 /// watermark as well.
198 pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
199 if self.pruner_hi >= self.reader_lo {
200 return None;
201 }
202
203 let from = self.pruner_hi;
204 let to_exclusive = (from + size).min(self.reader_lo);
205 self.pruner_hi = to_exclusive;
206 Some((from, to_exclusive))
207 }
208}
209
210/// A utility function for connections that do not have special initialization logic. These
211/// connections delegate initialization to `Connection::committer_watermark`.
212pub async fn init_with_committer_watermark(
213 connection: &mut impl Connection,
214 pipeline_task: &str,
215 init_watermark: InitWatermark,
216) -> anyhow::Result<InitWatermark> {
217 let checkpoint_hi_inclusive = connection
218 .committer_watermark(pipeline_task)
219 .await?
220 .map(|w| w.checkpoint_hi_inclusive);
221 Ok(InitWatermark {
222 checkpoint_hi_inclusive,
223 ..init_watermark
224 })
225}
226
227/// Check that the pipeline name does not contain the store's delimiter, and construct the string
228/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
229/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
230pub fn pipeline_task<S: Store>(
231 pipeline_name: &'static str,
232 task_name: Option<&str>,
233) -> anyhow::Result<String> {
234 if pipeline_name.contains(S::DELIMITER) {
235 anyhow::bail!(
236 "Pipeline name '{}' contains invalid delimiter '{}'",
237 pipeline_name,
238 S::DELIMITER
239 );
240 }
241
242 Ok(match task_name {
243 Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
244 None => pipeline_name.to_string(),
245 })
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a `PrunerWatermark` fixture covering the region `pruner_hi..reader_lo`.
    fn pruner(wait_for_ms: i64, pruner_hi: u64, reader_lo: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    /// A positive wait (5 seconds) is reported as a duration.
    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        let expected = Some(Duration::from_millis(5000));
        assert_eq!(pruner(5000, 500, 1000).wait_for(), expected);
    }

    /// A zero wait means pruning may start immediately.
    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        assert_eq!(pruner(0, 500, 1000).wait_for(), None);
    }

    /// A negative wait means the waiting period is already over.
    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        assert_eq!(pruner(-5000, 500, 1000).wait_for(), None);
    }

    /// Nothing is left to prune once `pruner_hi` has caught up with `reader_lo`.
    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        let mut caught_up = pruner(0, 1000, 1000);
        assert_eq!(caught_up.next_chunk(100), None);
    }

    /// Chunks are contiguous, advance `pruner_hi`, and are capped at `reader_lo`.
    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        let mut stepping = pruner(0, 100, 1000);
        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk size larger than the remaining range is truncated to the range.
        let mut oversized = pruner(0, 500, 1000);
        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}
321}