sui_indexer_alt_framework/store.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
pub use crate::pipeline::sequential::Handler as SequentialHandler;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use scoped_futures::ScopedBoxFuture;
use std::time::Duration;
pub use scoped_futures;
/// A connection to the backing store, through which the indexer framework reads and advances
/// each pipeline's watermarks without depending on how the store itself is implemented.
#[async_trait]
pub trait Connection: Send + Sync {
    // --- Committer watermark --------------------------------------------------------------------

    /// Read the committer watermark stored for `pipeline`, if there is one. The indexer calls
    /// this on startup to work out which checkpoint to resume processing from.
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Upsert the committer (high) watermark for `pipeline`, but only when doing so raises the
    /// watermark already stored in the database. Returns `true` if the stored watermark was
    /// actually changed.
    async fn set_committer_watermark(
        &mut self,
        pipeline: &'static str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    // --- Reader watermark -----------------------------------------------------------------------

    /// Read the reader watermark stored for `pipeline`, if there is one. The indexer uses it to
    /// decide the new `reader_lo`, i.e. the inclusive lower bound of the data still available.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Set `reader_lo` on the existing watermark entry for `pipeline`, but only if that raises
    /// it. Readers treat `reader_lo` as the inclusive lower bound of the pipeline's data.
    ///
    /// Whenever an update is made, a timestamp on the entry (e.g. `pruner_timestamp`) must also
    /// be set to the current time. Ideally that time comes from the store itself; if that is not
    /// possible, it should come from a time source shared by the indexer and its readers. The
    /// indexer reads this timestamp back through `pruner_watermark` to decide how much longer to
    /// wait before pruning, so it directly controls when pruning can safely begin.
    ///
    /// Returns `true` if the entry was actually updated.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    // --- Pruner watermark -----------------------------------------------------------------------

    /// Describe what the pruner may delete for `pipeline`, and how many milliseconds it must
    /// still wait before starting.
    ///
    /// Once `pruner_timestamp + delay` has passed, the pruner may delete everything from the
    /// returned `pruner_hi` (inclusive) up to `reader_lo` (exclusive). Waiting out the delay
    /// reduces the chance of deleting data that in-flight read requests still expect to find.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Record `pruner_hi` as the pruner watermark for `pipeline`. Returns `true` if the stored
    /// watermark was actually updated.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}
/// Storage-agnostic handle that the indexer framework is given to reach the database.
///
/// The framework opens a [`Connection`] through [`Store::connect`] to manage watermarks. The
/// same store value is also passed to pipeline handlers so they can perform arbitrary writes of
/// their own against it.
#[async_trait]
pub trait Store: Clone + Send + Sync + 'static {
    /// The connection type this store hands out; it may borrow from the store for `'c`.
    type Connection<'c>: Connection
    where
        Self: 'c;

    /// Open a connection to the underlying store.
    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>>;
}
/// Shorthand for the `Batch` associated type of a sequential pipeline handler `H`
/// (`SequentialHandler` is the re-exported `crate::pipeline::sequential::Handler`).
pub type HandlerBatch<H> = <H as SequentialHandler>::Batch;
/// A [`Store`] that can also run a group of operations inside a single transaction, which the
/// framework uses for atomic writes.
#[async_trait]
pub trait TransactionalStore: Store {
    /// Run `f` against a connection within a transaction and return the value it produces.
    ///
    /// `f` is given a mutable borrow of a connection for the higher-ranked lifetime `'r` and
    /// returns a `ScopedBoxFuture` that may keep using that borrow until it completes.
    /// NOTE(review): commit/rollback semantics (presumably commit on `Ok`, roll back on `Err`)
    /// are left to each implementation — confirm in the concrete store.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
        F: Send + 'a,
        R: Send + 'a;
}
/// High-water mark of a pipeline: the highest checkpoint the indexer framework has processed
/// for it. When read back from the `Store`, it is the inclusive upper bound of the checkpoint data
/// that pipeline has written to the Store.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitterWatermark {
    pub epoch_hi_inclusive: u64,
    pub checkpoint_hi_inclusive: u64,
    pub tx_hi: u64,
    /// Interpreted by `timestamp()` as milliseconds since the Unix epoch.
    pub timestamp_ms_hi_inclusive: u64,
}
/// The inclusive lower bound of the data still available in the Store for a pipeline.
#[derive(Clone, Copy, Debug, Default)]
pub struct ReaderWatermark {
    /// Read by the framework when it works out the new `reader_lo`.
    pub checkpoint_hi_inclusive: u64,
    /// Read by the framework to decide whether an update transaction against the database is
    /// actually necessary.
    pub reader_lo: u64,
}
/// Bounds of the region the pruner is allowed to delete, together with how long (in
/// milliseconds) the pruner must still wait before it may begin.
#[derive(Clone, Copy, Debug, Default)]
pub struct PrunerWatermark {
    /// Milliseconds remaining until `pruner_timestamp + delay` has passed; pruning must not
    /// begin before then.
    pub wait_for_ms: u64,
    /// Exclusive upper bound of the checkpoints the pruner may delete.
    pub reader_lo: u64,
    /// Exclusive end of the checkpoints already deleted, i.e. where the pruner picks up again.
    pub pruner_hi: u64,
}
impl CommitterWatermark {
    /// The watermark's timestamp as a UTC `DateTime`, treating `timestamp_ms_hi_inclusive` as
    /// milliseconds since the Unix epoch.
    ///
    /// Falls back to `DateTime::default()` (the Unix epoch) when the value cannot be represented,
    /// either because it does not fit in an `i64` or because chrono rejects it as out of range.
    pub(crate) fn timestamp(&self) -> DateTime<Utc> {
        // Checked conversion rather than `as i64`: the cast would silently wrap values above
        // `i64::MAX` into negative (pre-1970) timestamps instead of hitting the fallback.
        i64::try_from(self.timestamp_ms_hi_inclusive)
            .ok()
            .and_then(DateTime::<Utc>::from_timestamp_millis)
            .unwrap_or_default()
    }

    /// Build a watermark at `checkpoint_hi_inclusive` with every other field set to zero.
    #[cfg(test)]
    pub(crate) fn new_for_testing(checkpoint_hi_inclusive: u64) -> Self {
        CommitterWatermark {
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive,
            tx_hi: 0,
            timestamp_ms_hi_inclusive: 0,
        }
    }
}
impl PrunerWatermark {
    /// How long the pruner still has to wait before it may start, or `None` when
    /// `wait_for_ms` is zero and it can start immediately.
    pub(crate) fn wait_for_ms(&self) -> Option<Duration> {
        (self.wait_for_ms > 0).then(|| Duration::from_millis(self.wait_for_ms))
    }

    /// Claim the next chunk of up to `size` checkpoints to prune, and advance `pruner_hi` past it.
    ///
    /// Returns `Some((from, to_exclusive))`, the half-open range `[from, to_exclusive)`, which
    /// never extends past `reader_lo`. Returns `None` once `pruner_hi` has reached `reader_lo`,
    /// meaning there is nothing left to prune.
    ///
    /// A `size` of zero is treated as one so that every call makes progress. Without this, the
    /// method would keep returning the empty range `(pruner_hi, pruner_hi)` and never `None`.
    pub(crate) fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
        if self.pruner_hi >= self.reader_lo {
            return None;
        }

        let from = self.pruner_hi;
        // Saturate so that a very large `size` cannot overflow. Overflow would panic in debug
        // builds, and in release builds it would wrap `to_exclusive` below `from`, moving
        // `pruner_hi` backwards. `min` then clamps the chunk end to `reader_lo`.
        let to_exclusive = from.saturating_add(size.max(1)).min(self.reader_lo);
        self.pruner_hi = to_exclusive;
        Some((from, to_exclusive))
    }
}