sui_indexer_alt_framework_store_traits/lib.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::DateTime;
8use chrono::Utc;
9use scoped_futures::ScopedBoxFuture;
10
11/// Represents a database connection that can be used by the indexer framework to manage watermark
12/// operations, agnostic of the underlying store implementation.
13#[async_trait]
14pub trait Connection: Send {
15 /// Initializes a watermark by either returning the existing watermark or, if the impl supports it, attempting to create it.
16 /// Returns `Ok(Some(_))` if a watermark existed or was created by this impl.
17 /// Returns `Ok(None)` if a watermark does not exist and this impl does not attempt to create one.
18 /// Returns `Err(_)` if the store encountered an error while trying to read or create the watermark.
19 async fn init_watermark(
20 &mut self,
21 pipeline_task: &str,
22 checkpoint_hi_inclusive: Option<u64>,
23 ) -> anyhow::Result<Option<InitWatermark>>;
24
25 /// Checks if the store can accept a `chain_id`.
26 /// Returns `Ok(true)` if the store accepts this `chain_id` thereby allowing processing to continue.
27 /// Returns `Ok(false)` if the store does not accept the `chain_id` thereby halting processing with an error.
28 /// Returns `Err(_)` if the store encountered an error while trying to determine if it could accept
29 /// the `chain_id` which will cause `accepts_chain_id` to be retried.
30 async fn accepts_chain_id(
31 &mut self,
32 pipeline_task: &str,
33 chain_id: [u8; 32],
34 ) -> anyhow::Result<bool>;
35
36 /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
37 /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
38 /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
39 /// checkpoint to resume processing from.
40 async fn committer_watermark(
41 &mut self,
42 pipeline_task: &str,
43 ) -> anyhow::Result<Option<CommitterWatermark>>;
44
45 /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
46 /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
47 /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
48 /// the watermark was actually updated or not.
49 async fn set_committer_watermark(
50 &mut self,
51 pipeline_task: &str,
52 watermark: CommitterWatermark,
53 ) -> anyhow::Result<bool>;
54}
55
56/// Extension of [`Connection`] for concurrent pipeline watermark operations.
57#[async_trait]
58pub trait ConcurrentConnection: Connection {
59 /// Helper to call in [`Connection::init_watermark`] impl that delegates to
60 /// [`ConcurrentConnection::reader_watermark`] if impl does not attempt to write data.
61 async fn delegate_to_reader_watermark(
62 &mut self,
63 pipeline_task: &str,
64 ) -> anyhow::Result<Option<InitWatermark>> {
65 Ok(self
66 .reader_watermark(pipeline_task)
67 .await?
68 .map(|w| InitWatermark {
69 checkpoint_hi_inclusive: Some(w.checkpoint_hi_inclusive),
70 reader_lo: Some(w.reader_lo),
71 }))
72 }
73
74 /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
75 /// to determine the new `reader_lo` or inclusive lower bound of available data.
76 async fn reader_watermark(&mut self, pipeline: &str)
77 -> anyhow::Result<Option<ReaderWatermark>>;
78
79 /// Get the bounds for the region that the pruner is allowed to prune, and the time in
80 /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
81 /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
82 /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
83 /// minimizes the possibility for the pruner to delete data still expected by inflight read
84 /// requests.
85 async fn pruner_watermark(
86 &mut self,
87 pipeline: &'static str,
88 delay: Duration,
89 ) -> anyhow::Result<Option<PrunerWatermark>>;
90
91 /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
92 /// will reference this as the inclusive lower bound of available data for the corresponding
93 /// pipeline.
94 ///
95 /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
96 /// the watermark entry to the current time. Ideally, this would be from the perspective of the
97 /// store. If this is not possible, then it should come from some other common source of time
98 /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
99 /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
100 /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
101 /// before beginning to prune.
102 ///
103 /// Returns a boolean indicating whether the watermark was actually updated or not.
104 async fn set_reader_watermark(
105 &mut self,
106 pipeline: &'static str,
107 reader_lo: u64,
108 ) -> anyhow::Result<bool>;
109
110 /// Update the pruner watermark, returns true if the watermark was actually updated.
111 async fn set_pruner_watermark(
112 &mut self,
113 pipeline: &'static str,
114 pruner_hi: u64,
115 ) -> anyhow::Result<bool>;
116}
117
118/// Extension of [`Connection`] for sequential pipeline operations.
119#[async_trait]
120pub trait SequentialConnection: Connection {
121 /// Helper to call in [`Connection::init_watermark`] impl that delegates to
122 /// [`Connection::committer_watermark`] if impl does not attempt to write data.
123 async fn delegate_to_committer_watermark(
124 &mut self,
125 pipeline_task: &str,
126 _checkpoint_hi_inclusive: Option<u64>,
127 ) -> anyhow::Result<Option<InitWatermark>> {
128 Ok(self
129 .committer_watermark(pipeline_task)
130 .await?
131 .map(|w| InitWatermark {
132 checkpoint_hi_inclusive: Some(w.checkpoint_hi_inclusive),
133 reader_lo: None,
134 }))
135 }
136}
137
138/// A storage-agnostic interface that provides database connections for both watermark management
139/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
140/// watermarks operations through its associated `Connection` type. This store is also passed to the
141/// pipeline handlers to perform arbitrary writes to the store.
142#[async_trait]
143pub trait Store: Send + Sync + 'static + Clone {
144 type Connection<'c>: Connection
145 where
146 Self: 'c;
147
148 async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>>;
149}
150
151/// Extension of [`Store`] for stores that support concurrent pipeline operations, including
152/// task-based pipeline watermark tracking.
153#[async_trait]
154pub trait ConcurrentStore: for<'c> Store<Connection<'c> = Self::ConcurrentConnection<'c>> {
155 type ConcurrentConnection<'c>: ConcurrentConnection
156 where
157 Self: 'c;
158
159 /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
160 /// committer watermark.
161 const DELIMITER: &'static str = "@";
162}
163
164/// Extension of [`Store`] for stores that support sequential pipeline operations, including
165/// transactional capabilities used within the framework for atomic or transactional writes.
166#[async_trait]
167pub trait SequentialStore: for<'c> Store<Connection<'c> = Self::SequentialConnection<'c>> {
168 type SequentialConnection<'c>: SequentialConnection
169 where
170 Self: 'c;
171
172 async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
173 where
174 R: Send + 'a,
175 F: Send + 'a,
176 F: for<'r> FnOnce(
177 &'r mut Self::Connection<'_>,
178 ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
179}
180
/// The range of checkpoints already ingested for a pipeline, as reported while the pipeline is
/// being initialized.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct InitWatermark {
    /// `Some` when `checkpoint_hi_inclusive` can be represented as a `u64`.
    ///
    /// `None` when it cannot, which occurs during pipeline initialization if
    /// `reader_lo == 0 && checkpoint_hi_inclusive < reader_lo`.
    pub checkpoint_hi_inclusive: Option<u64>,

    /// `Some` when the pipeline has a trailing edge checkpoint (concurrent pipeline).
    ///
    /// `None` when the pipeline has no trailing edge checkpoint (sequential pipeline).
    pub reader_lo: Option<u64>,
}
192
/// The highest checkpoint that the indexer framework has processed for some pipeline.
///
/// When read back from the `Store`, it is the inclusive upper bound of the checkpoints whose data
/// has been written to the `Store` for that pipeline.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct CommitterWatermark {
    pub epoch_hi_inclusive: u64,
    pub checkpoint_hi_inclusive: u64,
    pub tx_hi: u64,
    /// Interpreted by [`CommitterWatermark::timestamp`] as milliseconds.
    pub timestamp_ms_hi_inclusive: u64,
}
203
/// The inclusive lower bound of the data available in the `Store` for some pipeline.
///
/// Derives `PartialEq` like the other watermark types in this file, so that values can be
/// compared directly (for example in assertions).
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,

    /// Within the framework, this value is used to decide whether an update transaction needs to
    /// be made to the database at all.
    pub reader_lo: u64,
}
213
/// Describes the work available to the pruner: the bounds of the region it is allowed to prune,
/// and how long (in milliseconds) it must wait before it may begin pruning.
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct PrunerWatermark {
    /// Time, in milliseconds, that the pruner still has to wait before it can begin pruning.
    ///
    /// It is the gap between the moment pruning becomes safe and the current time:
    /// `(pruner_timestamp + delay) - current_time`.
    ///
    /// A positive value means the pruner waits that long before deleting data. Zero or a negative
    /// value means the waiting period is already over and pruning can begin immediately.
    pub wait_for_ms: i64,

    /// Exclusive upper bound of what the pruner may delete.
    pub reader_lo: u64,

    /// Exclusive upper bound of what the pruner has already deleted, and therefore the point it
    /// can continue from.
    pub pruner_hi: u64,
}
235
236impl CommitterWatermark {
237 pub fn timestamp(&self) -> DateTime<Utc> {
238 DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
239 }
240
241 /// Convenience function for testing, instantiates a CommitterWatermark with the given
242 /// `checkpoint_hi_inclusive` and sets all other values to 0.
243 pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
244 CommitterWatermark {
245 epoch_hi_inclusive: 0,
246 checkpoint_hi_inclusive,
247 tx_hi: 0,
248 timestamp_ms_hi_inclusive: 0,
249 }
250 }
251}
252
253impl PrunerWatermark {
254 /// Returns the duration that the pruner must wait before it can begin pruning data.
255 pub fn wait_for(&self) -> Option<Duration> {
256 (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
257 }
258
259 /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
260 /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
261 /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
262 /// watermark as well.
263 pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
264 if self.pruner_hi >= self.reader_lo {
265 return None;
266 }
267
268 let from = self.pruner_hi;
269 let to_exclusive = (from + size).min(self.reader_lo);
270 self.pruner_hi = to_exclusive;
271 Some((from, to_exclusive))
272 }
273}
274
275/// Check that the pipeline name does not contain the store's delimiter, and construct the string
276/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
277/// or `{pipeline}{ConcurrentStore::DELIMITER}{task}` if a task name is provided.
278pub fn pipeline_task<S: ConcurrentStore>(
279 pipeline_name: &'static str,
280 task_name: Option<&str>,
281) -> anyhow::Result<String> {
282 if pipeline_name.contains(S::DELIMITER) {
283 anyhow::bail!(
284 "Pipeline name '{}' contains invalid delimiter '{}'",
285 pipeline_name,
286 S::DELIMITER
287 );
288 }
289
290 Ok(match task_name {
291 Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
292 None => pipeline_name.to_string(),
293 })
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand for building the watermark under test.
    fn watermark(wait_for_ms: i64, pruner_hi: u64, reader_lo: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // 5 seconds still to wait.
        let five_seconds = Duration::from_millis(5000);
        assert_eq!(watermark(5000, 500, 1000).wait_for(), Some(five_seconds));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        assert_eq!(watermark(0, 500, 1000).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        assert_eq!(watermark(-5000, 500, 1000).wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // The pruner has already caught up with the reader.
        let mut caught_up = watermark(0, 1000, 1000);
        assert_eq!(caught_up.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Consecutive chunks are contiguous and advance `pruner_hi`.
        let mut stepping = watermark(0, 100, 1000);
        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is clipped at `reader_lo`.
        let mut oversized = watermark(0, 500, 1000);
        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}
369}