sui_inverted_index/bitmap_query/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! DNF bitmap index queries over ordered bucket streams.
5//!
6//! Callers build a `BitmapQuery` as an OR of terms. Each term is an AND of
7//! signed dimension-key literals. Evaluation yields matching bitmap members as
8//! they are produced. Back-pressure from downstream consumers, e.g. a
9//! `.take(page_size)`, propagates back to the backend-provided bucket streams
10//! and avoids materializing matches we won't use.
11//!
12//! Queries are intentionally restricted to anchored DNF: every term must contain
13//! at least one positive literal. Positive literals give the evaluator concrete
14//! bitmap streams to scan and intersect; negative literals only shrink those
15//! candidate streams. Negative-only terms such as `NOT sender = A` are
16//! supported by anchoring them upstream on a universe include — a stored
17//! existence marker in event-space (`EventExtant`), a scan-time-synthesized
18//! dense leaf in tx-space (`TxUniverse`, see [`dense_universe_buckets`]) — so
19//! the evaluator itself stays a set of ordered stream merge-joins with no
20//! complement-specific code path. The full-range scan such a term implies is
21//! inherent to negation and is bounded by the per-request bucket budget.
22//!
23//! Backends provide one ordered `(bucket_id, RoaringBitmap)` stream or iterator
24//! per dimension key. The merge-join machinery here is storage-agnostic:
25//! BigTable, RocksDB, or any other backend can reuse it as long as its bucket
26//! source is sparse, ordered by the requested scan direction, and stores bitmap
27//! positions relative to that bucket.
28
29use std::collections::HashMap;
30use std::collections::HashSet;
31use std::ops::Range;
32
33use anyhow::Result;
34use anyhow::bail;
35use futures::stream::BoxStream;
36use roaring::RoaringBitmap;
37
38use crate::dimensions::IndexDimension;
39
40mod iter;
41mod stream;
42
43pub use iter::eval_bitmap_query_bucket_iter;
44pub use stream::BitmapScanMetrics;
45pub use stream::buckets_with_watermarks;
46pub use stream::eval_bitmap_query_stream;
47pub use stream::flatten_watermarked_buckets;
48
49// Cross-checked against the iterative evaluator in iter.rs tests.
50#[cfg(test)]
51pub(crate) use stream::BitmapScanBudget;
52#[cfg(test)]
53pub(crate) use stream::eval_bitmap_query_bucket_stream;
54
55/// Terminal signal raised by a bitmap evaluator on the bucket channel's `Err`.
56/// It must be an error rather than a silent end-of-stream: the driver reads a
57/// clean end as "scanned the whole range" and advances the resume cursor to the
58/// terminus, so a budget or cancel stop has to short-circuit the `try_stream!`
59/// pipeline to truncate the scan at the current floor. The variant is chosen at
60/// the source, so each List handler maps it to a wire outcome with one
61/// exhaustive match.
62#[derive(Debug, thiserror::Error)]
63pub enum BitmapScanError {
64    /// The per-request bucket-scan limit was reached: a graceful early stop. The
65    /// handler ends the stream with `QUERY_END_REASON_SCAN_LIMIT` and the last
66    /// emitted watermark as the continuation cursor.
67    #[error("bitmap scan limit reached")]
68    ScanLimit,
69    /// The scan was cancelled (the request's cancellation token fired). The
70    /// handler ends the stream with a gRPC `Cancelled` status.
71    #[error("bitmap scan cancelled")]
72    Cancelled,
73    /// A backend/storage fault. The handler ends the stream with a gRPC
74    /// `Internal` status carrying this error unchanged.
75    #[error(transparent)]
76    Source(anyhow::Error),
77}
78
79impl BitmapScanError {
80    /// Reduce the terminal errors raised by several leaves in one driver round
81    /// to a single disposition. Precedence: `Source` > `ScanLimit` > `Cancelled`.
82    ///
83    /// - `Source` outranks everything: a real fault must surface as `Internal`,
84    ///   never be masked as a clean `SCAN_LIMIT` end or a `Cancelled` status.
85    /// - `ScanLimit` outranks `Cancelled`: a scan-limit stop already emitted its
86    ///   frontier watermark, so ending the stream cleanly with a continuation
87    ///   cursor is strictly more useful than a `Cancelled` error — the cancel
88    ///   (a deadline/timeout) loses nothing, the resume point is already on the
89    ///   wire.
90    ///
91    /// Multiple concurrent faults are combined into one `Source` so a correlated
92    /// failure keeps every leaf's message instead of an arbitrary one. Panics on
93    /// empty input — the evaluator only collapses when at least one leaf errored.
94    pub(crate) fn collapse(errs: Vec<BitmapScanError>) -> BitmapScanError {
95        assert!(!errs.is_empty(), "BitmapScanError::collapse on empty Vec");
96        let mut cancelled = false;
97        let mut scan_limit = false;
98        let mut faults: Vec<anyhow::Error> = Vec::new();
99        for e in errs {
100            match e {
101                BitmapScanError::Cancelled => cancelled = true,
102                BitmapScanError::ScanLimit => scan_limit = true,
103                BitmapScanError::Source(err) => faults.push(err),
104            }
105        }
106        match faults.len() {
107            0 => {
108                if scan_limit {
109                    BitmapScanError::ScanLimit
110                } else {
111                    debug_assert!(cancelled, "collapse saw only non-erroring leaves");
112                    BitmapScanError::Cancelled
113                }
114            }
115            1 => BitmapScanError::Source(faults.pop().expect("len == 1")),
116            n => {
117                let combined = faults
118                    .iter()
119                    .enumerate()
120                    .map(|(i, err)| format!("  [{i}] {err}"))
121                    .collect::<Vec<_>>()
122                    .join("\n");
123                BitmapScanError::Source(anyhow::anyhow!(
124                    "{n} concurrent bitmap scan faults:\n{combined}"
125                ))
126            }
127        }
128    }
129}
130
131impl From<anyhow::Error> for BitmapScanError {
132    fn from(err: anyhow::Error) -> Self {
133        BitmapScanError::Source(err)
134    }
135}
136
137pub type BitmapScanResult<T> = std::result::Result<T, BitmapScanError>;
138
/// Either a payload or a progress marker travelling through a bitmap eval
/// pipeline.
///
/// `Watermark(p)` states that every `Item` positioned strictly before `p` in
/// scan direction has already been emitted upstream. Downstream stages must
/// keep the relative order of watermarks and items intact — that ordering is
/// what makes a watermark a safe resume cursor on timeout.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Watermarked<T> {
    Item(T),
    Watermark(u64),
}

impl<T> Watermarked<T> {
    /// Apply `f` to the payload of an `Item`; a `Watermark` passes through
    /// with its position unchanged and `f` is never called.
    pub fn map_item<U>(self, f: impl FnOnce(T) -> U) -> Watermarked<U> {
        let payload = match self {
            Self::Watermark(position) => return Watermarked::Watermark(position),
            Self::Item(payload) => payload,
        };
        Watermarked::Item(f(payload))
    }
}
158
159/// A stream of `(bucket_id, RoaringBitmap)` in the requested bucket order.
160/// Bitmap positions are **relative** to the bucket (u32 offsets `[0, BUCKET_SIZE)`)
161/// - edge trimming against the requested range happens at the flatten step.
162pub type BucketItem = BitmapScanResult<(u64, RoaringBitmap)>;
163pub type BucketStream = BoxStream<'static, BucketItem>;
164
165/// A bucket stream that interleaves data buckets with progress watermarks.
166/// The flat DNF driver derives each watermark from the slowest leaf's
167/// position, so the output always reflects "every source has scanned past P."
168pub(crate) type WatermarkedBucket = BitmapScanResult<Watermarked<(u64, RoaringBitmap)>>;
169pub type WatermarkedBucketStream = BoxStream<'static, WatermarkedBucket>;
170
/// Direction of a bucket scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScanDirection {
    Ascending,
    Descending,
}

impl ScanDirection {
    /// `true` for [`ScanDirection::Ascending`], `false` for
    /// [`ScanDirection::Descending`].
    pub fn is_ascending(self) -> bool {
        match self {
            Self::Ascending => true,
            Self::Descending => false,
        }
    }
}
182
183/// Synthesized bucket sequence for the dense tx-seq universe: one full bitmap
184/// per bucket touched by `range`, in scan-direction order. Backends return this
185/// for the query-only `IndexDimension::TxUniverse` key instead of reading
186/// storage — the tx-seq namespace is dense, so the universe is computable.
187///
188/// Bitmaps carry all `[0, bucket_size)` relative bits even in edge buckets;
189/// trimming against the requested range happens at the flatten step, same as
190/// for stored buckets.
191pub fn dense_universe_buckets(
192    range: Range<u64>,
193    bucket_size: u64,
194    direction: ScanDirection,
195) -> impl Iterator<Item = (u64, RoaringBitmap)> + Send + 'static {
196    let bits = u32::try_from(bucket_size).expect("bucket size fits in u32");
197    let mut buckets = if range.is_empty() {
198        0..0
199    } else {
200        (range.start / bucket_size)..(range.end - 1) / bucket_size + 1
201    };
202    std::iter::from_fn(move || {
203        let bucket = match direction {
204            ScanDirection::Ascending => buckets.next(),
205            ScanDirection::Descending => buckets.next_back(),
206        }?;
207        let mut bitmap = RoaringBitmap::new();
208        bitmap.insert_range(0..bits);
209        Some((bucket, bitmap))
210    })
211}
212
213/// Storage backend that can scan one bitmap dimension key over a member range.
214///
215/// The returned stream must be sparse and ordered by the requested direction.
216/// Missing bucket rows are interpreted as all-zero bitmaps by the merge-join
217/// operators.
218pub trait BitmapBucketSource: Clone + Send + 'static {
219    fn scan_bucket_stream(
220        &self,
221        dimension_key: Vec<u8>,
222        range: Range<u64>,
223        direction: ScanDirection,
224    ) -> BucketStream;
225}
226
227/// Storage backend that can scan one bitmap dimension key synchronously.
228///
229/// This is for request-local backends such as RocksDB, where the bucket scan
230/// naturally owns or borrows a synchronous iterator. The iterator evaluator is
231/// fully synchronous so these iterators can stay on the blocking task that owns
232/// them.
233pub trait BitmapBucketIteratorSource<'a>: Clone + 'a {
234    type Iter: Iterator<Item = BucketItem> + 'a;
235
236    fn scan_bucket_iter(
237        &self,
238        dimension_key: Vec<u8>,
239        range: Range<u64>,
240        direction: ScanDirection,
241    ) -> Self::Iter;
242}
243
244/// A DNF query over bitmap dimension scans.
245///
246/// A query is a disjunction of terms. It must contain at least one term, and
247/// every term must be anchored by at least one included dimension key.
248#[derive(Clone, Debug)]
249pub struct BitmapQuery {
250    terms: Vec<BitmapTerm>,
251}
252
253/// One conjunction in a DNF bitmap query.
254///
255/// A term is a conjunction of signed literals. It must include at least one
256/// positive literal so the evaluator has a finite candidate stream to refine.
257#[derive(Clone, Debug)]
258pub struct BitmapTerm {
259    literals: Vec<BitmapLiteral>,
260}
261
/// A lookup key of the form `[dimension_tag][dimension_value]` that has
/// passed validation.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct BitmapKey(Vec<u8>);
265
266/// One signed dimension-key literal in a bitmap term.
267#[derive(Clone, Debug)]
268pub enum BitmapLiteral {
269    Include(BitmapKey),
270    Exclude(BitmapKey),
271}
272
273impl BitmapKey {
274    pub fn new(bytes: Vec<u8>) -> Result<Self> {
275        if bytes.is_empty() {
276            bail!("bitmap dimension key must not be empty");
277        }
278        if bytes.len() == 1 {
279            bail!("bitmap dimension value must not be empty");
280        }
281        if IndexDimension::from_tag_byte(bytes[0]).is_none() {
282            bail!("unknown bitmap dimension tag {}", bytes[0]);
283        }
284        Ok(Self(bytes))
285    }
286
287    pub fn into_inner(self) -> Vec<u8> {
288        self.0
289    }
290
291    pub fn as_bytes(&self) -> &[u8] {
292        &self.0
293    }
294}
295
296impl TryFrom<Vec<u8>> for BitmapKey {
297    type Error = anyhow::Error;
298
299    fn try_from(value: Vec<u8>) -> Result<Self> {
300        Self::new(value)
301    }
302}
303
304impl BitmapLiteral {
305    pub fn include(dimension_key: Vec<u8>) -> Result<Self> {
306        Ok(Self::Include(BitmapKey::new(dimension_key)?))
307    }
308
309    pub fn exclude(dimension_key: Vec<u8>) -> Result<Self> {
310        Ok(Self::Exclude(BitmapKey::new(dimension_key)?))
311    }
312
313    pub fn key_bytes(&self) -> &[u8] {
314        match self {
315            BitmapLiteral::Include(k) | BitmapLiteral::Exclude(k) => k.as_bytes(),
316        }
317    }
318}
319
320impl BitmapQuery {
321    pub fn new(terms: Vec<BitmapTerm>) -> Result<Self> {
322        if terms.is_empty() {
323            bail!("bitmap query must contain at least one term");
324        }
325        Ok(Self { terms })
326    }
327
328    pub fn scan(dimension_key: Vec<u8>) -> Result<Self> {
329        Ok(Self {
330            terms: vec![BitmapTerm::new(vec![BitmapLiteral::include(
331                dimension_key,
332            )?])?],
333        })
334    }
335
336    /// Count of distinct dimension-key leaves the query will scan. Identical
337    /// keys across literals (whether within a term or across terms) collapse
338    /// to one leaf at evaluation time, so the per-request budget floor —
339    /// "every leaf can emit its first watermark" — applies to this
340    /// deduplicated count, not the raw literal total.
341    pub fn unique_leaf_count(&self) -> usize {
342        self.terms
343            .iter()
344            .flat_map(|t| t.literals.iter().map(|l| l.key_bytes()))
345            .collect::<HashSet<_>>()
346            .len()
347    }
348
349    pub fn terms(&self) -> &[BitmapTerm] {
350        &self.terms
351    }
352}
353
354impl BitmapTerm {
355    pub fn new(literals: Vec<BitmapLiteral>) -> Result<Self> {
356        if !literals
357            .iter()
358            .any(|literal| matches!(literal, BitmapLiteral::Include(_)))
359        {
360            bail!("bitmap query term must contain at least one include literal");
361        }
362        Ok(Self { literals })
363    }
364
365    pub fn literals(&self) -> &[BitmapLiteral] {
366        &self.literals
367    }
368}
369
370/// Deduplicated leaf list + per-term references over those leaves, ready for
371/// the evaluator. `keys[i]` is the dimension-key bytes for leaf `i`; each
372/// [`TermSpec`] references leaves by that index.
373pub(crate) struct DedupedQuery {
374    pub(crate) keys: Vec<Vec<u8>>,
375    pub(crate) terms: Vec<TermSpec>,
376}
377
378/// Deduplicate literals across the whole query and translate each term's
379/// includes/excludes into indices over the shared leaf list.
380///
381/// Dedup is keyed on encoded dimension-key bytes, so the same key appearing
382/// in multiple terms (e.g. `(sender=A AND module=X) OR (sender=A AND
383/// type=Y)`) maps to a single backend scan instead of two duplicate scans.
384pub(crate) fn build_term_specs(terms: Vec<BitmapTerm>) -> DedupedQuery {
385    let mut key_to_idx: HashMap<Vec<u8>, usize> = HashMap::new();
386    let mut keys: Vec<Vec<u8>> = Vec::new();
387    let mut specs: Vec<TermSpec> = Vec::with_capacity(terms.len());
388    for term in terms {
389        let mut include_idx = Vec::with_capacity(term.literals.len());
390        let mut exclude_idx = Vec::with_capacity(term.literals.len());
391        for literal in term.literals {
392            let (push_target, key) = match literal {
393                BitmapLiteral::Include(k) => (&mut include_idx, k.into_inner()),
394                BitmapLiteral::Exclude(k) => (&mut exclude_idx, k.into_inner()),
395            };
396            let idx = match key_to_idx.get(&key) {
397                Some(&i) => i,
398                None => {
399                    let i = keys.len();
400                    key_to_idx.insert(key.clone(), i);
401                    keys.push(key);
402                    i
403                }
404            };
405            push_target.push(idx);
406        }
407        specs.push(TermSpec {
408            includes: include_idx,
409            excludes: exclude_idx,
410            unsatisfiable: false,
411        });
412    }
413    DedupedQuery { keys, terms: specs }
414}
415
416/// The less-advanced of two frontier positions in scan direction: the min
417/// ascending, the max descending. Used to keep a merged frontier bounded by
418/// the slowest contributor.
419fn bound_in_direction(a: u64, b: u64, direction: ScanDirection) -> u64 {
420    match direction {
421        ScanDirection::Ascending => a.min(b),
422        ScanDirection::Descending => a.max(b),
423    }
424}
425
426/// Whether emitting `next` as a watermark advances the frontier past the
427/// previously emitted one. Ascending frontiers strictly increase,
428/// descending strictly decrease; the first watermark always advances.
429fn frontier_advanced(prev: Option<u64>, next: u64, direction: ScanDirection) -> bool {
430    match prev {
431        None => true,
432        Some(prev) => match direction {
433            ScanDirection::Ascending => next > prev,
434            ScanDirection::Descending => next < prev,
435        },
436    }
437}
438
439/// Clamped member-id edges of `bucket` in scan direction: `(pre, post)` where
440/// `pre` is the leading edge (everything before it is already covered) and
441/// `post` is the trailing edge (everything up to and including the bucket is
442/// covered). Ascending: `(low, high)`; descending: `(high, low)`. Both clamped
443/// to the request range so cursors stay in-bounds when they round-trip into a
444/// follow-up request with a different range.
445pub(crate) fn bucket_edges(
446    bucket: u64,
447    bucket_size: u64,
448    range: &Range<u64>,
449    direction: ScanDirection,
450) -> (u64, u64) {
451    let start = bucket.saturating_mul(bucket_size);
452    let end = start.saturating_add(bucket_size);
453    match direction {
454        ScanDirection::Ascending => (start.max(range.start), end.min(range.end)),
455        ScanDirection::Descending => (end.min(range.end), start.max(range.start)),
456    }
457}
458
459/// Pull the snapshotted bitmap for leaf `i` to give to one referencing term
460/// this round. Returns `None` if the leaf isn't on the floor bucket (so the
461/// term short-circuits). When `remaining_refs[i] > 1`, the bitmap is cloned so
462/// other referencing terms still see it; the last reference takes by value to
463/// avoid an unnecessary copy in the common single-reference case.
464pub(crate) fn take_snapshot_bitmap(
465    snapshot: &mut [Option<RoaringBitmap>],
466    remaining_refs: &mut [usize],
467    on_floor: &[bool],
468    i: usize,
469) -> Option<RoaringBitmap> {
470    if !on_floor[i] {
471        return None;
472    }
473    if remaining_refs[i] > 1 {
474        remaining_refs[i] -= 1;
475        snapshot[i].clone()
476    } else {
477        remaining_refs[i] = remaining_refs[i].saturating_sub(1);
478        snapshot[i].take()
479    }
480}
481
482/// Per-round refcount: how many satisfiable term-side slots reference each
483/// on-floor leaf. Drives [`take_snapshot_bitmap`]'s take-vs-clone decision so
484/// the last referencing slot reclaims the bitmap by value.
485pub(crate) fn count_on_floor_refs(terms: &[TermSpec], on_floor: &[bool]) -> Vec<usize> {
486    let mut refs = vec![0usize; on_floor.len()];
487    for term in terms {
488        if term.unsatisfiable {
489            continue;
490        }
491        for &i in term.includes.iter().chain(term.excludes.iter()) {
492            if on_floor[i] {
493                refs[i] += 1;
494            }
495        }
496    }
497    refs
498}
499
500/// Recompute leaf liveness from current term state. A leaf becomes
501/// `unreferenced` when no satisfiable term still points at it, or when its
502/// head is at EOF (the bucket stream is permanently exhausted; any further
503/// peek would be wasted work, and any include-referencing term will be marked
504/// `unsatisfiable` separately).
505///
506/// `unreferenced` is monotonic — entries only transition false → true — so
507/// this is safe to invoke each round; previously-retired leaves stay retired.
508pub(crate) fn recompute_unreferenced(
509    terms: &[TermSpec],
510    class: &[Option<LeafHead>],
511    unreferenced: &mut [bool],
512) {
513    let leaf_count = unreferenced.len();
514    let mut referenced = vec![false; leaf_count];
515    for term in terms {
516        if term.unsatisfiable {
517            continue;
518        }
519        for &i in term.includes.iter().chain(term.excludes.iter()) {
520            if !unreferenced[i] && !matches!(class[i], Some(LeafHead::Eof)) {
521                referenced[i] = true;
522            }
523        }
524    }
525    for i in 0..leaf_count {
526        if !referenced[i] {
527            unreferenced[i] = true;
528        }
529    }
530}
531
532/// Evaluate one DNF term at a single bucket from the per-leaf bitmaps present
533/// there: intersect the includes (any absent include ⇒ empty term), then
534/// subtract the union of the present excludes (`a AND NOT b`). Returns the
535/// term's matches at the bucket, or `None` if empty. Bitmaps are taken by value
536/// so the caller hands over the consumed leaf rows without cloning.
537pub(crate) fn eval_term_at_bucket(
538    includes: Vec<Option<RoaringBitmap>>,
539    excludes: Vec<Option<RoaringBitmap>>,
540) -> Option<RoaringBitmap> {
541    let mut acc: Option<RoaringBitmap> = None;
542    for include in includes {
543        // A missing include means the intersection is empty at this bucket.
544        let bitmap = include?;
545        acc = Some(match acc {
546            None => bitmap,
547            Some(a) => a & bitmap,
548        });
549    }
550    // Anchored terms always carry at least one include, so `acc` is `Some`.
551    let mut acc = acc?;
552    for exclude in excludes.into_iter().flatten() {
553        acc -= exclude;
554    }
555    (!acc.is_empty()).then_some(acc)
556}
557
/// A single DNF term expressed as indices into the flat leaf vector. Used by
/// both the stream driver and the iterator driver.
pub(crate) struct TermSpec {
    pub(crate) includes: Vec<usize>,
    pub(crate) excludes: Vec<usize>,
    /// Latched flag, set as soon as any include leaf reaches EOF: from then on
    /// the term's intersection is permanently empty and it can never match
    /// again.
    pub(crate) unsatisfiable: bool,
}
567
/// What a non-consuming peek found at a leaf's head this round.
pub(crate) enum LeafHead {
    Bucket(u64),
    Eof,
    Error,
}
574
/// Shared fixtures for the bitmap-query tests: in-memory bucket sources and
/// small builders for keys, literals and bitmaps.
#[cfg(test)]
pub(crate) mod test_utils {
    use std::collections::BTreeMap;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;

    use futures::StreamExt;
    use futures::stream;

    use super::*;

    pub(crate) const BUCKET_SIZE: u64 = 100_000;
    /// Dimension key → stored `(bucket_id, relative bit positions)` rows, in
    /// ascending bucket order.
    pub(crate) type TestBuckets = BTreeMap<Vec<u8>, Vec<(u64, Vec<u32>)>>;

    /// Produce the bucket items for one scan over `buckets`.
    ///
    /// Single implementation behind both [`TestBucketSource`] and
    /// [`CountingBucketSource`], so the two fixtures cannot drift apart.
    /// Stored rows are returned unfiltered by `range`; only the synthesized
    /// universe key uses it.
    fn scan_test_buckets(
        buckets: &TestBuckets,
        dimension_key: &[u8],
        range: Range<u64>,
        direction: ScanDirection,
    ) -> Vec<BucketItem> {
        // Mirror the real backends: the tx-universe key is synthesized at
        // scan time, never read from stored buckets.
        if dimension_key == universe_key() {
            return dense_universe_buckets(range, BUCKET_SIZE, direction)
                .map(Ok)
                .collect();
        }
        // An unknown key scans as an empty stream.
        let rows: &[(u64, Vec<u32>)] = buckets
            .get(dimension_key)
            .map(Vec::as_slice)
            .unwrap_or_default();
        let to_item =
            |(bucket_id, bits): &(u64, Vec<u32>)| -> BucketItem { Ok((*bucket_id, make_bitmap(bits))) };
        match direction {
            ScanDirection::Ascending => rows.iter().map(to_item).collect(),
            ScanDirection::Descending => rows.iter().rev().map(to_item).collect(),
        }
    }

    /// In-memory bucket source backed by a fixed [`TestBuckets`] map.
    #[derive(Clone)]
    pub(crate) struct TestBucketSource {
        pub(crate) buckets: Arc<TestBuckets>,
    }

    impl BitmapBucketSource for TestBucketSource {
        fn scan_bucket_stream(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> BucketStream {
            stream::iter(self.bucket_items(&dimension_key, range, direction)).boxed()
        }
    }

    impl<'a> BitmapBucketIteratorSource<'a> for TestBucketSource {
        type Iter = std::vec::IntoIter<BucketItem>;

        fn scan_bucket_iter(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Self::Iter {
            self.bucket_items(&dimension_key, range, direction)
                .into_iter()
        }
    }

    impl TestBucketSource {
        /// Materialize the items one scan of `dimension_key` would yield.
        pub(crate) fn bucket_items(
            &self,
            dimension_key: &[u8],
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Vec<BucketItem> {
            scan_test_buckets(&self.buckets, dimension_key, range, direction)
        }
    }

    /// Build a bitmap holding exactly `bits`.
    pub(crate) fn make_bitmap(bits: &[u32]) -> RoaringBitmap {
        bits.iter().copied().collect()
    }

    /// Encode `value` as a `Sender` dimension key.
    pub(crate) fn test_key(value: &[u8]) -> Vec<u8> {
        crate::dimensions::encode_dimension_key(crate::dimensions::IndexDimension::Sender, value)
    }

    /// The encoded tx-universe dimension key.
    pub(crate) fn universe_key() -> Vec<u8> {
        crate::dimensions::encode_dimension_key(
            crate::dimensions::IndexDimension::TxUniverse,
            crate::dimensions::TX_UNIVERSE_VALUE,
        )
    }

    /// Positive literal over the `Sender` key for `value`.
    pub(crate) fn include(value: &[u8]) -> BitmapLiteral {
        BitmapLiteral::include(test_key(value)).unwrap()
    }

    /// Negative literal over the `Sender` key for `value`.
    pub(crate) fn exclude(value: &[u8]) -> BitmapLiteral {
        BitmapLiteral::exclude(test_key(value)).unwrap()
    }

    /// Positive literal over the tx-universe key.
    pub(crate) fn include_universe() -> BitmapLiteral {
        BitmapLiteral::include(universe_key()).unwrap()
    }

    /// Full `[0, BUCKET_SIZE)` bitmap — what the synthesized universe leaf
    /// yields per bucket.
    pub(crate) fn full_bucket() -> RoaringBitmap {
        let mut bm = RoaringBitmap::new();
        bm.insert_range(0..BUCKET_SIZE as u32);
        bm
    }

    /// A bucket source that records how many times `scan_bucket_*` is invoked
    /// per dimension key. Used to verify the evaluator deduplicates leaves
    /// across terms (a key referenced from multiple terms should be scanned
    /// exactly once).
    #[derive(Clone)]
    pub(crate) struct CountingBucketSource {
        pub(crate) buckets: Arc<TestBuckets>,
        scan_counts: Arc<Mutex<HashMap<Vec<u8>, usize>>>,
    }

    impl CountingBucketSource {
        pub(crate) fn new(buckets: TestBuckets) -> Self {
            Self {
                buckets: Arc::new(buckets),
                scan_counts: Arc::new(Mutex::new(HashMap::new())),
            }
        }

        /// Number of scans opened so far for `key` (zero if never scanned).
        pub(crate) fn scan_count(&self, key: &[u8]) -> usize {
            self.scan_counts
                .lock()
                .unwrap()
                .get(key)
                .copied()
                .unwrap_or(0)
        }

        /// Bump the scan counter for `key`.
        fn record(&self, key: &[u8]) {
            *self
                .scan_counts
                .lock()
                .unwrap()
                .entry(key.to_vec())
                .or_insert(0) += 1;
        }

        /// Same items as [`TestBucketSource::bucket_items`]; does not count.
        fn bucket_items(
            &self,
            dimension_key: &[u8],
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Vec<BucketItem> {
            scan_test_buckets(&self.buckets, dimension_key, range, direction)
        }
    }

    impl BitmapBucketSource for CountingBucketSource {
        fn scan_bucket_stream(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> BucketStream {
            self.record(&dimension_key);
            stream::iter(self.bucket_items(&dimension_key, range, direction)).boxed()
        }
    }

    impl<'a> BitmapBucketIteratorSource<'a> for CountingBucketSource {
        type Iter = std::vec::IntoIter<BucketItem>;

        fn scan_bucket_iter(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Self::Iter {
            self.record(&dimension_key);
            self.bucket_items(&dimension_key, range, direction)
                .into_iter()
        }
    }
}
769
#[cfg(test)]
mod tests {
    use super::test_utils::exclude;
    use super::test_utils::include;
    use super::*;

    #[test]
    fn bitmap_query_validation_rejects_empty_shapes() {
        let sender_tag = crate::dimensions::IndexDimension::Sender.tag_byte();

        // A query without terms.
        assert!(BitmapQuery::new(Vec::new()).is_err());
        // A key without any bytes.
        assert!(BitmapLiteral::include(Vec::new()).is_err());
        // A key that is only a tag, with no dimension value.
        assert!(BitmapLiteral::include(vec![sender_tag]).is_err());
        // A key whose tag byte is not a known dimension.
        assert!(BitmapLiteral::include(vec![0xff, 0x00]).is_err());
        // A term made of negative literals only.
        assert!(BitmapTerm::new(vec![exclude(b"neg")]).is_err());
    }

    #[test]
    fn dense_universe_buckets_covers_range_in_direction_order() {
        use super::test_utils::BUCKET_SIZE;

        let bucket_ids = |range: Range<u64>, direction: ScanDirection| -> Vec<u64> {
            dense_universe_buckets(range, BUCKET_SIZE, direction)
                .map(|(bucket, _)| bucket)
                .collect()
        };

        // Partial edge buckets still yield full bitmaps — trimming happens at
        // the flatten step.
        let ascending: Vec<(u64, RoaringBitmap)> =
            dense_universe_buckets(150_000..350_001, BUCKET_SIZE, ScanDirection::Ascending)
                .collect();
        for (_, bitmap) in &ascending {
            assert_eq!(bitmap.len(), BUCKET_SIZE);
        }
        let asc: Vec<u64> = ascending.iter().map(|(bucket, _)| *bucket).collect();
        assert_eq!(asc, vec![1, 2, 3]);

        let desc = bucket_ids(150_000..350_001, ScanDirection::Descending);
        assert_eq!(desc, vec![3, 2, 1]);

        let single = bucket_ids(5..6, ScanDirection::Ascending);
        assert_eq!(single, vec![0]);

        assert_eq!(
            dense_universe_buckets(7..7, BUCKET_SIZE, ScanDirection::Ascending).count(),
            0
        );
    }

    #[test]
    fn unique_leaf_count_counts_distinct_keys_across_terms() {
        // Both terms include `a` and `b`; `c` appears only in the first and
        // `d` only in the second. Six literals in total, but just four
        // distinct dimension keys are scanned at eval time.
        let first = BitmapTerm::new(vec![include(b"a"), include(b"b"), include(b"c")]).unwrap();
        let second = BitmapTerm::new(vec![include(b"a"), include(b"b"), include(b"d")]).unwrap();
        let query = BitmapQuery::new(vec![first, second]).unwrap();
        assert_eq!(query.unique_leaf_count(), 4);
    }

    #[test]
    fn build_term_specs_collapses_duplicate_keys_to_one_leaf() {
        // A key included by one term and excluded by another must occupy a
        // single leaf slot, referenced by the same index from both terms.
        let positive = BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap();
        let mixed = BitmapTerm::new(vec![include(b"b"), exclude(b"a")]).unwrap();
        let DedupedQuery { keys, terms: specs } = build_term_specs(vec![positive, mixed]);
        assert_eq!(keys.len(), 2, "only `a` and `b` are unique");
        // Term 0: include a (slot 0), include b (slot 1).
        assert_eq!(specs[0].includes, vec![0, 1]);
        assert!(specs[0].excludes.is_empty());
        // Term 1: include b (slot 1), exclude a (slot 0) — same slots as term 0.
        assert_eq!(specs[1].includes, vec![1]);
        assert_eq!(specs[1].excludes, vec![0]);
    }
}