// sui_indexer_alt_framework_store_traits/lib.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use scoped_futures::ScopedBoxFuture;
use std::time::Duration;

10/// Represents a database connection that can be used by the indexer framework to manage watermark
11/// operations, agnostic of the underlying store implementation.
12#[async_trait]
13pub trait Connection: Send {
14 /// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
15 /// Returns the committer watermark `checkpoint_hi_inclusive`.
16 async fn init_watermark(
17 &mut self,
18 pipeline_task: &str,
19 default_next_checkpoint: u64,
20 ) -> anyhow::Result<Option<u64>>;
21
22 /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
23 /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
24 /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
25 /// checkpoint to resume processing from.
26 async fn committer_watermark(
27 &mut self,
28 pipeline_task: &str,
29 ) -> anyhow::Result<Option<CommitterWatermark>>;
30
31 /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
32 /// to determine the new `reader_lo` or inclusive lower bound of available data.
33 async fn reader_watermark(
34 &mut self,
35 pipeline: &'static str,
36 ) -> anyhow::Result<Option<ReaderWatermark>>;
37
38 /// Get the bounds for the region that the pruner is allowed to prune, and the time in
39 /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
40 /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
41 /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
42 /// minimizes the possibility for the pruner to delete data still expected by inflight read
43 /// requests.
44 async fn pruner_watermark(
45 &mut self,
46 pipeline: &'static str,
47 delay: Duration,
48 ) -> anyhow::Result<Option<PrunerWatermark>>;
49
50 /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
51 /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
52 /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
53 /// the watermark was actually updated or not.
54 async fn set_committer_watermark(
55 &mut self,
56 pipeline_task: &str,
57 watermark: CommitterWatermark,
58 ) -> anyhow::Result<bool>;
59
60 /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
61 /// will reference this as the inclusive lower bound of available data for the corresponding
62 /// pipeline.
63 ///
64 /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
65 /// the watermark entry to the current time. Ideally, this would be from the perspective of the
66 /// store. If this is not possible, then it should come from some other common source of time
67 /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
68 /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
69 /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
70 /// before beginning to prune.
71 ///
72 /// Returns a boolean indicating whether the watermark was actually updated or not.
73 async fn set_reader_watermark(
74 &mut self,
75 pipeline: &'static str,
76 reader_lo: u64,
77 ) -> anyhow::Result<bool>;
78
79 /// Update the pruner watermark, returns true if the watermark was actually updated.
80 async fn set_pruner_watermark(
81 &mut self,
82 pipeline: &'static str,
83 pruner_hi: u64,
84 ) -> anyhow::Result<bool>;
85}
86
87/// A storage-agnostic interface that provides database connections for both watermark management
88/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
89/// watermarks operations through its associated `Connection` type. This store is also passed to the
90/// pipeline handlers to perform arbitrary writes to the store.
91#[async_trait]
92pub trait Store: Send + Sync + 'static + Clone {
93 type Connection<'c>: Connection
94 where
95 Self: 'c;
96
97 /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
98 /// committer watermark.
99 const DELIMITER: &'static str = "@";
100
101 async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
102}
103
104/// Extends the Store trait with transactional capabilities, to be used within the framework for
105/// atomic or transactional writes.
106#[async_trait]
107pub trait TransactionalStore: Store {
108 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
109 where
110 R: Send + 'a,
111 F: Send + 'a,
112 F: for<'r> FnOnce(
113 &'r mut Self::Connection<'_>,
114 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
115}
116
/// Represents the highest checkpoint for some pipeline that has been processed by the indexer
/// framework. When read from the `Store`, this represents the inclusive upper bound checkpoint of
/// data that has been written to the Store for a pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitterWatermark {
    // NOTE(review): presumably the epoch containing `checkpoint_hi_inclusive` -- confirm.
    pub epoch_hi_inclusive: u64,
    /// Inclusive upper bound checkpoint of the data written for the pipeline.
    pub checkpoint_hi_inclusive: u64,
    // NOTE(review): presumably an exclusive upper bound on transaction sequence numbers, going by
    // the missing `_inclusive` suffix -- confirm.
    pub tx_hi: u64,
    /// Timestamp in milliseconds since the Unix epoch (this is how `timestamp()` interprets it).
    pub timestamp_ms_hi_inclusive: u64,
}

/// Represents the inclusive lower bound of available data in the Store for some pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}

/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    ///
    /// This is calculated by finding the difference between the time when it becomes safe to prune
    /// and the current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// The pruner will wait for this duration before beginning to delete data if it is positive.
    /// When this value is zero or negative, it means the waiting period has already passed and
    /// pruning can begin immediately.
    pub wait_for_ms: i64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so can continue from this
    /// point.
    pub pruner_hi: u64,
}

160impl CommitterWatermark {
161 pub fn timestamp(&self) -> DateTime<Utc> {
162 DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
163 }
164
165 /// Convenience function for testing, instantiates a CommitterWatermark with the given
166 /// `checkpoint_hi_inclusive` and sets all other values to 0.
167 pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
168 CommitterWatermark {
169 epoch_hi_inclusive: 0,
170 checkpoint_hi_inclusive,
171 tx_hi: 0,
172 timestamp_ms_hi_inclusive: 0,
173 }
174 }
175}
176
177impl PrunerWatermark {
178 /// Returns the duration that the pruner must wait before it can begin pruning data.
179 pub fn wait_for(&self) -> Option<Duration> {
180 (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
181 }
182
183 /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
184 /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
185 /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
186 /// watermark as well.
187 pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
188 if self.pruner_hi >= self.reader_lo {
189 return None;
190 }
191
192 let from = self.pruner_hi;
193 let to_exclusive = (from + size).min(self.reader_lo);
194 self.pruner_hi = to_exclusive;
195 Some((from, to_exclusive))
196 }
197}
198
199/// Check that the pipeline name does not contain the store's delimiter, and construct the string
200/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
201/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
202pub fn pipeline_task<S: Store>(
203 pipeline_name: &'static str,
204 task_name: Option<&str>,
205) -> Result<String> {
206 if pipeline_name.contains(S::DELIMITER) {
207 anyhow::bail!(
208 "Pipeline name '{}' contains invalid delimiter '{}'",
209 pipeline_name,
210 S::DELIMITER
211 );
212 }
213
214 Ok(match task_name {
215 Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
216 None => pipeline_name.to_string(),
217 })
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand constructor so each test states only the three values it cares about.
    fn watermark(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // 5 seconds still to wait.
        let watermark = watermark(5000, 1000, 500);

        assert_eq!(watermark.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        let watermark = watermark(0, 1000, 500);

        assert_eq!(watermark.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        let watermark = watermark(-5000, 1000, 500);

        assert_eq!(watermark.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // The pruner has caught up with the reader: nothing left to prune.
        let mut watermark = watermark(0, 1000, 1000);

        assert_eq!(watermark.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_pruner_ahead_of_reader() {
        // A pruner watermark past `reader_lo` must not yield a chunk or move the watermark.
        let mut watermark = watermark(0, 1000, 1500);

        assert_eq!(watermark.next_chunk(100), None);
        assert_eq!(watermark.pruner_hi, 1500);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        let mut stepping = watermark(0, 1000, 100);

        // Consecutive chunks are contiguous and advance the watermark.
        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));
        assert_eq!(stepping.pruner_hi, 300);

        // A chunk larger than the remaining range is clamped to `reader_lo`.
        let mut oversized = watermark(0, 1000, 500);

        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}
293}