sui_rpc_store/schema/
pruning_watermark.rs1use std::sync::Arc;
27use std::sync::OnceLock;
28use std::sync::atomic::AtomicU64;
29use std::sync::atomic::Ordering;
30
31use sui_consistent_store::Protobuf;
32use sui_consistent_store::error::Error;
33use sui_consistent_store::reader::Reader;
34
35use crate::proto::PruningWatermarks;
36use crate::schema::primitives::UnitKey;
37
38pub const NAME: &str = "pruning_watermark";
39
40pub type Key = UnitKey;
41pub type Value = Protobuf<PruningWatermarks>;
42
43pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
44 resolver.options(NAME)
45}
46
/// In-memory form of the pruning watermarks persisted in this table.
///
/// Mirrors the two fields of the `PruningWatermarks` protobuf message.
/// `Default` yields both watermarks at zero.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Watermarks {
    /// Low watermark over transaction sequence numbers.
    // NOTE(review): presumably the lowest retained tx sequence number; confirm
    // whether the bound is inclusive.
    pub tx_seq_lo: u64,
    /// Low watermark over checkpoints.
    // NOTE(review): presumably the lowest retained checkpoint; confirm
    // whether the bound is inclusive.
    pub checkpoint_lo: u64,
}
61
62pub fn store(watermarks: &Watermarks) -> (Key, Value) {
65 (
66 UnitKey,
67 Protobuf(PruningWatermarks {
68 tx_seq_lo: watermarks.tx_seq_lo,
69 checkpoint_lo: watermarks.checkpoint_lo,
70 }),
71 )
72}
73
/// Returns the process-wide atomic holding the transaction-sequence pruning
/// floor.
///
/// The atomic is created on the first call with an initial value of `0`.
/// Every later call returns a reference to that same `Arc`, so all callers
/// observe one shared value.
pub fn tx_seq_floor() -> &'static Arc<AtomicU64> {
    static SHARED_FLOOR: OnceLock<Arc<AtomicU64>> = OnceLock::new();

    /// Initial state: a floor of zero.
    fn initial_floor() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    SHARED_FLOOR.get_or_init(initial_floor)
}
84
85impl<R: Reader> super::RpcStoreSchema<R> {
86 pub fn get_pruning_watermarks(&self) -> Result<Option<Watermarks>, Error> {
88 let Some(stored) = self.pruning_watermark.get(&UnitKey)? else {
89 return Ok(None);
90 };
91 let stored = stored.into_inner();
92 Ok(Some(Watermarks {
93 tx_seq_lo: stored.tx_seq_lo,
94 checkpoint_lo: stored.checkpoint_lo,
95 }))
96 }
97
98 pub fn set_pruning_floor(&self, tx_seq_lo: u64) {
109 tx_seq_floor().store(tx_seq_lo, Ordering::Relaxed);
110 }
111
112 pub fn refresh_pruning_atomics(&self) -> Result<(), Error> {
119 if let Some(watermarks) = self.get_pruning_watermarks()? {
120 self.set_pruning_floor(watermarks.tx_seq_lo);
121 }
122 Ok(())
123 }
124}
125
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::sync::MutexGuard;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Serializes every test in this module that reads or writes the
    /// process-wide floor atomic.
    ///
    /// The test harness runs tests on parallel threads by default, and
    /// `tx_seq_floor()` is a single global. Without this lock one test can
    /// overwrite the floor between another test's store and its assertion.
    // NOTE(review): tests in other modules that touch `tx_seq_floor()` are not
    // covered by this lock.
    static FLOOR_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the floor lock, ignoring poisoning.
    ///
    /// A poisoned lock only means another floor test panicked; the `()`
    /// payload has no state to corrupt, so later tests can safely proceed
    /// instead of failing with an unrelated `PoisonError`.
    fn lock_floor() -> MutexGuard<'static, ()> {
        FLOOR_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Opens a brand-new database in a temporary directory.
    ///
    /// The `TempDir` is returned so the directory outlives the test body.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    #[test]
    fn get_returns_none_when_empty() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_pruning_watermarks().unwrap().is_none());
    }

    #[test]
    fn store_then_get_round_trips() {
        let (_dir, db, schema) = fresh_db();
        let watermarks = Watermarks {
            tx_seq_lo: 1_000,
            checkpoint_lo: 50,
        };

        let (k, v) = store(&watermarks);
        let mut batch = db.batch();
        batch.put(&schema.pruning_watermark, &k, &v).unwrap();
        batch.commit().unwrap();

        let read = schema
            .get_pruning_watermarks()
            .unwrap()
            .expect("watermarks present");
        assert_eq!(read, watermarks);
    }

    #[test]
    fn set_pruning_floor_updates_atomic() {
        // Held until the end of the test so the baseline read, the write, the
        // assertion and the restore are not interleaved with other floor tests.
        let _guard = lock_floor();

        let baseline = tx_seq_floor().load(Ordering::Relaxed);
        let (_dir, _db, schema) = fresh_db();
        // Offset from the baseline so the expected value differs from whatever
        // the global currently holds.
        let target = baseline.wrapping_add(12_345);
        schema.set_pruning_floor(target);
        assert_eq!(tx_seq_floor().load(Ordering::Relaxed), target);
        // Put the global back for whichever test runs next.
        tx_seq_floor().store(baseline, Ordering::Relaxed);
    }

    #[test]
    fn refresh_pulls_disk_watermarks_into_atomic() {
        let _guard = lock_floor();

        let baseline = tx_seq_floor().load(Ordering::Relaxed);
        let (_dir, db, schema) = fresh_db();
        let target = baseline.wrapping_add(67_890);

        let (k, v) = store(&Watermarks {
            tx_seq_lo: target,
            checkpoint_lo: 0,
        });
        let mut batch = db.batch();
        batch.put(&schema.pruning_watermark, &k, &v).unwrap();
        batch.commit().unwrap();

        schema.refresh_pruning_atomics().unwrap();
        assert_eq!(tx_seq_floor().load(Ordering::Relaxed), target);
        // Put the global back for whichever test runs next.
        tx_seq_floor().store(baseline, Ordering::Relaxed);
    }

    #[test]
    fn refresh_is_a_no_op_when_disk_is_empty() {
        // Without the lock, a concurrent floor test could change the global
        // between the baseline read and the assertion below.
        let _guard = lock_floor();

        let baseline = tx_seq_floor().load(Ordering::Relaxed);
        let (_dir, _db, schema) = fresh_db();
        schema.refresh_pruning_atomics().unwrap();
        assert_eq!(tx_seq_floor().load(Ordering::Relaxed), baseline);
    }
}