sui_indexer_alt_framework/
store.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub use crate::pipeline::sequential::Handler as SequentialHandler;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use scoped_futures::ScopedBoxFuture;
use std::time::Duration;

pub use scoped_futures;

/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
///
/// Every method takes `&mut self` and identifies the pipeline whose watermark it reads or writes
/// by its `&'static str` name.
#[async_trait]
pub trait Connection: Send + Sync {
    /// Given a pipeline, return the committer watermark from the `Store`. This is used by the
    /// indexer on startup to determine which checkpoint to resume processing from.
    ///
    /// Returns `Ok(None)` when no committer watermark is available for `pipeline` (presumably
    /// because the pipeline has not committed anything yet — confirm against implementations).
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a pipeline, return the reader watermark from the `Store`. This is used by the indexer
    /// to determine the new `reader_lo`, i.e. the inclusive lower bound of available data.
    ///
    /// Returns `Ok(None)` when no reader watermark is available for `pipeline`.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Upsert the committer (high) watermark for `pipeline`, but only if doing so raises the
    /// watermark currently stored. Returns a boolean indicating whether the watermark was actually
    /// updated.
    async fn set_committer_watermark(
        &mut self,
        pipeline: &'static str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e. `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by inflight read
    /// requests.
    ///
    /// The remaining wait is reported in the `wait_for_ms` field of the returned
    /// `PrunerWatermark`. Returns `Ok(None)` when no watermark is available for `pipeline`.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Update the pruner watermark for `pipeline`, recording that checkpoints below `pruner_hi`
    /// (exclusive) have been pruned. Returns `true` if the watermark was actually updated.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}

/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermark operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
    /// The connection type handed out by `connect`. It is parameterized by the lifetime `'c` of
    /// the borrow of the store, so a connection may borrow from the store that produced it.
    type Connection<'c>: Connection
    where
        Self: 'c;

    /// Acquire a connection to the store. The returned connection borrows `self` for `'c`.
    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}

/// Shorthand for the `Batch` associated type of a sequential pipeline handler `H`
/// (see `SequentialHandler`).
pub type HandlerBatch<H> = <H as SequentialHandler>::Batch;

/// Extends the `Store` trait with transactional capabilities, to be used within the framework for
/// atomic or transactional writes.
#[async_trait]
pub trait TransactionalStore: Store {
    /// Run `f` against a connection within a single transaction and return the value it produces.
    ///
    /// `f` receives a mutable borrow of the connection and returns a boxed future that is allowed
    /// to hold that borrow (via `ScopedBoxFuture`). Implementations are presumably expected to
    /// commit when the future resolves to `Ok` and roll back when it resolves to `Err` — confirm
    /// against the concrete implementations.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        R: Send + 'a,
        F: Send + 'a,
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>;
}

/// Represents the highest checkpoint for some pipeline that has been processed by the indexer
/// framework. When read from the `Store`, this represents the inclusive upper bound checkpoint of
/// data that has been written to the Store for a pipeline.
#[derive(Default, Debug, Clone, Copy)]
pub struct CommitterWatermark {
    /// Inclusive upper bound on the epoch covered by this watermark (presumably the epoch that
    /// contains `checkpoint_hi_inclusive` — confirm).
    pub epoch_hi_inclusive: u64,
    /// Inclusive upper bound checkpoint of data written to the store for the pipeline.
    pub checkpoint_hi_inclusive: u64,
    /// Upper bound on transactions covered by this watermark. The name lacks an `_inclusive`
    /// suffix, suggesting an exclusive bound — NOTE(review): confirm.
    pub tx_hi: u64,
    /// Timestamp of the watermark, in milliseconds since the Unix epoch (decoded into a
    /// `DateTime<Utc>` by `timestamp()`).
    pub timestamp_ms_hi_inclusive: u64,
}

/// Represents the inclusive lower bound of available data in the Store for some pipeline.
#[derive(Default, Debug, Clone, Copy)]
pub struct ReaderWatermark {
    /// Within the framework, this value is used to determine the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// The current inclusive lower bound of available data. Within the framework, this value is
    /// used to check whether to actually make an update transaction to the database.
    pub reader_lo: u64,
}

/// A watermark that represents the bounds for the region that the pruner is allowed to prune, and
/// the time in milliseconds the pruner must wait before it can begin pruning data.
#[derive(Default, Debug, Clone, Copy)]
pub struct PrunerWatermark {
    /// The remaining time in milliseconds that the pruner must wait before it can begin pruning.
    /// This is calculated as the time remaining until `pruner_timestamp + delay` has passed. A
    /// value of zero means pruning may begin immediately.
    pub wait_for_ms: u64,

    /// The pruner can delete up to this checkpoint (exclusive).
    pub reader_lo: u64,

    /// The pruner has already deleted up to this checkpoint (exclusive), so it can continue from
    /// this point.
    pub pruner_hi: u64,
}

impl CommitterWatermark {
    /// The watermark's timestamp, decoded from `timestamp_ms_hi_inclusive` (milliseconds since
    /// the Unix epoch).
    ///
    /// Falls back to `DateTime::<Utc>::default()` when the stored value cannot be represented as
    /// a `DateTime<Utc>`.
    pub(crate) fn timestamp(&self) -> DateTime<Utc> {
        // A plain `as i64` cast would silently wrap values above `i64::MAX` into negative
        // (pre-1970) timestamps; a checked conversion routes them to the fallback instead.
        i64::try_from(self.timestamp_ms_hi_inclusive)
            .ok()
            .and_then(DateTime::<Utc>::from_timestamp_millis)
            .unwrap_or_default()
    }

    /// Build a watermark at `checkpoint_hi_inclusive`, with every other field zeroed.
    #[cfg(test)]
    pub(crate) fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
        CommitterWatermark {
            checkpoint_hi_inclusive,
            ..Self::default()
        }
    }
}

impl PrunerWatermark {
    /// How long the pruner must still wait before it may begin pruning, or `None` if it may
    /// start immediately (`wait_for_ms` is zero).
    pub(crate) fn wait_for_ms(&self) -> Option<Duration> {
        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms))
    }

    /// Claim the next chunk of checkpoints for the pruner, advancing the watermark past it.
    ///
    /// Returns `None` once `pruner_hi` has reached `reader_lo`, meaning nothing is left to prune.
    /// Otherwise returns `(from, to_exclusive)`: a half-open range of at most `size` checkpoints
    /// that starts at the current `pruner_hi` and never extends past `reader_lo`. `pruner_hi` is
    /// then set to `to_exclusive`.
    ///
    /// A `size` of zero is treated as one, so every `Some` result makes progress. Otherwise a
    /// caller that loops until `None` would spin forever on empty chunks.
    pub(crate) fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
        if self.pruner_hi >= self.reader_lo {
            return None;
        }

        let from = self.pruner_hi;
        // Saturate rather than overflow. With a very large `size` (e.g. `u64::MAX`), plain
        // addition would panic in debug builds; in release builds it would wrap and move
        // `pruner_hi` backwards.
        let to_exclusive = from.saturating_add(size.max(1)).min(self.reader_lo);
        self.pruner_hi = to_exclusive;
        Some((from, to_exclusive))
    }
}