sui_indexer/models/
watermarks.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::str::FromStr;
5
6use diesel::prelude::*;
7
8use crate::{
9    handlers::{CommitterWatermark, pruner::PrunableTable},
10    schema::watermarks::{self},
11};
12
13/// Represents a row in the `watermarks` table.
14#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
15#[diesel(table_name = watermarks, primary_key(entity))]
16pub struct StoredWatermark {
17    /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
18    pub pipeline: String,
19    /// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
20    /// this to determine if pruning is necessary based on the retention policy.
21    pub epoch_hi_inclusive: i64,
22    /// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
23    /// data of this entity in the checkpoint must be persisted before advancing this watermark. The
24    /// committer refers to this on disaster recovery to resume writing.
25    pub checkpoint_hi_inclusive: i64,
26    /// Exclusive upper transaction sequence number bound for this entity's data. Committer updates
27    /// this field.
28    pub tx_hi: i64,
29    /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
30    pub epoch_lo: i64,
31    /// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
32    /// sequence number, or tx sequence number depending on the entity. Data before this watermark is
33    /// considered pruned by a reader. The underlying data may still exist in the db instance.
34    pub reader_lo: i64,
35    /// Updated using the database's current timestamp when the pruner sees that some data needs to
36    /// be dropped. The pruner uses this column to determine whether to prune or wait long enough
37    /// that all in-flight reads complete or timeout before it acts on an updated watermark.
38    pub timestamp_ms: i64,
39    /// Column used by the pruner to track its true progress. Data below this watermark can be
40    /// immediately pruned.
41    pub pruner_hi: i64,
42}
43
44impl StoredWatermark {
45    pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
46        StoredWatermark {
47            pipeline: entity.to_string(),
48            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
49            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
50            tx_hi: watermark.tx_hi as i64,
51            ..StoredWatermark::default()
52        }
53    }
54
55    pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
56        StoredWatermark {
57            pipeline: entity.to_string(),
58            epoch_lo: epoch_lo as i64,
59            reader_lo: reader_lo as i64,
60            ..StoredWatermark::default()
61        }
62    }
63
64    pub fn entity(&self) -> Option<PrunableTable> {
65        PrunableTable::from_str(&self.pipeline).ok()
66    }
67
68    /// Determine whether to set a new epoch lower bound based on the retention policy.
69    pub fn new_epoch_lo(&self, retention: u64) -> Option<u64> {
70        if self.epoch_lo as u64 + retention <= self.epoch_hi_inclusive as u64 {
71            Some((self.epoch_hi_inclusive as u64).saturating_sub(retention - 1))
72        } else {
73            None
74        }
75    }
76}