sui_indexer_alt_framework_store_traits/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::time::Duration;
5
6use async_trait::async_trait;
7use chrono::DateTime;
8use chrono::Utc;
9use scoped_futures::ScopedBoxFuture;
10
11/// Represents a database connection that can be used by the indexer framework to manage watermark
12/// operations, agnostic of the underlying store implementation.
13#[async_trait]
14pub trait Connection: Send {
15    /// Initializes a watermark by either returning the existing watermark or, if the impl supports it, attempting to create it.
16    /// Returns `Ok(Some(_))` if a watermark existed or was created by this impl.
17    /// Returns `Ok(None)` if a watermark does not exist and this impl does not attempt to create one.
18    /// Returns `Err(_)` if the store encountered an error while trying to read or create the watermark.
19    async fn init_watermark(
20        &mut self,
21        pipeline_task: &str,
22        checkpoint_hi_inclusive: Option<u64>,
23    ) -> anyhow::Result<Option<InitWatermark>>;
24
25    /// Checks if the store can accept a `chain_id`.
26    /// Returns `Ok(true)` if the store accepts this `chain_id` thereby allowing processing to continue.
27    /// Returns `Ok(false)` if the store does not accept the `chain_id` thereby halting processing with an error.
28    /// Returns `Err(_)` if the store encountered an error while trying to determine if it could accept
29    /// the `chain_id` which will cause `accepts_chain_id` to be retried.
30    async fn accepts_chain_id(
31        &mut self,
32        pipeline_task: &str,
33        chain_id: [u8; 32],
34    ) -> anyhow::Result<bool>;
35
36    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
37    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
38    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
39    /// checkpoint to resume processing from.
40    async fn committer_watermark(
41        &mut self,
42        pipeline_task: &str,
43    ) -> anyhow::Result<Option<CommitterWatermark>>;
44
45    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
46    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
47    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
48    /// the watermark was actually updated or not.
49    async fn set_committer_watermark(
50        &mut self,
51        pipeline_task: &str,
52        watermark: CommitterWatermark,
53    ) -> anyhow::Result<bool>;
54}
55
56/// Extension of [`Connection`] for concurrent pipeline watermark operations.
57#[async_trait]
58pub trait ConcurrentConnection: Connection {
59    /// Helper to call in [`Connection::init_watermark`] impl that delegates to
60    /// [`ConcurrentConnection::reader_watermark`] if impl does not attempt to write data.
61    async fn delegate_to_reader_watermark(
62        &mut self,
63        pipeline_task: &str,
64    ) -> anyhow::Result<Option<InitWatermark>> {
65        Ok(self
66            .reader_watermark(pipeline_task)
67            .await?
68            .map(|w| InitWatermark {
69                checkpoint_hi_inclusive: Some(w.checkpoint_hi_inclusive),
70                reader_lo: Some(w.reader_lo),
71            }))
72    }
73
74    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
75    /// to determine the new `reader_lo` or inclusive lower bound of available data.
76    async fn reader_watermark(&mut self, pipeline: &str)
77    -> anyhow::Result<Option<ReaderWatermark>>;
78
79    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
80    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
81    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
82    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
83    /// minimizes the possibility for the pruner to delete data still expected by inflight read
84    /// requests.
85    async fn pruner_watermark(
86        &mut self,
87        pipeline: &'static str,
88        delay: Duration,
89    ) -> anyhow::Result<Option<PrunerWatermark>>;
90
91    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
92    /// will reference this as the inclusive lower bound of available data for the corresponding
93    /// pipeline.
94    ///
95    /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
96    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
97    /// store. If this is not possible, then it should come from some other common source of time
98    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
99    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
100    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
101    /// before beginning to prune.
102    ///
103    /// Returns a boolean indicating whether the watermark was actually updated or not.
104    async fn set_reader_watermark(
105        &mut self,
106        pipeline: &'static str,
107        reader_lo: u64,
108    ) -> anyhow::Result<bool>;
109
110    /// Update the pruner watermark, returns true if the watermark was actually updated.
111    async fn set_pruner_watermark(
112        &mut self,
113        pipeline: &'static str,
114        pruner_hi: u64,
115    ) -> anyhow::Result<bool>;
116}
117
118/// Extension of [`Connection`] for sequential pipeline operations.
119#[async_trait]
120pub trait SequentialConnection: Connection {
121    /// Helper to call in [`Connection::init_watermark`] impl that delegates to
122    /// [`Connection::committer_watermark`] if impl does not attempt to write data.
123    async fn delegate_to_committer_watermark(
124        &mut self,
125        pipeline_task: &str,
126        _checkpoint_hi_inclusive: Option<u64>,
127    ) -> anyhow::Result<Option<InitWatermark>> {
128        Ok(self
129            .committer_watermark(pipeline_task)
130            .await?
131            .map(|w| InitWatermark {
132                checkpoint_hi_inclusive: Some(w.checkpoint_hi_inclusive),
133                reader_lo: None,
134            }))
135    }
136}
137
138/// A storage-agnostic interface that provides database connections for both watermark management
139/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
140/// watermarks operations through its associated `Connection` type. This store is also passed to the
141/// pipeline handlers to perform arbitrary writes to the store.
142#[async_trait]
143pub trait Store: Send + Sync + 'static + Clone {
144    type Connection<'c>: Connection
145    where
146        Self: 'c;
147
148    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>>;
149}
150
151/// Extension of [`Store`] for stores that support concurrent pipeline operations, including
152/// task-based pipeline watermark tracking.
153#[async_trait]
154pub trait ConcurrentStore: for<'c> Store<Connection<'c> = Self::ConcurrentConnection<'c>> {
155    type ConcurrentConnection<'c>: ConcurrentConnection
156    where
157        Self: 'c;
158
159    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
160    /// committer watermark.
161    const DELIMITER: &'static str = "@";
162}
163
164/// Extension of [`Store`] for stores that support sequential pipeline operations, including
165/// transactional capabilities used within the framework for atomic or transactional writes.
166#[async_trait]
167pub trait SequentialStore: for<'c> Store<Connection<'c> = Self::SequentialConnection<'c>> {
168    type SequentialConnection<'c>: SequentialConnection
169    where
170        Self: 'c;
171
172    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
173    where
174        R: Send + 'a,
175        F: Send + 'a,
176        F: for<'r> FnOnce(
177            &'r mut Self::Connection<'_>,
178        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
179}
180
/// The range of checkpoints already ingested, as reported during pipeline initialization.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct InitWatermark {
    /// `Some` when `checkpoint_hi_inclusive` can be represented as a `u64`.
    ///
    /// `None` when it cannot, which occurs during pipeline initialization if
    /// `reader_lo == 0 && checkpoint_hi_inclusive < reader_lo`.
    pub checkpoint_hi_inclusive: Option<u64>,

    /// `Some` when the pipeline has a trailing edge checkpoint (concurrent pipeline).
    ///
    /// `None` when the pipeline does not have a trailing edge checkpoint (sequential pipeline).
    pub reader_lo: Option<u64>,
}
192
/// The highest checkpoint that the indexer framework has processed for some pipeline.
///
/// When read from the `Store`, this is the inclusive upper bound checkpoint of the data that has
/// been written to the Store for a pipeline.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct CommitterWatermark {
    /// NOTE(review): presumably the epoch that `checkpoint_hi_inclusive` belongs to -- confirm.
    pub epoch_hi_inclusive: u64,
    /// The inclusive upper bound checkpoint.
    pub checkpoint_hi_inclusive: u64,
    /// NOTE(review): presumably an exclusive upper bound on transactions covered by this
    /// watermark -- confirm.
    pub tx_hi: u64,
    /// A timestamp in milliseconds; `CommitterWatermark::timestamp` interprets it as milliseconds
    /// since the Unix epoch.
    pub timestamp_ms_hi_inclusive: u64,
}
203
/// Represents the inclusive lower bound of available data in the Store for some pipeline.
///
/// Derives `PartialEq` like the other watermark types in this module, so values can be compared
/// directly (for example in tests).
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Within the framework, this value is used to check whether to actually make an update
    /// transaction to the database.
    pub reader_lo: u64,
}
213
/// The bounds of the region that the pruner is allowed to prune, together with the time in
/// milliseconds that the pruner must wait before it can begin pruning data.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct PrunerWatermark {
    /// How much longer, in milliseconds, the pruner must wait before it can begin pruning.
    ///
    /// Calculated as the difference between the time at which it becomes safe to prune and the
    /// current time: `(pruner_timestamp + delay) - current_time`.
    ///
    /// A positive value means the pruner waits that long before beginning to delete data. Zero or
    /// a negative value means the waiting period has already passed and pruning can begin
    /// immediately.
    pub wait_for_ms: i64,

    /// The checkpoint up to which the pruner may delete (exclusive).
    pub reader_lo: u64,

    /// The checkpoint up to which the pruner has already deleted (exclusive), and therefore the
    /// point from which it can continue.
    pub pruner_hi: u64,
}
235
236impl CommitterWatermark {
237    pub fn timestamp(&self) -> DateTime<Utc> {
238        DateTime::from_timestamp_millis(self.timestamp_ms_hi_inclusive as i64).unwrap_or_default()
239    }
240
241    /// Convenience function for testing, instantiates a CommitterWatermark with the given
242    /// `checkpoint_hi_inclusive` and sets all other values to 0.
243    pub fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
244        CommitterWatermark {
245            epoch_hi_inclusive: 0,
246            checkpoint_hi_inclusive,
247            tx_hi: 0,
248            timestamp_ms_hi_inclusive: 0,
249        }
250    }
251}
252
253impl PrunerWatermark {
254    /// Returns the duration that the pruner must wait before it can begin pruning data.
255    pub fn wait_for(&self) -> Option<Duration> {
256        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms as u64))
257    }
258
259    /// The next chunk of checkpoints that the pruner should work on, to advance the watermark. If
260    /// no more checkpoints to prune, returns `None`. Otherwise, returns a tuple (from,
261    /// to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive. Advance the
262    /// watermark as well.
263    pub fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
264        if self.pruner_hi >= self.reader_lo {
265            return None;
266        }
267
268        let from = self.pruner_hi;
269        let to_exclusive = (from + size).min(self.reader_lo);
270        self.pruner_hi = to_exclusive;
271        Some((from, to_exclusive))
272    }
273}
274
275/// Check that the pipeline name does not contain the store's delimiter, and construct the string
276/// used for tracking a pipeline's watermarks in the store. This is either the pipeline name itself,
277/// or `{pipeline}{ConcurrentStore::DELIMITER}{task}` if a task name is provided.
278pub fn pipeline_task<S: ConcurrentStore>(
279    pipeline_name: &'static str,
280    task_name: Option<&str>,
281) -> anyhow::Result<String> {
282    if pipeline_name.contains(S::DELIMITER) {
283        anyhow::bail!(
284            "Pipeline name '{}' contains invalid delimiter '{}'",
285            pipeline_name,
286            S::DELIMITER
287        );
288    }
289
290    Ok(match task_name {
291        Some(task_name) => format!("{}{}{}", pipeline_name, S::DELIMITER, task_name),
292        None => pipeline_name.to_string(),
293    })
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shorthand constructor so each test only spells out the values it cares about.
    fn watermark(wait_for_ms: i64, pruner_hi: u64, reader_lo: u64) -> PrunerWatermark {
        PrunerWatermark {
            wait_for_ms,
            reader_lo,
            pruner_hi,
        }
    }

    #[test]
    fn test_pruner_watermark_wait_for_positive() {
        // 5 seconds still left to wait.
        let pending = watermark(5000, 500, 1000);

        assert_eq!(pending.wait_for(), Some(Duration::from_millis(5000)));
    }

    #[test]
    fn test_pruner_watermark_wait_for_zero() {
        let ready = watermark(0, 500, 1000);

        assert_eq!(ready.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_wait_for_negative() {
        let overdue = watermark(-5000, 500, 1000);

        assert_eq!(overdue.wait_for(), None);
    }

    #[test]
    fn test_pruner_watermark_no_more_chunks() {
        // The pruner has already caught up with the reader.
        let mut caught_up = watermark(0, 1000, 1000);

        assert_eq!(caught_up.next_chunk(100), None);
    }

    #[test]
    fn test_pruner_watermark_chunk_boundaries() {
        // Evenly sized chunks advance the watermark one chunk at a time.
        let mut stepping = watermark(0, 100, 1000);

        assert_eq!(stepping.next_chunk(100), Some((100, 200)));
        assert_eq!(stepping.pruner_hi, 200);
        assert_eq!(stepping.next_chunk(100), Some((200, 300)));

        // A chunk larger than the remaining range is clamped to `reader_lo`.
        let mut oversized = watermark(0, 500, 1000);

        assert_eq!(oversized.next_chunk(2000), Some((500, 1000)));
        assert_eq!(oversized.pruner_hi, 1000);
        assert_eq!(oversized.next_chunk(2000), None);
    }
}
369}