sui_rpc_store/schema/
event_bitmap.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `(dimension_key, bucket)` → `BitmapBlob`.
5//!
6//! Same wire shape as [`super::transaction_bitmap`]
7//! but indexes packed-event-seq space — each set bit identifies a
8//! single event by `(tx_seq << EVENT_BITS) | event_idx`.
9//!
10//! The merge operator is identical in structure to the
11//! transaction-bitmap one (union + optimize). The per-bucket
12//! compaction filter translates the `tx_seq` pruning floor from
13//! [`pruning_watermark::tx_seq_floor`](super::pruning_watermark::tx_seq_floor)
14//! into packed-event-seq space (`tx_seq << EVENT_BITS`) and drops
15//! buckets that fit entirely below it.
16
17use std::sync::atomic::Ordering;
18
19use bytes::Buf;
20use bytes::BufMut;
21use prost::Message;
22use roaring::RoaringBitmap;
23use sui_consistent_store::Decode;
24use sui_consistent_store::Encode;
25use sui_consistent_store::Iter;
26use sui_consistent_store::Protobuf;
27use sui_consistent_store::error::DecodeError;
28use sui_consistent_store::error::EncodeError;
29use sui_consistent_store::error::Error;
30use sui_consistent_store::reader::Reader;
31
32use crate::proto::BitmapBlob;
33use crate::schema::pruning_watermark::tx_seq_floor;
34
/// Column-family name for the event bitmap index.
pub const NAME: &str = "event_bitmap";

/// Low-order bits of a `packed_event_seq` given over to the
/// per-transaction `event_idx`. Indices `0..(1 << EVENT_BITS)` of one
/// transaction stay inside that transaction's packed range; a larger
/// index spills into the range of the next transaction.
pub const EVENT_BITS: u32 = 16;

/// Width of one bucket, measured in consecutive `packed_event_seq`
/// values. `EVENT_BUCKET_SIZE >> EVENT_BITS` is 4096, so every bucket
/// spans the events of 4096 consecutive transactions.
pub const EVENT_BUCKET_SIZE: u64 = 1u64 << 28;

// `bit_of` narrows an in-bucket offset to `u32`; that narrowing is
// lossless only while a bucket is no wider than the `u32` range.
const _: () = assert!(u32::MAX as u64 >= EVENT_BUCKET_SIZE);
50
/// Row key of the `event_bitmap` column family.
///
/// On disk this is `dimension_key` verbatim followed by `bucket` as
/// 8 big-endian bytes, with no length prefix or separator between the
/// two. Decoding therefore treats the final 8 bytes as the bucket and
/// everything before them as the dimension.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Key {
    /// Opaque bytes naming what is indexed (the tests use values such
    /// as `emitting_module:coin`).
    pub dimension_key: Vec<u8>,
    /// Bucket number, i.e. `packed_event_seq / EVENT_BUCKET_SIZE`.
    pub bucket: u64,
}

/// Stored value: a protobuf `BitmapBlob` wrapping a serialized
/// `RoaringBitmap` of in-bucket bit positions.
pub type Value = Protobuf<BitmapBlob>;
58
59impl Encode for Key {
60    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
61        buf.put_slice(&self.dimension_key);
62        buf.put_slice(&self.bucket.to_be_bytes());
63        Ok(())
64    }
65}
66
67impl Decode for Key {
68    fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
69        if buf.remaining() < 8 {
70            return Err(DecodeError::msg(format!(
71                "{NAME} Key too short: {} bytes",
72                buf.remaining(),
73            )));
74        }
75        let dim_len = buf.remaining() - 8;
76        let dim_bytes = buf.copy_to_bytes(dim_len);
77        let bucket = buf.get_u64();
78        Ok(Key {
79            dimension_key: dim_bytes.to_vec(),
80            bucket,
81        })
82    }
83}
84
85/// CF options: install the bitmap-union merge operator and a
86/// per-bucket compaction filter that drops buckets whose entire
87/// packed-event-seq range sits below the pruning floor.
88pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
89    let mut opts = resolver.options(NAME);
90    opts.set_merge_operator_associative("event_bitmap_merge", merge);
91    let floor = tx_seq_floor().clone();
92    opts.set_compaction_filter("event_bitmap_pruning", move |_level, key, _value| {
93        let tx_seq_pruned = floor.load(Ordering::Relaxed);
94        if should_remove_bucket(key, tx_seq_pruned) {
95            rocksdb::CompactionDecision::Remove
96        } else {
97            rocksdb::CompactionDecision::Keep
98        }
99    });
100    opts
101}
102
103/// Pure logic of the compaction filter.
104///
105/// Translates the `tx_seq` floor into packed-event-seq space and
106/// asks whether every packed event the bucket could hold is
107/// strictly below the translated floor. Kept on any malformed
108/// input — silent data loss is worse than a stuck row.
109pub(crate) fn should_remove_bucket(key: &[u8], tx_seq_pruned_exclusive: u64) -> bool {
110    if key.len() < 8 {
111        return false;
112    }
113    let bucket_id_bytes: [u8; 8] = key[key.len() - 8..].try_into().expect("slice length");
114    let bucket_id = u64::from_be_bytes(bucket_id_bytes);
115    let packed_floor = packed_pruning_floor(tx_seq_pruned_exclusive);
116    bucket_id
117        .checked_add(1)
118        .and_then(|b| b.checked_mul(EVENT_BUCKET_SIZE))
119        .is_some_and(|highest_plus_one| highest_plus_one <= packed_floor)
120}
121
122/// Translate the `tx_seq` floor into packed-event-seq space.
123///
124/// `packed_event_seq = tx_seq << EVENT_BITS`. For
125/// `tx_seq >= 2^(64 - EVENT_BITS)` the shift would overflow a
126/// `u64`; we saturate to `u64::MAX`, which represents "every
127/// possible event has been pruned" — the conservative direction
128/// for a removal decision.
129fn packed_pruning_floor(tx_seq_pruned_exclusive: u64) -> u64 {
130    const OVERFLOW_THRESHOLD: u64 = 1u64 << (64 - EVENT_BITS);
131    if tx_seq_pruned_exclusive < OVERFLOW_THRESHOLD {
132        tx_seq_pruned_exclusive << EVENT_BITS
133    } else {
134        u64::MAX
135    }
136}
137
138/// Pack `(tx_seq, event_idx)` into a single 64-bit positional
139/// identifier: `tx_seq << EVENT_BITS | event_idx`.
140pub fn pack(tx_seq: u64, event_idx: u32) -> u64 {
141    (tx_seq << EVENT_BITS) | u64::from(event_idx)
142}
143
144/// The bucket that owns a given packed event sequence.
145pub fn bucket_of(packed: u64) -> u64 {
146    packed / EVENT_BUCKET_SIZE
147}
148
149/// The bit position within a bucket for a given packed event
150/// sequence. The cast is safe because `EVENT_BUCKET_SIZE`
151/// fits in a `u32` (enforced at compile time above).
152pub fn bit_of(packed: u64) -> u32 {
153    (packed % EVENT_BUCKET_SIZE) as u32
154}
155
156/// Build a `(Key, Value)` pair that adds the event identified by
157/// `(tx_seq, event_idx)` to the bitmap for its dimension and
158/// bucket. The merge operator unions this single-bit operand
159/// with whatever's already on disk.
160pub fn store_match(dimension_key: Vec<u8>, tx_seq: u64, event_idx: u32) -> (Key, Value) {
161    let packed = pack(tx_seq, event_idx);
162    let mut bitmap = RoaringBitmap::new();
163    bitmap.insert(bit_of(packed));
164    store_bitmap(dimension_key, bucket_of(packed), bitmap)
165}
166
167/// Build a `(Key, Value)` pair that stages the given bitmap as a
168/// merge operand against the existing on-disk bitmap. Useful for
169/// pipelines that batch many events into one bucket per dimension
170/// before writing.
171pub fn store_bitmap(dimension_key: Vec<u8>, bucket: u64, bitmap: RoaringBitmap) -> (Key, Value) {
172    (
173        Key {
174            dimension_key,
175            bucket,
176        },
177        Protobuf(BitmapBlob {
178            data: serialize_bitmap(&bitmap).into(),
179        }),
180    )
181}
182
183impl<R: Reader> super::RpcStoreSchema<R> {
184    /// Look up the event bitmap for `(dimension_key, bucket)` and
185    /// return it deserialized.
186    pub fn get_event_bitmap(
187        &self,
188        dimension_key: Vec<u8>,
189        bucket: u64,
190    ) -> Result<Option<RoaringBitmap>, Error> {
191        let Some(stored) = self.event_bitmap.get(&Key {
192            dimension_key,
193            bucket,
194        })?
195        else {
196            return Ok(None);
197        };
198        let bytes = stored.into_inner().data;
199        let bitmap = RoaringBitmap::deserialize_from(bytes.as_ref())
200            .map_err(|e| DecodeError::with_source("deserialize RoaringBitmap", e))?;
201        Ok(Some(bitmap))
202    }
203
204    /// Iterate every bucket recorded against `dimension_key`, in
205    /// ascending bucket order.
206    pub fn iter_event_bitmap_buckets(
207        &self,
208        dimension_key: Vec<u8>,
209    ) -> Result<Iter<'_, Key, Value>, Error> {
210        self.event_bitmap
211            .iter_prefix(&DimensionPrefix(dimension_key))
212    }
213}
214
215/// Prefix encoder for "all buckets recorded against
216/// `dimension_key`". Encodes as the raw dimension bytes — the
217/// leading bytes of every `Key` whose `dimension_key` matches.
218pub struct DimensionPrefix(pub Vec<u8>);
219
220impl Encode for DimensionPrefix {
221    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
222        buf.put_slice(&self.0);
223        Ok(())
224    }
225}
226
227/// Serialize a roaring bitmap for on-disk storage. Run-encodes
228/// dense containers first so a bucket that matches many
229/// consecutive packed event sequences compresses well.
230fn serialize_bitmap(bitmap: &RoaringBitmap) -> Vec<u8> {
231    let mut buf = Vec::with_capacity(bitmap.serialized_size());
232    bitmap
233        .serialize_into(&mut buf)
234        .expect("RoaringBitmap::serialize_into on Vec cannot fail");
235    buf
236}
237
238/// Associative merge: union every operand bitmap with the
239/// existing on-disk bitmap, then optimize the accumulator before
240/// re-serializing.
241///
242/// Encode / decode failures panic — this CF is written only by
243/// the crate's `store_*` helpers, so a parse failure indicates
244/// corruption rather than a recoverable situation.
245fn merge(
246    _key: &[u8],
247    existing_val: Option<&[u8]>,
248    operands: &rocksdb::MergeOperands,
249) -> Option<Vec<u8>> {
250    let mut acc = match existing_val {
251        Some(bytes) => decode_bitmap(bytes),
252        None => RoaringBitmap::new(),
253    };
254
255    for operand in operands {
256        let bitmap = decode_bitmap(operand);
257        acc |= bitmap;
258    }
259
260    acc.optimize();
261    Some(encode_bitmap_blob(&acc))
262}
263
264fn decode_bitmap(bytes: &[u8]) -> RoaringBitmap {
265    let blob = BitmapBlob::decode(bytes).expect("decode BitmapBlob");
266    RoaringBitmap::deserialize_from(blob.data.as_ref()).expect("deserialize RoaringBitmap")
267}
268
269fn encode_bitmap_blob(bitmap: &RoaringBitmap) -> Vec<u8> {
270    let blob = BitmapBlob {
271        data: serialize_bitmap(bitmap).into(),
272    };
273    blob.encode_to_vec()
274}
275
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    const COIN: &[u8] = b"emitting_module:coin";
    const NFT: &[u8] = b"emitting_module:nft";

    /// Transactions whose events share one bucket.
    const TXS_PER_BUCKET: u64 = EVENT_BUCKET_SIZE >> EVENT_BITS;

    /// Opens a throwaway database. The `TempDir` is returned so the
    /// directory outlives the test body.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    /// Stages one `store_match` operand per `(dimension, tx_seq,
    /// event_idx)` triple, in order, and commits them as a single batch.
    fn merge_matches(
        db: &Db,
        schema: &RpcStoreSchema,
        matches: impl IntoIterator<Item = (Vec<u8>, u64, u32)>,
    ) {
        let mut batch = db.batch();
        for (dimension_key, tx_seq, event_idx) in matches {
            let (key, value) = store_match(dimension_key, tx_seq, event_idx);
            batch.merge(&schema.event_bitmap, &key, &value).unwrap();
        }
        batch.commit().unwrap();
    }

    #[test]
    fn pack_bucket_and_bit_math() {
        // ((tx_seq, event_idx), (packed, bucket, bit))
        let cases = [
            ((0u64, 0u32), (0u64, 0u64, 0u32)),
            ((1, 0), (1u64 << EVENT_BITS, 0, 1u32 << EVENT_BITS)),
            // The first transaction of bucket 1 packs to exactly
            // EVENT_BUCKET_SIZE.
            ((TXS_PER_BUCKET, 0), (EVENT_BUCKET_SIZE, 1, 0)),
        ];
        for ((tx_seq, event_idx), (packed, bucket, bit)) in cases {
            let p = pack(tx_seq, event_idx);
            assert_eq!(p, packed);
            assert_eq!(bucket_of(p), bucket);
            assert_eq!(bit_of(p), bit);
        }
    }

    #[test]
    fn get_returns_none_for_unknown_bucket() {
        let (_dir, _db, schema) = fresh_db();
        let found = schema.get_event_bitmap(COIN.to_vec(), 0).unwrap();
        assert!(found.is_none());
    }

    #[test]
    fn single_match_round_trips_through_merge() {
        let (_dir, db, schema) = fresh_db();
        merge_matches(&db, &schema, [(COIN.to_vec(), 42, 3)]);

        let packed = pack(42, 3);
        let bitmap = schema
            .get_event_bitmap(COIN.to_vec(), bucket_of(packed))
            .unwrap()
            .expect("bitmap present");
        assert_eq!(bitmap.iter().collect::<Vec<u32>>(), vec![bit_of(packed)]);
    }

    #[test]
    fn many_matches_in_one_bucket_union() {
        let (_dir, db, schema) = fresh_db();
        let events = [(1u64, 0u32), (1, 7), (2, 0), (5, 12)];
        merge_matches(
            &db,
            &schema,
            events
                .iter()
                .map(|&(tx_seq, event_idx)| (COIN.to_vec(), tx_seq, event_idx)),
        );

        let stored: BTreeSet<u32> = schema
            .get_event_bitmap(COIN.to_vec(), 0)
            .unwrap()
            .expect("bitmap present")
            .iter()
            .collect();
        let expected: BTreeSet<u32> = events
            .iter()
            .map(|&(tx_seq, event_idx)| bit_of(pack(tx_seq, event_idx)))
            .collect();
        assert_eq!(stored, expected);
    }

    #[test]
    fn distinct_dimensions_do_not_alias() {
        let (_dir, db, schema) = fresh_db();
        merge_matches(
            &db,
            &schema,
            [(COIN.to_vec(), 42, 1), (NFT.to_vec(), 100, 2)],
        );

        for (dim, tx_seq, event_idx) in [(COIN, 42, 1), (NFT, 100, 2)] {
            let bitmap = schema.get_event_bitmap(dim.to_vec(), 0).unwrap().unwrap();
            assert_eq!(
                bitmap.iter().collect::<Vec<u32>>(),
                vec![bit_of(pack(tx_seq, event_idx))]
            );
        }
    }

    #[test]
    fn should_remove_bucket_drops_only_fully_pruned_ranges() {
        let encoded = |bucket: u64| {
            Key {
                dimension_key: COIN.to_vec(),
                bucket,
            }
            .encode()
            .unwrap()
        };
        let bucket_0 = encoded(0);
        let bucket_5 = encoded(5);

        // A zero floor prunes nothing.
        assert!(!should_remove_bucket(&bucket_0, 0));
        // Bucket 0 survives until the floor covers all of its
        // transactions...
        assert!(!should_remove_bucket(&bucket_0, TXS_PER_BUCKET - 1));
        assert!(should_remove_bucket(&bucket_0, TXS_PER_BUCKET));
        // ...and bucket 5 survives until the floor reaches the start of
        // bucket 6.
        assert!(!should_remove_bucket(&bucket_5, 6 * TXS_PER_BUCKET - 1));
        assert!(should_remove_bucket(&bucket_5, 6 * TXS_PER_BUCKET));
        // Keys too short to carry a bucket id are never removed.
        assert!(!should_remove_bucket(&[0u8; 4], u64::MAX));
    }

    #[test]
    fn packed_pruning_floor_saturates_on_overflow() {
        let threshold = 1u64 << (64 - EVENT_BITS);
        let cases = [
            (0u64, 0u64),
            (1, 1u64 << EVENT_BITS),
            // Largest floor whose shift is still exact.
            (threshold - 1, (threshold - 1) << EVENT_BITS),
            // From here on the shift would overflow, so it saturates.
            (threshold, u64::MAX),
            (u64::MAX, u64::MAX),
        ];
        for (tx_floor, packed_floor) in cases {
            assert_eq!(packed_pruning_floor(tx_floor), packed_floor);
        }
    }

    #[test]
    fn iter_walks_buckets_for_one_dimension_in_order() {
        let (_dir, db, schema) = fresh_db();
        // One COIN event in each of buckets 0, 1 and 3, plus an NFT event
        // that the COIN scan must not surface.
        let coin_txs = [0u64, TXS_PER_BUCKET + 5, 3 * TXS_PER_BUCKET + 9];
        let coin_events = coin_txs.into_iter().map(|tx| (COIN.to_vec(), tx, 0));
        merge_matches(&db, &schema, coin_events.chain([(NFT.to_vec(), 0, 0)]));

        let buckets = schema
            .iter_event_bitmap_buckets(COIN.to_vec())
            .unwrap()
            .map(|entry| entry.unwrap().0.bucket)
            .collect::<Vec<u64>>();
        assert_eq!(buckets, vec![0, 1, 3]);
    }
}