sui_indexer_alt_framework_store_traits/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use scoped_futures::ScopedBoxFuture;
8use std::time::Duration;
9
10/// Represents a database connection that can be used by the indexer framework to manage watermark
11/// operations, agnostic of the underlying store implementation.
12#[async_trait]
13pub trait Connection: Send {
14 /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
15 /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
16 /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
17 /// checkpoint to resume processing from.
18 async fn committer_watermark(
19 &mut self,
20 pipeline_task: &str,
21 ) -> anyhow::Result<Option<CommitterWatermark>>;
22
23 /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
24 /// to determine the new `reader_lo` or inclusive lower bound of available data.
25 async fn reader_watermark(
26 &mut self,
27 pipeline: &'static str,
28 ) -> anyhow::Result<Option<ReaderWatermark>>;
29
30 /// Get the bounds for the region that the pruner is allowed to prune, and the time in
31 /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
32 /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
33 /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
34 /// minimizes the possibility for the pruner to delete data still expected by inflight read
35 /// requests.
36 async fn pruner_watermark(
37 &mut self,
38 pipeline: &'static str,
39 delay: Duration,
40 ) -> anyhow::Result<Option<PrunerWatermark>>;
41
42 /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
43 /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
44 /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
45 /// the watermark was actually updated or not.
46 async fn set_committer_watermark(
47 &mut self,
48 pipeline_task: &str,
49 watermark: CommitterWatermark,
50 ) -> anyhow::Result<bool>;
51
52 /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
53 /// will reference this as the inclusive lower bound of available data for the corresponding
54 /// pipeline.
55 ///
56 /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
57 /// the watermark entry to the current time. Ideally, this would be from the perspective of the
58 /// store. If this is not possible, then it should come from some other common source of time
59 /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
60 /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
61 /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
62 /// before beginning to prune.
63 ///
64 /// Returns a boolean indicating whether the watermark was actually updated or not.
65 async fn set_reader_watermark(
66 &mut self,
67 pipeline: &'static str,
68 reader_lo: u64,
69 ) -> anyhow::Result<bool>;
70
71 /// Update the pruner watermark, returns true if the watermark was actually updated.
72 async fn set_pruner_watermark(
73 &mut self,
74 pipeline: &'static str,
75 pruner_hi: u64,
76 ) -> anyhow::Result<bool>;
77}
78
79/// A storage-agnostic interface that provides database connections for both watermark management
80/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
81/// watermarks operations through its associated `Connection` type. This store is also passed to the
82/// pipeline handlers to perform arbitrary writes to the store.
83#[async_trait]
84pub trait Store: Send + Sync + 'static + Clone {
85 type Connection<'c>: Connection
86 where
87 Self: 'c;
88
89 /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
90 /// committer watermark.
91 const DELIMITER: &'static str = "@";
92
93 async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
94}
95
96/// Extends the Store trait with transactional capabilities, to be used within the framework for
97/// atomic or transactional writes.
98#[async_trait]
99pub trait TransactionalStore: Store {
100 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
101 where
102 R: Send + 'a,
103 F: Send + 'a,
104 F: for<'r> FnOnce(
105 &'r mut Self::Connection<'_>,
106 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
107}
108
/// The highest checkpoint of some pipeline that the indexer framework has processed. When read
/// from the `Store`, it is the inclusive upper bound of the data written to the Store for that
/// pipeline.
///
/// Derives `PartialEq`/`Eq` so watermarks can be compared directly (e.g. with `assert_eq!`).
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitterWatermark {
    /// Inclusive upper bound on the epoch (per the field name — confirm against writers).
    pub epoch_hi_inclusive: u64,
    /// Inclusive upper bound on the checkpoint. The indexer uses this to decide which checkpoint
    /// to resume processing from.
    pub checkpoint_hi_inclusive: u64,
    /// Transaction bound. NOTE(review): unlike its siblings the name has no `_inclusive` suffix,
    /// so this is presumably an exclusive upper bound — confirm.
    pub tx_hi: u64,
    /// Inclusive upper-bound timestamp in milliseconds; `CommitterWatermark::timestamp` converts
    /// it to a UTC `DateTime`.
    pub timestamp_ms_hi_inclusive: u64,
}
119
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
///
/// Derives `PartialEq`/`Eq` so watermarks can be compared directly (e.g. with `assert_eq!`).
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}
129
/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
///
/// Derives `PartialEq`/`Eq` so watermarks can be compared directly (e.g. with `assert_eq!`).
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    ///
    /// This is the difference between the time at which it becomes safe to prune and the
    /// current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// If this is positive, the pruner waits for that long before it starts deleting data. If it
    /// is zero or negative, the waiting period has already passed and pruning can begin
    /// immediately.
    pub wait_for_ms: i64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so it can continue from
    /// this point.
    pub pruner_hi: u64,
}
151
152impl CommitterWatermark {
153 pub fn timestamp(&self) -> DateTime<Utc> {
154 DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
155 }
156
157 /// Convenience function for testing, instantiates a CommitterWatermark with the given
158 /// `checkpoint_hi_inclusive` and sets all other values to 0.
159 pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
160 CommitterWatermark {
161 epoch_hi_inclusive: 0,
162 checkpoint_hi_inclusive,
163 tx_hi: 0,
164 timestamp_ms_hi_inclusive: 0,
165 }
166 }
167}
168
169impl PrunerWatermark {
170 /// Returns the duration that the pruner must wait before it can begin pruning data.
171 pub fn wait_for(&self) -> Option<Duration> {
172 (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
173 }
174
175 /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
176 /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
177 /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
178 /// watermark as well.
179 pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
180 if self.pruner_hi >= self.reader_lo {
181 return None;
182 }
183
184 let from = self.pruner_hi;
185 let to_exclusive = (from + size).min(self.reader_lo);
186 self.pruner_hi = to_exclusive;
187 Some((from, to_exclusive))
188 }
189}
190
191/// Check that the pipeline name does not contain the store's delimiter, and construct the string
192/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
193/// or `{pipeline}{Store::DELIMITER}{task}` if a task name is provided.
194pub fn pipeline_task<S: Store>(
195 pipeline_name: &'static str,
196 task_name: Option<&str>,
197) -> Result<String> {
198 if pipeline_name.contains(S::DELIMITER) {
199 anyhow::bail!(
200 "Pipeline name '{}' contains invalid delimiter '{}'",
201 pipeline_name,
202 S::DELIMITER
203 );
204 }
205
206 Ok(match task_name {
207 Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
208 None => pipeline_name.to_string(),
209 })
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand for building a `PrunerWatermark`; arguments follow field declaration order.
    fn pruner(wait_for_ms: i64, reader_lo: u64, pruner_hi: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // 5 seconds of remaining delay is reported unchanged.
        let wm = pruner(5000, 1000, 500);
        assert_eq!(wm.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        assert_eq!(pruner(0, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        assert_eq!(pruner(-5000, 1000, 500).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // Nothing left to prune once `pruner_hi` has caught up with `reader_lo`.
        let mut wm = pruner(0, 1000, 1000);
        assert_eq!(wm.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        let mut wm = pruner(0, 1000, 100);
        assert_eq!(wm.next_chunk(100), Some((100, 200)));
        assert_eq!(wm.pruner_hi, 200);
        assert_eq!(wm.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is clamped to `reader_lo`; after that the
        // watermark is exhausted.
        let mut wm = pruner(0, 1000, 500);
        assert_eq!(wm.next_chunk(2000), Some((500, 1000)));
        assert_eq!(wm.pruner_hi, 1000);
        assert_eq!(wm.next_chunk(2000), None);
    }
}