sui_indexer/models/
watermarks.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;

use diesel::prelude::*;

use crate::{
    handlers::{pruner::PrunableTable, CommitterWatermark},
    schema::watermarks::{self},
};

/// Represents a row in the `watermarks` table.
#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
    /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
    pub pipeline: String,
    /// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
    /// this to determine if pruning is necessary based on the retention policy.
    pub epoch_hi_inclusive: i64,
    /// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
    /// data of this entity in the checkpoint must be persisted before advancing this watermark. The
    /// committer refers to this on disaster recovery to resume writing.
    pub checkpoint_hi_inclusive: i64,
    /// Exclusive upper transaction sequence number bound for this entity's data. Committer updates
    /// this field.
    pub tx_hi: i64,
    /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
    pub epoch_lo: i64,
    /// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
    /// sequence number, or tx sequence number depending on the entity. Data before this watermark is
    /// considered pruned by a reader. The underlying data may still exist in the db instance.
    pub reader_lo: i64,
    /// Updated using the database's current timestamp when the pruner sees that some data needs to
    /// be dropped. The pruner uses this column to determine whether to prune or wait long enough
    /// that all in-flight reads complete or timeout before it acts on an updated watermark.
    pub timestamp_ms: i64,
    /// Column used by the pruner to track its true progress. Data below this watermark can be
    /// immediately pruned.
    pub pruner_hi: i64,
}

impl StoredWatermark {
    pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
        StoredWatermark {
            pipeline: entity.to_string(),
            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
            tx_hi: watermark.tx_hi as i64,
            ..StoredWatermark::default()
        }
    }

    pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
        StoredWatermark {
            pipeline: entity.to_string(),
            epoch_lo: epoch_lo as i64,
            reader_lo: reader_lo as i64,
            ..StoredWatermark::default()
        }
    }

    pub fn entity(&self) -> Option<PrunableTable> {
        PrunableTable::from_str(&self.pipeline).ok()
    }

    /// Determine whether to set a new epoch lower bound based on the retention policy.
    pub fn new_epoch_lo(&self, retention: u64) -> Option<u64> {
        if self.epoch_lo as u64 + retention <= self.epoch_hi_inclusive as u64 {
            Some((self.epoch_hi_inclusive as u64).saturating_sub(retention - 1))
        } else {
            None
        }
    }
}