sui_indexer/models/watermarks.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::str::FromStr;
5
6use diesel::prelude::*;
7
8use crate::{
9 handlers::{CommitterWatermark, pruner::PrunableTable},
10 schema::watermarks::{self},
11};
12
13/// Represents a row in the `watermarks` table.
14#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
15#[diesel(table_name = watermarks, primary_key(entity))]
16pub struct StoredWatermark {
17 /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
18 pub pipeline: String,
19 /// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
20 /// this to determine if pruning is necessary based on the retention policy.
21 pub epoch_hi_inclusive: i64,
22 /// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
23 /// data of this entity in the checkpoint must be persisted before advancing this watermark. The
24 /// committer refers to this on disaster recovery to resume writing.
25 pub checkpoint_hi_inclusive: i64,
26 /// Exclusive upper transaction sequence number bound for this entity's data. Committer updates
27 /// this field.
28 pub tx_hi: i64,
29 /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
30 pub epoch_lo: i64,
31 /// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
32 /// sequence number, or tx sequence number depending on the entity. Data before this watermark is
33 /// considered pruned by a reader. The underlying data may still exist in the db instance.
34 pub reader_lo: i64,
35 /// Updated using the database's current timestamp when the pruner sees that some data needs to
36 /// be dropped. The pruner uses this column to determine whether to prune or wait long enough
37 /// that all in-flight reads complete or timeout before it acts on an updated watermark.
38 pub timestamp_ms: i64,
39 /// Column used by the pruner to track its true progress. Data below this watermark can be
40 /// immediately pruned.
41 pub pruner_hi: i64,
42}
43
44impl StoredWatermark {
45 pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
46 StoredWatermark {
47 pipeline: entity.to_string(),
48 epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
49 checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
50 tx_hi: watermark.tx_hi as i64,
51 ..StoredWatermark::default()
52 }
53 }
54
55 pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
56 StoredWatermark {
57 pipeline: entity.to_string(),
58 epoch_lo: epoch_lo as i64,
59 reader_lo: reader_lo as i64,
60 ..StoredWatermark::default()
61 }
62 }
63
64 pub fn entity(&self) -> Option<PrunableTable> {
65 PrunableTable::from_str(&self.pipeline).ok()
66 }
67
68 /// Determine whether to set a new epoch lower bound based on the retention policy.
69 pub fn new_epoch_lo(&self, retention: u64) -> Option<u64> {
70 if self.epoch_lo as u64 + retention <= self.epoch_hi_inclusive as u64 {
71 Some((self.epoch_hi_inclusive as u64).saturating_sub(retention - 1))
72 } else {
73 None
74 }
75 }
76}