1use std::sync::atomic::Ordering;
18
19use bytes::Buf;
20use bytes::BufMut;
21use prost::Message;
22use roaring::RoaringBitmap;
23use sui_consistent_store::Decode;
24use sui_consistent_store::Encode;
25use sui_consistent_store::Iter;
26use sui_consistent_store::Protobuf;
27use sui_consistent_store::error::DecodeError;
28use sui_consistent_store::error::EncodeError;
29use sui_consistent_store::error::Error;
30use sui_consistent_store::reader::Reader;
31
32use crate::proto::BitmapBlob;
33use crate::schema::pruning_watermark::tx_seq_floor;
34
/// Column-family name for the event bitmap index.
pub const NAME: &str = "event_bitmap";

/// Number of low bits of a packed event position reserved for the event index within a
/// transaction; the transaction sequence number occupies the remaining high bits (see `pack`).
pub const EVENT_BITS: u32 = 16;

/// Number of consecutive packed event positions covered by one bitmap row (one bucket).
pub const EVENT_BUCKET_SIZE: u64 = 1 << 28;

// A bucket-relative bit offset (`bit_of`) is narrowed to `u32`, the RoaringBitmap domain.
const _: () = assert!(EVENT_BUCKET_SIZE <= u32::MAX as u64);

// `pack` shifts by `EVENT_BITS` and `packed_pruning_floor` computes `64 - EVENT_BITS`, so the
// event-index field must be non-empty and leave room for a sequence number in a `u64`.
const _: () = assert!(EVENT_BITS > 0 && EVENT_BITS < 64);

// Buckets must start on transaction boundaries, i.e. each bucket covers a whole number
// (`EVENT_BUCKET_SIZE >> EVENT_BITS`) of transactions.
const _: () = assert!(
    EVENT_BUCKET_SIZE >= (1u64 << EVENT_BITS) && EVENT_BUCKET_SIZE % (1u64 << EVENT_BITS) == 0
);
50
/// Row key for the `event_bitmap` column family: one bitmap per (dimension, bucket) pair.
///
/// On disk the key is the raw `dimension_key` bytes immediately followed by `bucket` as a
/// big-endian `u64`; the dimension bytes carry no length prefix.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Key {
    /// Caller-chosen bytes identifying the filter dimension (tests use values such as
    /// `emitting_module:coin`).
    pub dimension_key: Vec<u8>,
    /// Which `EVENT_BUCKET_SIZE`-wide range of packed event positions this row covers.
    pub bucket: u64,
}
56
/// Stored value: a protobuf `BitmapBlob` whose `data` holds a serialized `RoaringBitmap`
/// (as built by `store_bitmap` and combined by the `merge` operator).
pub type Value = Protobuf<BitmapBlob>;
58
59impl Encode for Key {
60 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
61 buf.put_slice(&self.dimension_key);
62 buf.put_slice(&self.bucket.to_be_bytes());
63 Ok(())
64 }
65}
66
67impl Decode for Key {
68 fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
69 if buf.remaining() < 8 {
70 return Err(DecodeError::msg(format!(
71 "{NAME} Key too short: {} bytes",
72 buf.remaining(),
73 )));
74 }
75 let dim_len = buf.remaining() - 8;
76 let dim_bytes = buf.copy_to_bytes(dim_len);
77 let bucket = buf.get_u64();
78 Ok(Key {
79 dimension_key: dim_bytes.to_vec(),
80 bucket,
81 })
82 }
83}
84
85pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
89 let mut opts = resolver.options(NAME);
90 opts.set_merge_operator_associative("event_bitmap_merge", merge);
91 let floor = tx_seq_floor().clone();
92 opts.set_compaction_filter("event_bitmap_pruning", move |_level, key, _value| {
93 let tx_seq_pruned = floor.load(Ordering::Relaxed);
94 if should_remove_bucket(key, tx_seq_pruned) {
95 rocksdb::CompactionDecision::Remove
96 } else {
97 rocksdb::CompactionDecision::Keep
98 }
99 });
100 opts
101}
102
103pub(crate) fn should_remove_bucket(key: &[u8], tx_seq_pruned_exclusive: u64) -> bool {
110 if key.len() < 8 {
111 return false;
112 }
113 let bucket_id_bytes: [u8; 8] = key[key.len() - 8..].try_into().expect("slice length");
114 let bucket_id = u64::from_be_bytes(bucket_id_bytes);
115 let packed_floor = packed_pruning_floor(tx_seq_pruned_exclusive);
116 bucket_id
117 .checked_add(1)
118 .and_then(|b| b.checked_mul(EVENT_BUCKET_SIZE))
119 .is_some_and(|highest_plus_one| highest_plus_one <= packed_floor)
120}
121
122fn packed_pruning_floor(tx_seq_pruned_exclusive: u64) -> u64 {
130 const OVERFLOW_THRESHOLD: u64 = 1u64 << (64 - EVENT_BITS);
131 if tx_seq_pruned_exclusive < OVERFLOW_THRESHOLD {
132 tx_seq_pruned_exclusive << EVENT_BITS
133 } else {
134 u64::MAX
135 }
136}
137
138pub fn pack(tx_seq: u64, event_idx: u32) -> u64 {
141 (tx_seq << EVENT_BITS) | u64::from(event_idx)
142}
143
144pub fn bucket_of(packed: u64) -> u64 {
146 packed / EVENT_BUCKET_SIZE
147}
148
149pub fn bit_of(packed: u64) -> u32 {
153 (packed % EVENT_BUCKET_SIZE) as u32
154}
155
156pub fn store_match(dimension_key: Vec<u8>, tx_seq: u64, event_idx: u32) -> (Key, Value) {
161 let packed = pack(tx_seq, event_idx);
162 let mut bitmap = RoaringBitmap::new();
163 bitmap.insert(bit_of(packed));
164 store_bitmap(dimension_key, bucket_of(packed), bitmap)
165}
166
167pub fn store_bitmap(dimension_key: Vec<u8>, bucket: u64, bitmap: RoaringBitmap) -> (Key, Value) {
172 (
173 Key {
174 dimension_key,
175 bucket,
176 },
177 Protobuf(BitmapBlob {
178 data: serialize_bitmap(&bitmap).into(),
179 }),
180 )
181}
182
183impl<R: Reader> super::RpcStoreSchema<R> {
184 pub fn get_event_bitmap(
187 &self,
188 dimension_key: Vec<u8>,
189 bucket: u64,
190 ) -> Result<Option<RoaringBitmap>, Error> {
191 let Some(stored) = self.event_bitmap.get(&Key {
192 dimension_key,
193 bucket,
194 })?
195 else {
196 return Ok(None);
197 };
198 let bytes = stored.into_inner().data;
199 let bitmap = RoaringBitmap::deserialize_from(bytes.as_ref())
200 .map_err(|e| DecodeError::with_source("deserialize RoaringBitmap", e))?;
201 Ok(Some(bitmap))
202 }
203
204 pub fn iter_event_bitmap_buckets(
207 &self,
208 dimension_key: Vec<u8>,
209 ) -> Result<Iter<'_, Key, Value>, Error> {
210 self.event_bitmap
211 .iter_prefix(&DimensionPrefix(dimension_key))
212 }
213}
214
215pub struct DimensionPrefix(pub Vec<u8>);
219
220impl Encode for DimensionPrefix {
221 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
222 buf.put_slice(&self.0);
223 Ok(())
224 }
225}
226
227fn serialize_bitmap(bitmap: &RoaringBitmap) -> Vec<u8> {
231 let mut buf = Vec::with_capacity(bitmap.serialized_size());
232 bitmap
233 .serialize_into(&mut buf)
234 .expect("RoaringBitmap::serialize_into on Vec cannot fail");
235 buf
236}
237
238fn merge(
246 _key: &[u8],
247 existing_val: Option<&[u8]>,
248 operands: &rocksdb::MergeOperands,
249) -> Option<Vec<u8>> {
250 let mut acc = match existing_val {
251 Some(bytes) => decode_bitmap(bytes),
252 None => RoaringBitmap::new(),
253 };
254
255 for operand in operands {
256 let bitmap = decode_bitmap(operand);
257 acc |= bitmap;
258 }
259
260 acc.optimize();
261 Some(encode_bitmap_blob(&acc))
262}
263
264fn decode_bitmap(bytes: &[u8]) -> RoaringBitmap {
265 let blob = BitmapBlob::decode(bytes).expect("decode BitmapBlob");
266 RoaringBitmap::deserialize_from(blob.data.as_ref()).expect("deserialize RoaringBitmap")
267}
268
269fn encode_bitmap_blob(bitmap: &RoaringBitmap) -> Vec<u8> {
270 let blob = BitmapBlob {
271 data: serialize_bitmap(bitmap).into(),
272 };
273 blob.encode_to_vec()
274}
275
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Opens an empty store in a fresh temporary directory. The `TempDir` handle is returned
    /// so the caller can keep the directory alive for the whole test.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    #[test]
    fn pack_bucket_and_bit_math() {
        let p = pack(0, 0);
        assert_eq!(p, 0);
        assert_eq!(bucket_of(p), 0);
        assert_eq!(bit_of(p), 0);

        let p = pack(1, 0);
        assert_eq!(p, 1 << EVENT_BITS);
        assert_eq!(bucket_of(p), 0);
        assert_eq!(bit_of(p), 1 << EVENT_BITS);

        // The first transaction whose events land in bucket 1.
        let first_in_next_bucket = pack(EVENT_BUCKET_SIZE >> EVENT_BITS, 0);
        assert_eq!(first_in_next_bucket, EVENT_BUCKET_SIZE);
        assert_eq!(bucket_of(first_in_next_bucket), 1);
        assert_eq!(bit_of(first_in_next_bucket), 0);
    }

    #[test]
    fn key_encoding_round_trips() {
        let cases = vec![
            (b"emitting_module:coin".to_vec(), 0u64),
            (b"emitting_module:nft".to_vec(), 7),
            // Empty dimension and the largest bucket id exercise both edges of the layout.
            (Vec::new(), u64::MAX),
        ];
        for (dim, bucket) in cases {
            let encoded = Key {
                dimension_key: dim.clone(),
                bucket,
            }
            .encode()
            .unwrap();
            let mut cursor: &[u8] = &encoded;
            let decoded = Key::decode(&mut cursor).expect("decode Key");
            assert_eq!(
                decoded,
                Key {
                    dimension_key: dim,
                    bucket,
                }
            );
            assert!(cursor.is_empty(), "decode must consume the whole key");
        }
    }

    #[test]
    fn key_decode_rejects_inputs_shorter_than_bucket_suffix() {
        let mut short: &[u8] = &[0u8; 7];
        assert!(Key::decode(&mut short).is_err());
    }

    #[test]
    fn get_returns_none_for_unknown_bucket() {
        let (_dir, _db, schema) = fresh_db();
        assert!(
            schema
                .get_event_bitmap(b"emitting_module:coin".to_vec(), 0)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn single_match_round_trips_through_merge() {
        let (_dir, db, schema) = fresh_db();
        let (k, v) = store_match(b"emitting_module:coin".to_vec(), 42, 3);

        let mut batch = db.batch();
        batch.merge(&schema.event_bitmap, &k, &v).unwrap();
        batch.commit().unwrap();

        let packed = pack(42, 3);
        let bitmap = schema
            .get_event_bitmap(b"emitting_module:coin".to_vec(), bucket_of(packed))
            .unwrap()
            .expect("bitmap present");
        let bits: Vec<u32> = bitmap.iter().collect();
        assert_eq!(bits, vec![bit_of(packed)]);
    }

    #[test]
    fn many_matches_in_one_bucket_union() {
        let (_dir, db, schema) = fresh_db();
        let dim = b"emitting_module:coin".to_vec();
        let entries: Vec<(u64, u32)> = vec![(1, 0), (1, 7), (2, 0), (5, 12)];

        let mut batch = db.batch();
        for (tx, idx) in &entries {
            let (k, v) = store_match(dim.clone(), *tx, *idx);
            batch.merge(&schema.event_bitmap, &k, &v).unwrap();
        }
        batch.commit().unwrap();

        let bitmap = schema
            .get_event_bitmap(dim, 0)
            .unwrap()
            .expect("bitmap present");
        let bits: BTreeSet<u32> = bitmap.iter().collect();
        let expected: BTreeSet<u32> = entries
            .iter()
            .map(|(tx, idx)| bit_of(pack(*tx, *idx)))
            .collect();
        assert_eq!(bits, expected);
    }

    #[test]
    fn distinct_dimensions_do_not_alias() {
        let (_dir, db, schema) = fresh_db();
        let (k_a, v_a) = store_match(b"emitting_module:coin".to_vec(), 42, 1);
        let (k_b, v_b) = store_match(b"emitting_module:nft".to_vec(), 100, 2);
        let mut batch = db.batch();
        batch.merge(&schema.event_bitmap, &k_a, &v_a).unwrap();
        batch.merge(&schema.event_bitmap, &k_b, &v_b).unwrap();
        batch.commit().unwrap();

        let coin = schema
            .get_event_bitmap(b"emitting_module:coin".to_vec(), 0)
            .unwrap()
            .unwrap();
        let nft = schema
            .get_event_bitmap(b"emitting_module:nft".to_vec(), 0)
            .unwrap()
            .unwrap();
        assert_eq!(coin.iter().collect::<Vec<u32>>(), vec![bit_of(pack(42, 1))]);
        assert_eq!(nft.iter().collect::<Vec<u32>>(), vec![bit_of(pack(100, 2))]);
    }

    #[test]
    fn should_remove_bucket_drops_only_fully_pruned_ranges() {
        let dim = b"emitting_module:coin";
        let bucket_0_key = Key {
            dimension_key: dim.to_vec(),
            bucket: 0,
        }
        .encode()
        .unwrap();

        // Nothing pruned: keep.
        assert!(!should_remove_bucket(&bucket_0_key, 0));

        // Bucket 0 is removable only once every transaction it covers is pruned.
        let txs_per_bucket = EVENT_BUCKET_SIZE >> EVENT_BITS;
        assert!(!should_remove_bucket(&bucket_0_key, txs_per_bucket - 1));
        assert!(should_remove_bucket(&bucket_0_key, txs_per_bucket));

        let bucket_5_key = Key {
            dimension_key: dim.to_vec(),
            bucket: 5,
        }
        .encode()
        .unwrap();
        assert!(!should_remove_bucket(&bucket_5_key, 6 * txs_per_bucket - 1));
        assert!(should_remove_bucket(&bucket_5_key, 6 * txs_per_bucket));

        // Keys too short to carry a bucket suffix are never removed.
        assert!(!should_remove_bucket(&[0u8; 4], u64::MAX));
    }

    #[test]
    fn packed_pruning_floor_saturates_on_overflow() {
        assert_eq!(packed_pruning_floor(0), 0);
        assert_eq!(packed_pruning_floor(1), 1u64 << EVENT_BITS);
        let just_below = (1u64 << (64 - EVENT_BITS)) - 1;
        assert_eq!(packed_pruning_floor(just_below), just_below << EVENT_BITS);
        assert_eq!(packed_pruning_floor(1u64 << (64 - EVENT_BITS)), u64::MAX);
        assert_eq!(packed_pruning_floor(u64::MAX), u64::MAX);
    }

    #[test]
    fn iter_walks_buckets_for_one_dimension_in_order() {
        let (_dir, db, schema) = fresh_db();
        let dim = b"emitting_module:coin".to_vec();
        let other = b"emitting_module:nft".to_vec();
        let txs_per_bucket = EVENT_BUCKET_SIZE >> EVENT_BITS;
        // One match in each of buckets 0, 1 and 3 (bucket 2 left empty).
        let tx_seqs = [0u64, txs_per_bucket + 5, 3 * txs_per_bucket + 9];

        let mut batch = db.batch();
        for tx in tx_seqs {
            let (k, v) = store_match(dim.clone(), tx, 0);
            batch.merge(&schema.event_bitmap, &k, &v).unwrap();
        }
        // A row for another dimension must not show up in the scan.
        let (k_other, v_other) = store_match(other, 0, 0);
        batch
            .merge(&schema.event_bitmap, &k_other, &v_other)
            .unwrap();
        batch.commit().unwrap();

        let buckets: Vec<u64> = schema
            .iter_event_bitmap_buckets(dim)
            .unwrap()
            .map(|res| res.unwrap().0.bucket)
            .collect();
        assert_eq!(buckets, vec![0, 1, 3]);
    }
}