sui_rpc_store/schema/
pruning_watermark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! `()` → `PruningWatermarks`.
//!
//! Singleton row that holds the lowest still-available `tx_seq`
//! and `checkpoint_seq`. Drives the bitmap CFs' compaction
//! filters and feeds `available_range` requests.
//!
//! The bitmap CFs need to know the current `tx_seq` floor at
//! compaction time, which runs in a RocksDB background thread
//! without access to the schema. The pattern used here:
//!
//! 1. A process-wide `Arc<AtomicU64>` ([`tx_seq_floor`]) holds the
//!    current floor.
//! 2. The bitmap CFs' [`options`](super::transaction_bitmap::options)
//!    install compaction filters that clone the `Arc` and read the
//!    atomic on every key they consider.
//! 3. Indexer pipelines that advance pruning call
//!    [`super::RpcStoreSchema::set_pruning_floor`] after their batch
//!    commits, so the on-disk row and the atomic agree.
//! 4. On startup, callers run
//!    [`super::RpcStoreSchema::refresh_pruning_atomics`] once to load the
//!    persisted floor into the atomic.
25
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::sync::atomic::AtomicU64;
29use std::sync::atomic::Ordering;
30
31use sui_consistent_store::Protobuf;
32use sui_consistent_store::error::Error;
33use sui_consistent_store::reader::Reader;
34
35use crate::proto::PruningWatermarks;
36use crate::schema::primitives::UnitKey;
37
/// Column-family name under which the singleton pruning-watermarks
/// row lives.
pub const NAME: &str = "pruning_watermark";
39
40pub type Key = UnitKey;
41pub type Value = Protobuf<PruningWatermarks>;
42
43pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
44    resolver.options(NAME)
45}
46
/// Pruning floors as exposed to callers of this schema.
///
/// Both fields are inclusive lower bounds of what is still stored:
/// everything strictly below a floor has already been pruned.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Watermarks {
    /// Smallest `tx_seq` whose downstream rows (`tx_metadata_by_seq`,
    /// `transactions`, `effects`, `events`, and the bitmap CFs) are
    /// still present.
    pub tx_seq_lo: u64,
    /// The same kind of floor, applied to the `checkpoint_summary` /
    /// `checkpoint_contents` CFs.
    pub checkpoint_lo: u64,
}
61
62/// Build the singleton `(Key, Value)` pair recording the current
63/// pruning floor.
64pub fn store(watermarks: &Watermarks) -> (Key, Value) {
65    (
66        UnitKey,
67        Protobuf(PruningWatermarks {
68            tx_seq_lo: watermarks.tx_seq_lo,
69            checkpoint_lo: watermarks.checkpoint_lo,
70        }),
71    )
72}
73
/// Shared `tx_seq` pruning floor consulted by the bitmap CFs'
/// compaction filters.
///
/// The `Arc<AtomicU64>` is allocated on the first call, starting at `0`.
/// Every later call returns that same instance for the rest of the
/// process.
///
/// The value is an *exclusive* floor: any `tx_seq` strictly less than
/// it counts as pruned. Bitmap buckets lying wholly below it become
/// removable on the next compaction sweep.
pub fn tx_seq_floor() -> &'static Arc<AtomicU64> {
    static SHARED_FLOOR: OnceLock<Arc<AtomicU64>> = OnceLock::new();
    // `Arc<AtomicU64>::default()` wraps an atomic initialised to 0.
    SHARED_FLOOR.get_or_init(Default::default)
}
84
85impl<R: Reader> super::RpcStoreSchema<R> {
86    /// Read the persisted pruning watermarks from disk.
87    pub fn get_pruning_watermarks(&self) -> Result<Option<Watermarks>, Error> {
88        let Some(stored) = self.pruning_watermark.get(&UnitKey)? else {
89            return Ok(None);
90        };
91        let stored = stored.into_inner();
92        Ok(Some(Watermarks {
93            tx_seq_lo: stored.tx_seq_lo,
94            checkpoint_lo: stored.checkpoint_lo,
95        }))
96    }
97
98    /// Update the `tx_seq` floor used by the bitmap CFs'
99    /// compaction filters.
100    ///
101    /// Callers that advance pruning should:
102    ///
103    /// 1. Stage a write to the `pruning_watermark` CF via
104    ///    [`store`].
105    /// 2. Commit the batch so the new watermarks are durable.
106    /// 3. Call this method with the new `tx_seq_lo` so the
107    ///    in-memory floor matches what's on disk.
108    pub fn set_pruning_floor(&self, tx_seq_lo: u64) {
109        tx_seq_floor().store(tx_seq_lo, Ordering::Relaxed);
110    }
111
112    /// Load the persisted pruning watermarks from disk into the
113    /// in-memory bitmap floor.
114    ///
115    /// Call once on startup so the bitmap compaction filters see
116    /// the persisted floor instead of starting at zero (where
117    /// they'd prune nothing).
118    pub fn refresh_pruning_atomics(&self) -> Result<(), Error> {
119        if let Some(watermarks) = self.get_pruning_watermarks()? {
120            self.set_pruning_floor(watermarks.tx_seq_lo);
121        }
122        Ok(())
123    }
124}
125
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::sync::MutexGuard;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Serializes the tests in this module that read or write the
    /// process-wide [`tx_seq_floor`] atomic.
    ///
    /// The libtest harness runs tests on parallel threads by default, so
    /// a per-test "baseline" alone is not enough. Another test could
    /// change the floor between this test's write and its assertion, or
    /// restore a stale baseline on top of it.
    ///
    /// NOTE(review): tests in other modules that touch the same atomic
    /// are not covered by this lock. Share it crate-wide if any exist.
    static FLOOR_LOCK: Mutex<()> = Mutex::new(());

    /// Exclusive access to [`tx_seq_floor`] for the lifetime of a test.
    ///
    /// Records the floor when acquired and writes it back on drop, also
    /// when the test panics, so no test leaks its floor into another.
    struct FloorGuard {
        baseline: u64,
        // Struct fields are dropped only after `Drop::drop` returns, so
        // the floor is restored while this lock is still held.
        _lock: MutexGuard<'static, ()>,
    }

    impl FloorGuard {
        fn acquire() -> Self {
            // A test that panics while holding the lock poisons it. Its
            // guard has already restored the floor during unwinding, so
            // the shared state is consistent and it is safe to continue.
            let lock = FLOOR_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let baseline = tx_seq_floor().load(Ordering::Relaxed);
            Self {
                baseline,
                _lock: lock,
            }
        }
    }

    impl Drop for FloorGuard {
        fn drop(&mut self) {
            tx_seq_floor().store(self.baseline, Ordering::Relaxed);
        }
    }

    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    #[test]
    fn get_returns_none_when_empty() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_pruning_watermarks().unwrap().is_none());
    }

    #[test]
    fn store_then_get_round_trips() {
        let (_dir, db, schema) = fresh_db();
        let watermarks = Watermarks {
            tx_seq_lo: 1_000,
            checkpoint_lo: 50,
        };

        let (k, v) = store(&watermarks);
        let mut batch = db.batch();
        batch.put(&schema.pruning_watermark, &k, &v).unwrap();
        batch.commit().unwrap();

        let read = schema
            .get_pruning_watermarks()
            .unwrap()
            .expect("watermarks present");
        assert_eq!(read, watermarks);
    }

    #[test]
    fn set_pruning_floor_updates_atomic() {
        let guard = FloorGuard::acquire();
        let (_dir, _db, schema) = fresh_db();
        let target = guard.baseline.wrapping_add(12_345);
        schema.set_pruning_floor(target);
        assert_eq!(tx_seq_floor().load(Ordering::Relaxed), target);
    }

    #[test]
    fn refresh_pulls_disk_watermarks_into_atomic() {
        let guard = FloorGuard::acquire();
        let (_dir, db, schema) = fresh_db();
        let target = guard.baseline.wrapping_add(67_890);

        let (k, v) = store(&Watermarks {
            tx_seq_lo: target,
            checkpoint_lo: 0,
        });
        let mut batch = db.batch();
        batch.put(&schema.pruning_watermark, &k, &v).unwrap();
        batch.commit().unwrap();

        schema.refresh_pruning_atomics().unwrap();
        assert_eq!(tx_seq_floor().load(Ordering::Relaxed), target);
    }

    #[test]
    fn refresh_is_a_no_op_when_disk_is_empty() {
        let guard = FloorGuard::acquire();
        let (_dir, _db, schema) = fresh_db();
        // Move the floor off its baseline first. The baseline is
        // normally 0, so without this the assertion could not tell
        // "left untouched" apart from "reset to zero".
        let sentinel = guard.baseline.wrapping_add(424_242);
        tx_seq_floor().store(sentinel, Ordering::Relaxed);

        // No write: refresh must not touch the atomic.
        schema.refresh_pruning_atomics().unwrap();
        assert_eq!(tx_seq_floor().load(Ordering::Relaxed), sentinel);
    }
}