sui_indexer_alt_framework_store_traits/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use chrono::DateTime;
9use chrono::Utc;
10use scoped_futures::ScopedBoxFuture;
11
12/// Represents a database connection that can be used by the indexer framework to manage watermark
13/// operations, agnostic of the underlying store implementation.
14#[async_trait]
15pub trait Connection: Send {
16 /// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
17 /// Returns the committer watermark `checkpoint_hi_inclusive`.
18 async fn init_watermark(
19 &mut self,
20 pipeline_task: &str,
21 default_next_checkpoint: u64,
22 ) -> anyhow::Result<Option<u64>>;
23
24 /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
25 /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
26 /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
27 /// checkpoint to resume processing from.
28 async fn committer_watermark(
29 &mut self,
30 pipeline_task: &str,
31 ) -> anyhow::Result<Option<CommitterWatermark>>;
32
33 /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
34 /// to determine the new `reader_lo` or inclusive lower bound of available data.
35 async fn reader_watermark(
36 &mut self,
37 pipeline: &'static str,
38 ) -> anyhow::Result<Option<ReaderWatermark>>;
39
40 /// Get the bounds for the region that the pruner is allowed to prune, and the time in
41 /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
42 /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
43 /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
44 /// minimizes the possibility for the pruner to delete data still expected by inflight read
45 /// requests.
46 async fn pruner_watermark(
47 &mut self,
48 pipeline: &'static str,
49 delay: Duration,
50 ) -> anyhow::Result<Option<PrunerWatermark>>;
51
52 /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
53 /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
54 /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
55 /// the watermark was actually updated or not.
56 async fn set_committer_watermark(
57 &mut self,
58 pipeline_task: &str,
59 watermark: CommitterWatermark,
60 ) -> anyhow::Result<bool>;
61
62 /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
63 /// will reference this as the inclusive lower bound of available data for the corresponding
64 /// pipeline.
65 ///
66 /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
67 /// the watermark entry to the current time. Ideally, this would be from the perspective of the
68 /// store. If this is not possible, then it should come from some other common source of time
69 /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
70 /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
71 /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
72 /// before beginning to prune.
73 ///
74 /// Returns a boolean indicating whether the watermark was actually updated or not.
75 async fn set_reader_watermark(
76 &mut self,
77 pipeline: &'static str,
78 reader_lo: u64,
79 ) -> anyhow::Result<bool>;
80
81 /// Update the pruner watermark, returns true if the watermark was actually updated.
82 async fn set_pruner_watermark(
83 &mut self,
84 pipeline: &'static str,
85 pruner_hi: u64,
86 ) -> anyhow::Result<bool>;
87}
88
89/// A storage-agnostic interface that provides database connections for both watermark management
90/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
91/// watermarks operations through its associated `Connection` type. This store is also passed to the
92/// pipeline handlers to perform arbitrary writes to the store.
93#[async_trait]
94pub trait Store: Send + Sync + 'static + Clone {
95 type Connection<'c>: Connection
96 where
97 Self: 'c;
98
99 /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
100 /// committer watermark.
101 const DELIMITER: &'static str = "@";
102
103 async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
104}
105
106/// Extends the Store trait with transactional capabilities, to be used within the framework for
107/// atomic or transactional writes.
108#[async_trait]
109pub trait TransactionalStore: Store {
110 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
111 where
112 R: Send + 'a,
113 F: Send + 'a,
114 F: for<'r> FnOnce(
115 &'r mut Self::Connection<'_>,
116 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
117}
118
/// The highest checkpoint that the indexer framework has processed for a pipeline.
///
/// When it is read back from the `Store`, it is the inclusive upper bound (by checkpoint) of the
/// data that has been written to the `Store` for that pipeline.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct CommitterWatermark {
    /// Epoch component of the watermark (inclusive).
    pub epoch_hi_inclusive: u64,
    /// Checkpoint component of the watermark (inclusive).
    pub checkpoint_hi_inclusive: u64,
    /// Transaction component of the watermark.
    // NOTE(review): the missing `_inclusive` suffix suggests an exclusive bound — confirm.
    pub tx_hi: u64,
    /// Timestamp component of the watermark, in milliseconds (inclusive).
    pub timestamp_ms_hi_inclusive: u64,
}
129
/// The inclusive lower bound of the data that is available in the `Store` for a pipeline.
///
/// Derives `PartialEq` like the other watermark types, so that values can be compared directly
/// (for example with `assert_eq!`).
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct ReaderWatermark {
    /// The framework derives the new `reader_lo` from this value.
    pub checkpoint_hi_inclusive: u64,
    /// The framework compares against this value to decide whether an update transaction
    /// against the database is needed at all.
    pub reader_lo: u64,
}
139
/// Describes what the pruner may delete and how long it must hold off before doing so: the
/// bounds of the prunable region plus the remaining wait, in milliseconds.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct PrunerWatermark {
    /// How many milliseconds the pruner still has to wait before it may start pruning.
    ///
    /// It is the gap between the moment pruning becomes safe and now:
    /// `(pruner_timestamp + delay) - current_time`.
    ///
    /// A positive value means the pruner waits that long before deleting anything. Zero or a
    /// negative value means the waiting period is already over and pruning can start right
    /// away.
    pub wait_for_ms: i64,

    /// Exclusive upper bound of what the pruner may delete.
    pub reader_lo: u64,

    /// Exclusive upper bound of what the pruner has already deleted; pruning resumes from this
    /// checkpoint.
    pub pruner_hi: u64,
}
161
162impl CommitterWatermark {
163 pub fn timestamp(&self) -> DateTime<Utc> {
164 DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
165 }
166
167 /// Convenience function for testing, instantiates a CommitterWatermark with the given
168 /// `checkpoint_hi_inclusive` and sets all other values to 0.
169 pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
170 CommitterWatermark {
171 epoch_hi_inclusive: 0,
172 checkpoint_hi_inclusive,
173 tx_hi: 0,
174 timestamp_ms_hi_inclusive: 0,
175 }
176 }
177}
178
179impl PrunerWatermark {
180 /// Returns the duration that the pruner must wait before it can begin pruning data.
181 pub fn wait_for(&self) -> Option<Duration> {
182 (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
183 }
184
185 /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
186 /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
187 /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
188 /// watermark as well.
189 pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
190 if self.pruner_hi >= self.reader_lo {
191 return None;
192 }
193
194 let from = self.pruner_hi;
195 let to_exclusive = (from + size).min(self.reader_lo);
196 self.pruner_hi = to_exclusive;
197 Some((from, to_exclusive))
198 }
199}
200
201/// Check that the pipeline name does not contain the store's delimiter, and construct the string
202/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
203/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
204pub fn pipeline_task<S: Store>(
205 pipeline_name: &'static str,
206 task_name: Option<&str>,
207) -> Result<String> {
208 if pipeline_name.contains(S::DELIMITER) {
209 anyhow::bail!(
210 "Pipeline name '{}' contains invalid delimiter '{}'",
211 pipeline_name,
212 S::DELIMITER
213 );
214 }
215
216 Ok(match task_name {
217 Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
218 None => pipeline_name.to_string(),
219 })
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand for building the watermark under test.
    fn pruner(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // A positive wait (5 seconds) is reported as-is.
        let wm = pruner(5000, 1000, 500);
        assert_eq!(wm.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        // Zero means there is nothing left to wait for.
        let wm = pruner(0, 1000, 500);
        assert_eq!(wm.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        // A negative wait means the deadline has already passed.
        let wm = pruner(-5000, 1000, 500);
        assert_eq!(wm.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // `pruner_hi` has caught up with `reader_lo`, so there is nothing to hand out.
        let mut wm = pruner(0, 1000, 1000);
        assert_eq!(wm.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Consecutive chunks are adjacent, and each call advances `pruner_hi`.
        let mut stepping = pruner(0, 1000, 100);
        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk size larger than the remaining range is clamped to `reader_lo`.
        let mut oversized = pruner(0, 1000, 500);
        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}
295}