1use std::sync::atomic::Ordering;
23
24use bytes::Buf;
25use bytes::BufMut;
26use prost::Message;
27use roaring::RoaringBitmap;
28use sui_consistent_store::Decode;
29use sui_consistent_store::Encode;
30use sui_consistent_store::Iter;
31use sui_consistent_store::Protobuf;
32use sui_consistent_store::error::DecodeError;
33use sui_consistent_store::error::EncodeError;
34use sui_consistent_store::error::Error;
35use sui_consistent_store::reader::Reader;
36
37use crate::proto::BitmapBlob;
38use crate::schema::pruning_watermark::tx_seq_floor;
39
/// Name of the `transaction_bitmap` table; also used to resolve its RocksDB
/// options and to label key-decoding errors.
pub const NAME: &str = "transaction_bitmap";

/// Number of consecutive transaction sequence numbers covered by one bitmap
/// bucket (2^16).
pub const TX_BUCKET_SIZE: u64 = 1 << 16;

// `bit_of` narrows `tx_seq % TX_BUCKET_SIZE` to `u32`; keeping the bucket size
// within `u32` range guarantees that cast never truncates.
const _: () = assert!(TX_BUCKET_SIZE <= u32::MAX as u64);
48
/// Row key of the `transaction_bitmap` table: one bitmap per
/// `(dimension_key, bucket)` pair.
///
/// On disk it is stored as the raw `dimension_key` bytes followed by `bucket`
/// as 8 big-endian bytes.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Key {
    /// Opaque bytes identifying the index dimension (the tests use values such
    /// as `b"sender:alice"`).
    pub dimension_key: Vec<u8>,
    /// Bucket number, i.e. `tx_seq / TX_BUCKET_SIZE` for every transaction the
    /// bucket's bitmap records.
    pub bucket: u64,
}
54
55pub type Value = Protobuf<BitmapBlob>;
56
57impl Encode for Key {
58 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
59 buf.put_slice(&self.dimension_key);
60 buf.put_slice(&self.bucket.to_be_bytes());
61 Ok(())
62 }
63}
64
65impl Decode for Key {
66 fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
67 if buf.remaining() < 8 {
68 return Err(DecodeError::msg(format!(
69 "{NAME} Key too short: {} bytes",
70 buf.remaining(),
71 )));
72 }
73 let dim_len = buf.remaining() - 8;
74 let dim_bytes = buf.copy_to_bytes(dim_len);
75 let bucket = buf.get_u64();
76 Ok(Key {
77 dimension_key: dim_bytes.to_vec(),
78 bucket,
79 })
80 }
81}
82
83pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
87 let mut opts = resolver.options(NAME);
88 opts.set_merge_operator_associative("transaction_bitmap_merge", merge);
89 let floor = tx_seq_floor().clone();
90 opts.set_compaction_filter("transaction_bitmap_pruning", move |_level, key, _value| {
91 let pruned_exclusive = floor.load(Ordering::Relaxed);
92 if should_remove_bucket(key, pruned_exclusive) {
93 rocksdb::CompactionDecision::Remove
94 } else {
95 rocksdb::CompactionDecision::Keep
96 }
97 });
98 opts
99}
100
101pub(crate) fn should_remove_bucket(key: &[u8], pruned_exclusive: u64) -> bool {
113 if key.len() < 8 {
114 return false;
115 }
116 let bucket_id_bytes: [u8; 8] = key[key.len() - 8..].try_into().expect("slice length");
117 let bucket_id = u64::from_be_bytes(bucket_id_bytes);
118 bucket_id
119 .checked_add(1)
120 .and_then(|b| b.checked_mul(TX_BUCKET_SIZE))
121 .is_some_and(|highest_plus_one| highest_plus_one <= pruned_exclusive)
122}
123
124pub fn bucket_of(tx_seq: u64) -> u64 {
126 tx_seq / TX_BUCKET_SIZE
127}
128
129pub fn bit_of(tx_seq: u64) -> u32 {
133 (tx_seq % TX_BUCKET_SIZE) as u32
134}
135
136pub fn store_match(dimension_key: Vec<u8>, tx_seq: u64) -> (Key, Value) {
141 let mut bitmap = RoaringBitmap::new();
142 bitmap.insert(bit_of(tx_seq));
143 store_bitmap(dimension_key, bucket_of(tx_seq), bitmap)
144}
145
146pub fn store_bitmap(dimension_key: Vec<u8>, bucket: u64, bitmap: RoaringBitmap) -> (Key, Value) {
151 (
152 Key {
153 dimension_key,
154 bucket,
155 },
156 Protobuf(BitmapBlob {
157 data: serialize_bitmap(&bitmap).into(),
158 }),
159 )
160}
161
162impl<R: Reader> super::RpcStoreSchema<R> {
163 pub fn get_transaction_bitmap(
166 &self,
167 dimension_key: Vec<u8>,
168 bucket: u64,
169 ) -> Result<Option<RoaringBitmap>, Error> {
170 let Some(stored) = self.transaction_bitmap.get(&Key {
171 dimension_key,
172 bucket,
173 })?
174 else {
175 return Ok(None);
176 };
177 let bytes = stored.into_inner().data;
178 let bitmap = RoaringBitmap::deserialize_from(bytes.as_ref())
179 .map_err(|e| DecodeError::with_source("deserialize RoaringBitmap", e))?;
180 Ok(Some(bitmap))
181 }
182
183 pub fn iter_transaction_bitmap_buckets(
186 &self,
187 dimension_key: Vec<u8>,
188 ) -> Result<Iter<'_, Key, Value>, Error> {
189 self.transaction_bitmap
190 .iter_prefix(&DimensionPrefix(dimension_key))
191 }
192}
193
194pub struct DimensionPrefix(pub Vec<u8>);
198
199impl Encode for DimensionPrefix {
200 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
201 buf.put_slice(&self.0);
202 Ok(())
203 }
204}
205
206fn serialize_bitmap(bitmap: &RoaringBitmap) -> Vec<u8> {
210 let mut buf = Vec::with_capacity(bitmap.serialized_size());
211 bitmap
212 .serialize_into(&mut buf)
213 .expect("RoaringBitmap::serialize_into on Vec cannot fail");
214 buf
215}
216
217fn merge(
225 _key: &[u8],
226 existing_val: Option<&[u8]>,
227 operands: &rocksdb::MergeOperands,
228) -> Option<Vec<u8>> {
229 let mut acc = match existing_val {
230 Some(bytes) => decode_bitmap(bytes),
231 None => RoaringBitmap::new(),
232 };
233
234 for operand in operands {
235 let bitmap = decode_bitmap(operand);
236 acc |= bitmap;
237 }
238
239 acc.optimize();
244 Some(encode_bitmap_blob(&acc))
245}
246
247fn decode_bitmap(bytes: &[u8]) -> RoaringBitmap {
248 let blob = BitmapBlob::decode(bytes).expect("decode BitmapBlob");
249 RoaringBitmap::deserialize_from(blob.data.as_ref()).expect("deserialize RoaringBitmap")
250}
251
252fn encode_bitmap_blob(bitmap: &RoaringBitmap) -> Vec<u8> {
253 let blob = BitmapBlob {
254 data: serialize_bitmap(bitmap).into(),
255 };
256 blob.encode_to_vec()
257}
258
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Opens an empty database in a fresh temporary directory. The `TempDir`
    /// is returned with the handles so the directory stays alive for the test.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    #[test]
    fn bucket_and_bit_math() {
        assert_eq!(bucket_of(0), 0);
        assert_eq!(bit_of(0), 0);
        assert_eq!(bucket_of(TX_BUCKET_SIZE - 1), 0);
        assert_eq!(bit_of(TX_BUCKET_SIZE - 1), (TX_BUCKET_SIZE - 1) as u32);
        assert_eq!(bucket_of(TX_BUCKET_SIZE), 1);
        assert_eq!(bit_of(TX_BUCKET_SIZE), 0);
        assert_eq!(bucket_of(3 * TX_BUCKET_SIZE + 7), 3);
        assert_eq!(bit_of(3 * TX_BUCKET_SIZE + 7), 7);
    }

    #[test]
    fn key_round_trips_through_encode_decode() {
        for key in [
            Key {
                dimension_key: b"sender:alice".to_vec(),
                bucket: 0,
            },
            Key {
                dimension_key: b"sender:alice".to_vec(),
                bucket: u64::MAX,
            },
            // Empty dimension: the encoding is just the 8-byte bucket.
            Key {
                dimension_key: Vec::new(),
                bucket: 7,
            },
        ] {
            let encoded = key.encode().unwrap();
            let mut buf: &[u8] = &encoded;
            assert_eq!(Key::decode(&mut buf).ok(), Some(key));
        }
    }

    #[test]
    fn decode_rejects_keys_shorter_than_bucket_suffix() {
        let mut buf: &[u8] = &[0u8; 7];
        assert!(Key::decode(&mut buf).is_err());
    }

    #[test]
    fn get_returns_none_for_unknown_bucket() {
        let (_dir, _db, schema) = fresh_db();
        assert!(
            schema
                .get_transaction_bitmap(b"sender:alice".to_vec(), 0)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn single_match_round_trips_through_merge() {
        let (_dir, db, schema) = fresh_db();
        let (k, v) = store_match(b"sender:alice".to_vec(), 42);

        let mut batch = db.batch();
        batch.merge(&schema.transaction_bitmap, &k, &v).unwrap();
        batch.commit().unwrap();

        let bitmap = schema
            .get_transaction_bitmap(b"sender:alice".to_vec(), bucket_of(42))
            .unwrap()
            .expect("bitmap present");
        let bits: Vec<u32> = bitmap.iter().collect();
        assert_eq!(bits, vec![42]);
    }

    #[test]
    fn many_matches_in_one_bucket_union() {
        let (_dir, db, schema) = fresh_db();
        let dim = b"sender:alice".to_vec();

        let mut batch = db.batch();
        for tx_seq in [1u64, 17, 256, 9_999] {
            let (k, v) = store_match(dim.clone(), tx_seq);
            batch.merge(&schema.transaction_bitmap, &k, &v).unwrap();
        }
        batch.commit().unwrap();

        let bitmap = schema
            .get_transaction_bitmap(dim, 0)
            .unwrap()
            .expect("bitmap present");
        let bits: BTreeSet<u32> = bitmap.iter().collect();
        assert_eq!(bits, BTreeSet::from([1, 17, 256, 9_999]));
    }

    #[test]
    fn distinct_dimensions_do_not_alias() {
        let (_dir, db, schema) = fresh_db();
        let (k_a, v_a) = store_match(b"sender:alice".to_vec(), 42);
        let (k_b, v_b) = store_match(b"sender:bob".to_vec(), 100);
        let mut batch = db.batch();
        batch.merge(&schema.transaction_bitmap, &k_a, &v_a).unwrap();
        batch.merge(&schema.transaction_bitmap, &k_b, &v_b).unwrap();
        batch.commit().unwrap();

        let alice = schema
            .get_transaction_bitmap(b"sender:alice".to_vec(), 0)
            .unwrap()
            .unwrap();
        let bob = schema
            .get_transaction_bitmap(b"sender:bob".to_vec(), 0)
            .unwrap()
            .unwrap();
        assert_eq!(alice.iter().collect::<Vec<u32>>(), vec![42]);
        assert_eq!(bob.iter().collect::<Vec<u32>>(), vec![100]);
    }

    #[test]
    fn should_remove_bucket_drops_only_fully_pruned_ranges() {
        let dim = b"sender:alice";

        // Bucket 0 covers [0, TX_BUCKET_SIZE): kept until the floor reaches its end.
        let just_at_floor_key = Key {
            dimension_key: dim.to_vec(),
            bucket: 0,
        }
        .encode()
        .unwrap();
        assert!(!should_remove_bucket(
            &just_at_floor_key,
            TX_BUCKET_SIZE - 1
        ));

        assert!(should_remove_bucket(&just_at_floor_key, TX_BUCKET_SIZE));

        // A floor in the middle of bucket 3 keeps it; one at its end removes it.
        let middle_key = Key {
            dimension_key: dim.to_vec(),
            bucket: 3,
        }
        .encode()
        .unwrap();
        assert!(!should_remove_bucket(
            &middle_key,
            3 * TX_BUCKET_SIZE + (TX_BUCKET_SIZE / 2),
        ));

        assert!(should_remove_bucket(&middle_key, 4 * TX_BUCKET_SIZE));

        // Keys too short to carry a bucket suffix are always kept.
        assert!(!should_remove_bucket(&[0u8; 4], u64::MAX));

        assert!(!should_remove_bucket(&middle_key, 0));
    }

    #[test]
    fn should_remove_bucket_keeps_buckets_whose_range_end_overflows() {
        // (bucket + 1) * TX_BUCKET_SIZE overflows u64 for these buckets, so they
        // must never be reported as fully pruned.
        for bucket in [u64::MAX, bucket_of(u64::MAX)] {
            let key = Key {
                dimension_key: b"sender:alice".to_vec(),
                bucket,
            }
            .encode()
            .unwrap();
            assert!(!should_remove_bucket(&key, u64::MAX));
        }
    }

    #[test]
    fn iter_walks_buckets_for_one_dimension_in_order() {
        let (_dir, db, schema) = fresh_db();
        let dim = b"sender:alice".to_vec();
        let other = b"sender:bob".to_vec();

        let mut batch = db.batch();
        for tx_seq in [1u64, TX_BUCKET_SIZE + 5, 3 * TX_BUCKET_SIZE + 9] {
            let (k, v) = store_match(dim.clone(), tx_seq);
            batch.merge(&schema.transaction_bitmap, &k, &v).unwrap();
        }
        // An unrelated dimension that must not show up in `dim`'s iteration.
        let (k_other, v_other) = store_match(other, 7);
        batch
            .merge(&schema.transaction_bitmap, &k_other, &v_other)
            .unwrap();
        batch.commit().unwrap();

        let buckets: Vec<u64> = schema
            .iter_transaction_bitmap_buckets(dim)
            .unwrap()
            .map(|res| res.unwrap().0.bucket)
            .collect();
        assert_eq!(buckets, vec![0, 1, 3]);
    }
}