sui_rpc_store/schema/
transaction_bitmap.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! `(dimension_key, bucket)` → `BitmapBlob`.
//!
//! Inverted bitmap index over `tx_seq` space. The dimension key is
//! a variable-length opaque token (e.g. `[tag][sender]`); each
//! bucket holds the roaring bitmap of tx_seqs whose containing
//! transaction matches that dimension.
//!
//! Indexer pipelines stage merge operands carrying a small bitmap
//! (often a single bit per write); the merge operator unions every
//! operand against the existing on-disk bitmap and emits a single
//! consolidated value optimized for the on-disk encoding.
//!
//! A per-bucket compaction filter reads the shared `tx_seq`
//! pruning floor from
//! [`pruning_watermark::tx_seq_floor`](super::pruning_watermark::tx_seq_floor)
//! and drops buckets whose entire `tx_seq` range sits below the
//! floor.
21
22use std::sync::atomic::Ordering;
23
24use bytes::Buf;
25use bytes::BufMut;
26use prost::Message;
27use roaring::RoaringBitmap;
28use sui_consistent_store::Decode;
29use sui_consistent_store::Encode;
30use sui_consistent_store::Iter;
31use sui_consistent_store::Protobuf;
32use sui_consistent_store::error::DecodeError;
33use sui_consistent_store::error::EncodeError;
34use sui_consistent_store::error::Error;
35use sui_consistent_store::reader::Reader;
36
37use crate::proto::BitmapBlob;
38use crate::schema::pruning_watermark::tx_seq_floor;
39
/// Name handed to the CF options resolver for this table; also
/// used as the prefix of `Key` decode error messages.
pub const NAME: &str = "transaction_bitmap";

/// Number of consecutive `tx_seq` values represented by one
/// bucket. Sized to keep individual bitmaps small (~8 KiB at
/// worst-case density) and the per-row read cost predictable.
pub const TX_BUCKET_SIZE: u64 = 65_536;

// Compile-time guard: `bit_of` narrows `tx_seq % TX_BUCKET_SIZE`
// to `u32`, which is only lossless while the bucket size fits in
// a `u32`.
const _: () = assert!(TX_BUCKET_SIZE <= u32::MAX as u64);
48
/// Row key: an opaque dimension token plus the bucket index.
///
/// On disk this is the raw `dimension_key` bytes immediately
/// followed by `bucket` as 8 big-endian bytes (see the `Encode`
/// impl); there is no length prefix, so decoding treats the
/// trailing 8 bytes as the bucket and everything before them as
/// the dimension.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Key {
    /// Variable-length opaque dimension token.
    pub dimension_key: Vec<u8>,
    /// Bucket index, i.e. `tx_seq / TX_BUCKET_SIZE` for the
    /// `tx_seq` values stored in this row.
    pub bucket: u64,
}

/// Row value: a protobuf-wrapped `BitmapBlob` whose `data` field
/// carries the serialized roaring bitmap.
pub type Value = Protobuf<BitmapBlob>;
56
57impl Encode for Key {
58    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
59        buf.put_slice(&self.dimension_key);
60        buf.put_slice(&self.bucket.to_be_bytes());
61        Ok(())
62    }
63}
64
65impl Decode for Key {
66    fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
67        if buf.remaining() < 8 {
68            return Err(DecodeError::msg(format!(
69                "{NAME} Key too short: {} bytes",
70                buf.remaining(),
71            )));
72        }
73        let dim_len = buf.remaining() - 8;
74        let dim_bytes = buf.copy_to_bytes(dim_len);
75        let bucket = buf.get_u64();
76        Ok(Key {
77            dimension_key: dim_bytes.to_vec(),
78            bucket,
79        })
80    }
81}
82
83/// CF options: install the bitmap-union merge operator and a
84/// per-bucket compaction filter that drops buckets whose entire
85/// `tx_seq` range sits below the pruning floor.
86pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
87    let mut opts = resolver.options(NAME);
88    opts.set_merge_operator_associative("transaction_bitmap_merge", merge);
89    let floor = tx_seq_floor().clone();
90    opts.set_compaction_filter("transaction_bitmap_pruning", move |_level, key, _value| {
91        let pruned_exclusive = floor.load(Ordering::Relaxed);
92        if should_remove_bucket(key, pruned_exclusive) {
93            rocksdb::CompactionDecision::Remove
94        } else {
95            rocksdb::CompactionDecision::Keep
96        }
97    });
98    opts
99}
100
101/// Pure logic of the compaction filter: decide whether the bucket
102/// identified by `key`'s trailing 8-byte big-endian `bucket_id`
103/// can be removed given the exclusive `tx_seq` pruning floor.
104///
105/// A bucket is removable iff every `tx_seq` it covers is strictly
106/// below the floor — i.e. `(bucket_id + 1) * TX_BUCKET_SIZE <=
107/// pruned_exclusive`. Arithmetic uses `checked_*` so a corrupted
108/// `bucket_id` can't overflow and cause spurious removal.
109///
110/// Kept rather than removed on any malformed input — silent data
111/// loss is worse than a stuck row.
112pub(crate) fn should_remove_bucket(key: &[u8], pruned_exclusive: u64) -> bool {
113    if key.len() < 8 {
114        return false;
115    }
116    let bucket_id_bytes: [u8; 8] = key[key.len() - 8..].try_into().expect("slice length");
117    let bucket_id = u64::from_be_bytes(bucket_id_bytes);
118    bucket_id
119        .checked_add(1)
120        .and_then(|b| b.checked_mul(TX_BUCKET_SIZE))
121        .is_some_and(|highest_plus_one| highest_plus_one <= pruned_exclusive)
122}
123
124/// The bucket that owns a given `tx_seq`.
125pub fn bucket_of(tx_seq: u64) -> u64 {
126    tx_seq / TX_BUCKET_SIZE
127}
128
129/// The bit position within a bucket for a given `tx_seq`. The
130/// cast is safe because `TX_BUCKET_SIZE <= u32::MAX` (enforced
131/// at compile time above).
132pub fn bit_of(tx_seq: u64) -> u32 {
133    (tx_seq % TX_BUCKET_SIZE) as u32
134}
135
136/// Build a `(Key, Value)` pair that adds `tx_seq` to the bitmap
137/// for `(dimension_key, bucket_of(tx_seq))`. The merge operator
138/// unions this single-bit operand with whatever's already on
139/// disk.
140pub fn store_match(dimension_key: Vec<u8>, tx_seq: u64) -> (Key, Value) {
141    let mut bitmap = RoaringBitmap::new();
142    bitmap.insert(bit_of(tx_seq));
143    store_bitmap(dimension_key, bucket_of(tx_seq), bitmap)
144}
145
146/// Build a `(Key, Value)` pair that stages the given bitmap as a
147/// merge operand against the existing on-disk bitmap. Useful for
148/// pipelines that batch many tx_seqs into one bucket per
149/// dimension before writing.
150pub fn store_bitmap(dimension_key: Vec<u8>, bucket: u64, bitmap: RoaringBitmap) -> (Key, Value) {
151    (
152        Key {
153            dimension_key,
154            bucket,
155        },
156        Protobuf(BitmapBlob {
157            data: serialize_bitmap(&bitmap).into(),
158        }),
159    )
160}
161
162impl<R: Reader> super::RpcStoreSchema<R> {
163    /// Look up the bitmap for `(dimension_key, bucket)` and
164    /// return it deserialized.
165    pub fn get_transaction_bitmap(
166        &self,
167        dimension_key: Vec<u8>,
168        bucket: u64,
169    ) -> Result<Option<RoaringBitmap>, Error> {
170        let Some(stored) = self.transaction_bitmap.get(&Key {
171            dimension_key,
172            bucket,
173        })?
174        else {
175            return Ok(None);
176        };
177        let bytes = stored.into_inner().data;
178        let bitmap = RoaringBitmap::deserialize_from(bytes.as_ref())
179            .map_err(|e| DecodeError::with_source("deserialize RoaringBitmap", e))?;
180        Ok(Some(bitmap))
181    }
182
183    /// Iterate every bucket recorded against `dimension_key`, in
184    /// ascending bucket order.
185    pub fn iter_transaction_bitmap_buckets(
186        &self,
187        dimension_key: Vec<u8>,
188    ) -> Result<Iter<'_, Key, Value>, Error> {
189        self.transaction_bitmap
190            .iter_prefix(&DimensionPrefix(dimension_key))
191    }
192}
193
194/// Prefix encoder for "all buckets recorded against
195/// `dimension_key`". Encodes as the raw dimension bytes — the
196/// leading bytes of every `Key` whose `dimension_key` matches.
197pub struct DimensionPrefix(pub Vec<u8>);
198
199impl Encode for DimensionPrefix {
200    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
201        buf.put_slice(&self.0);
202        Ok(())
203    }
204}
205
206/// Serialize a roaring bitmap for on-disk storage. Run-encodes
207/// dense containers first so a bucket that matches many
208/// consecutive `tx_seq` values compresses well.
209fn serialize_bitmap(bitmap: &RoaringBitmap) -> Vec<u8> {
210    let mut buf = Vec::with_capacity(bitmap.serialized_size());
211    bitmap
212        .serialize_into(&mut buf)
213        .expect("RoaringBitmap::serialize_into on Vec cannot fail");
214    buf
215}
216
217/// Associative merge: union every operand bitmap with the
218/// existing on-disk bitmap, then optimize the accumulator before
219/// re-serializing.
220///
221/// Encode / decode failures panic — this CF is written only by
222/// the crate's `store_*` helpers, so a parse failure indicates
223/// corruption rather than a recoverable situation.
224fn merge(
225    _key: &[u8],
226    existing_val: Option<&[u8]>,
227    operands: &rocksdb::MergeOperands,
228) -> Option<Vec<u8>> {
229    let mut acc = match existing_val {
230        Some(bytes) => decode_bitmap(bytes),
231        None => RoaringBitmap::new(),
232    };
233
234    for operand in operands {
235        let bitmap = decode_bitmap(operand);
236        acc |= bitmap;
237    }
238
239    // Convert dense containers to runs before persisting. The
240    // operands are typically tiny (one bit per call) so there's
241    // nothing for run-encoding to collapse on them; the
242    // accumulator is what RocksDB writes back to disk.
243    acc.optimize();
244    Some(encode_bitmap_blob(&acc))
245}
246
247fn decode_bitmap(bytes: &[u8]) -> RoaringBitmap {
248    let blob = BitmapBlob::decode(bytes).expect("decode BitmapBlob");
249    RoaringBitmap::deserialize_from(blob.data.as_ref()).expect("deserialize RoaringBitmap")
250}
251
252fn encode_bitmap_blob(bitmap: &RoaringBitmap) -> Vec<u8> {
253    let blob = BitmapBlob {
254        data: serialize_bitmap(bitmap).into(),
255    };
256    blob.encode_to_vec()
257}
258
// Unit tests: pure bucket/bit arithmetic and compaction-filter
// logic, plus round trips through a real temporary database that
// exercise the merge operator and prefix iteration.
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    // Open a database in a fresh temp directory. The `TempDir` is
    // returned so the caller keeps it alive for the test's duration.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    // `bucket_of` / `bit_of` at the start, end, and just past the
    // end of a bucket, plus one mid-bucket value.
    #[test]
    fn bucket_and_bit_math() {
        assert_eq!(bucket_of(0), 0);
        assert_eq!(bit_of(0), 0);
        assert_eq!(bucket_of(TX_BUCKET_SIZE - 1), 0);
        assert_eq!(bit_of(TX_BUCKET_SIZE - 1), (TX_BUCKET_SIZE - 1) as u32);
        assert_eq!(bucket_of(TX_BUCKET_SIZE), 1);
        assert_eq!(bit_of(TX_BUCKET_SIZE), 0);
        assert_eq!(bucket_of(3 * TX_BUCKET_SIZE + 7), 3);
        assert_eq!(bit_of(3 * TX_BUCKET_SIZE + 7), 7);
    }

    // A lookup on an empty database yields `None`, not an error.
    #[test]
    fn get_returns_none_for_unknown_bucket() {
        let (_dir, _db, schema) = fresh_db();
        assert!(
            schema
                .get_transaction_bitmap(b"sender:alice".to_vec(), 0)
                .unwrap()
                .is_none()
        );
    }

    // One single-bit operand merged into an empty key reads back as
    // a bitmap containing exactly that bit.
    #[test]
    fn single_match_round_trips_through_merge() {
        let (_dir, db, schema) = fresh_db();
        let (k, v) = store_match(b"sender:alice".to_vec(), 42);

        let mut batch = db.batch();
        batch.merge(&schema.transaction_bitmap, &k, &v).unwrap();
        batch.commit().unwrap();

        let bitmap = schema
            .get_transaction_bitmap(b"sender:alice".to_vec(), bucket_of(42))
            .unwrap()
            .expect("bitmap present");
        let bits: Vec<u32> = bitmap.iter().collect();
        assert_eq!(bits, vec![42]);
    }

    // Several operands for the same key are unioned by the merge
    // operator; all tx_seqs here fall in bucket 0.
    #[test]
    fn many_matches_in_one_bucket_union() {
        let (_dir, db, schema) = fresh_db();
        let dim = b"sender:alice".to_vec();

        let mut batch = db.batch();
        for tx_seq in [1u64, 17, 256, 9_999] {
            let (k, v) = store_match(dim.clone(), tx_seq);
            batch.merge(&schema.transaction_bitmap, &k, &v).unwrap();
        }
        batch.commit().unwrap();

        let bitmap = schema
            .get_transaction_bitmap(dim, 0)
            .unwrap()
            .expect("bitmap present");
        let bits: BTreeSet<u32> = bitmap.iter().collect();
        assert_eq!(bits, BTreeSet::from([1, 17, 256, 9_999]));
    }

    // Writes under two different dimension keys land in separate
    // rows and do not leak into each other's bitmaps.
    #[test]
    fn distinct_dimensions_do_not_alias() {
        let (_dir, db, schema) = fresh_db();
        let (k_a, v_a) = store_match(b"sender:alice".to_vec(), 42);
        let (k_b, v_b) = store_match(b"sender:bob".to_vec(), 100);
        let mut batch = db.batch();
        batch.merge(&schema.transaction_bitmap, &k_a, &v_a).unwrap();
        batch.merge(&schema.transaction_bitmap, &k_b, &v_b).unwrap();
        batch.commit().unwrap();

        let alice = schema
            .get_transaction_bitmap(b"sender:alice".to_vec(), 0)
            .unwrap()
            .unwrap();
        let bob = schema
            .get_transaction_bitmap(b"sender:bob".to_vec(), 0)
            .unwrap()
            .unwrap();
        assert_eq!(alice.iter().collect::<Vec<u32>>(), vec![42]);
        assert_eq!(bob.iter().collect::<Vec<u32>>(), vec![100]);
    }

    // Boundary behaviour of the compaction-filter predicate around
    // the exclusive floor, plus the malformed-key and zero-floor
    // cases.
    #[test]
    fn should_remove_bucket_drops_only_fully_pruned_ranges() {
        let dim = b"sender:alice";

        // A bucket whose highest tx_seq is exactly at the floor:
        // the floor is *exclusive*, so this bucket is still
        // partially live and must not be removed.
        let just_at_floor_key = Key {
            dimension_key: dim.to_vec(),
            bucket: 0,
        }
        .encode()
        .unwrap();
        assert!(!should_remove_bucket(
            &just_at_floor_key,
            TX_BUCKET_SIZE - 1
        ));

        // Move the floor one past the bucket's highest tx_seq:
        // every entry it could hold is pruned, removable.
        assert!(should_remove_bucket(&just_at_floor_key, TX_BUCKET_SIZE));

        // Bucket 3 covers `tx_seq` in `[3 * BUCKET, 4 * BUCKET)`.
        // Floor sitting in the middle of the bucket keeps it.
        let middle_key = Key {
            dimension_key: dim.to_vec(),
            bucket: 3,
        }
        .encode()
        .unwrap();
        assert!(!should_remove_bucket(
            &middle_key,
            3 * TX_BUCKET_SIZE + (TX_BUCKET_SIZE / 2),
        ));

        // Floor past the bucket's high end → removable.
        assert!(should_remove_bucket(&middle_key, 4 * TX_BUCKET_SIZE));

        // Key shorter than 8 bytes → kept.
        assert!(!should_remove_bucket(&[0u8; 4], u64::MAX));

        // Floor of 0 → nothing removable.
        assert!(!should_remove_bucket(&middle_key, 0));
    }

    // Prefix iteration yields only the requested dimension's rows,
    // ordered by ascending bucket (0, 1, 3 — bucket 2 was never
    // written).
    #[test]
    fn iter_walks_buckets_for_one_dimension_in_order() {
        let (_dir, db, schema) = fresh_db();
        let dim = b"sender:alice".to_vec();
        let other = b"sender:bob".to_vec();

        let mut batch = db.batch();
        for tx_seq in [1u64, TX_BUCKET_SIZE + 5, 3 * TX_BUCKET_SIZE + 9] {
            let (k, v) = store_match(dim.clone(), tx_seq);
            batch.merge(&schema.transaction_bitmap, &k, &v).unwrap();
        }
        // Unrelated dimension — must not appear in our iter.
        let (k_other, v_other) = store_match(other, 7);
        batch
            .merge(&schema.transaction_bitmap, &k_other, &v_other)
            .unwrap();
        batch.commit().unwrap();

        let buckets: Vec<u64> = schema
            .iter_transaction_bitmap_buckets(dim)
            .unwrap()
            .map(|res| res.unwrap().0.bucket)
            .collect();
        assert_eq!(buckets, vec![0, 1, 3]);
    }
}