1use std::collections::HashMap;
30use std::collections::HashSet;
31use std::ops::Range;
32
33use anyhow::Result;
34use anyhow::bail;
35use futures::stream::BoxStream;
36use roaring::RoaringBitmap;
37
38use crate::dimensions::IndexDimension;
39
40mod iter;
41mod stream;
42
43pub use iter::eval_bitmap_query_bucket_iter;
44pub use stream::BitmapScanMetrics;
45pub use stream::buckets_with_watermarks;
46pub use stream::eval_bitmap_query_stream;
47pub use stream::flatten_watermarked_buckets;
48
49#[cfg(test)]
51pub(crate) use stream::BitmapScanBudget;
52#[cfg(test)]
53pub(crate) use stream::eval_bitmap_query_bucket_stream;
54
/// Error type for bitmap scans, used both by bucket sources (via [`BucketItem`]) and by
/// query evaluation.
///
/// Besides wrapped faults, the two control-flow outcomes `ScanLimit` and `Cancelled`
/// are separate variants so callers can tell them apart from genuine failures.
#[derive(Debug, thiserror::Error)]
pub enum BitmapScanError {
    /// The scan stopped because a scan limit was reached.
    // NOTE(review): presumably enforced through `BitmapScanBudget` in `stream` — confirm.
    #[error("bitmap scan limit reached")]
    ScanLimit,
    /// The scan was cancelled before it completed.
    #[error("bitmap scan cancelled")]
    Cancelled,
    /// Any other failure, wrapping an [`anyhow::Error`]. `#[error(transparent)]`
    /// forwards `Display` and `source()` to the wrapped error.
    #[error(transparent)]
    Source(anyhow::Error),
}
78
79impl BitmapScanError {
80 pub(crate) fn collapse(errs: Vec<BitmapScanError>) -> BitmapScanError {
95 assert!(!errs.is_empty(), "BitmapScanError::collapse on empty Vec");
96 let mut cancelled = false;
97 let mut scan_limit = false;
98 let mut faults: Vec<anyhow::Error> = Vec::new();
99 for e in errs {
100 match e {
101 BitmapScanError::Cancelled => cancelled = true,
102 BitmapScanError::ScanLimit => scan_limit = true,
103 BitmapScanError::Source(err) => faults.push(err),
104 }
105 }
106 match faults.len() {
107 0 => {
108 if scan_limit {
109 BitmapScanError::ScanLimit
110 } else {
111 debug_assert!(cancelled, "collapse saw only non-erroring leaves");
112 BitmapScanError::Cancelled
113 }
114 }
115 1 => BitmapScanError::Source(faults.pop().expect("len == 1")),
116 n => {
117 let combined = faults
118 .iter()
119 .enumerate()
120 .map(|(i, err)| format!(" [{i}] {err}"))
121 .collect::<Vec<_>>()
122 .join("\n");
123 BitmapScanError::Source(anyhow::anyhow!(
124 "{n} concurrent bitmap scan faults:\n{combined}"
125 ))
126 }
127 }
128 }
129}
130
131impl From<anyhow::Error> for BitmapScanError {
132 fn from(err: anyhow::Error) -> Self {
133 BitmapScanError::Source(err)
134 }
135}
136
/// Shorthand for a `Result` whose error is [`BitmapScanError`].
// Spelled `std::result::Result` because `Result` in this module is `anyhow::Result`.
pub type BitmapScanResult<T> = std::result::Result<T, BitmapScanError>;
138
/// An element of a watermarked stream: either a payload `Item` or a `Watermark`
/// carrying a `u64`.
// NOTE(review): the watermark value is presumably a scan-position frontier (see
// `buckets_with_watermarks`) — confirm before relying on that meaning.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Watermarked<T> {
    /// A payload element.
    Item(T),
    /// A progress marker carrying a `u64`.
    Watermark(u64),
}

impl<T> Watermarked<T> {
    /// Applies `f` to the payload of an `Item`.
    ///
    /// A `Watermark` passes through with the same value, and `f` is not called.
    pub fn map_item<U>(self, f: impl FnOnce(T) -> U) -> Watermarked<U> {
        match self {
            Self::Watermark(position) => Watermarked::Watermark(position),
            Self::Item(value) => Watermarked::Item(f(value)),
        }
    }
}
158
/// One element of a bucket scan: a `(bucket_id, bitmap)` pair, or a scan error.
///
/// The bitmap holds offsets within the bucket. For example, a dense universe bucket
/// contains every offset `0..bucket_size`.
pub type BucketItem = BitmapScanResult<(u64, RoaringBitmap)>;
/// Boxed `'static` stream of [`BucketItem`]s, as returned by [`BitmapBucketSource`].
pub type BucketStream = BoxStream<'static, BucketItem>;

/// Like [`BucketItem`], but the success value may also be a [`Watermarked::Watermark`].
pub(crate) type WatermarkedBucket = BitmapScanResult<Watermarked<(u64, RoaringBitmap)>>;
/// Boxed `'static` stream of `WatermarkedBucket` items.
pub type WatermarkedBucketStream = BoxStream<'static, WatermarkedBucket>;
170
/// Order in which bucket ids are visited during a scan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScanDirection {
    /// Lowest bucket id first.
    Ascending,
    /// Highest bucket id first.
    Descending,
}

impl ScanDirection {
    /// Returns `true` only for [`ScanDirection::Ascending`].
    pub fn is_ascending(self) -> bool {
        self == Self::Ascending
    }
}
182
183pub fn dense_universe_buckets(
192 range: Range<u64>,
193 bucket_size: u64,
194 direction: ScanDirection,
195) -> impl Iterator<Item = (u64, RoaringBitmap)> + Send + 'static {
196 let bits = u32::try_from(bucket_size).expect("bucket size fits in u32");
197 let mut buckets = if range.is_empty() {
198 0..0
199 } else {
200 (range.start / bucket_size)..(range.end - 1) / bucket_size + 1
201 };
202 std::iter::from_fn(move || {
203 let bucket = match direction {
204 ScanDirection::Ascending => buckets.next(),
205 ScanDirection::Descending => buckets.next_back(),
206 }?;
207 let mut bitmap = RoaringBitmap::new();
208 bitmap.insert_range(0..bits);
209 Some((bucket, bitmap))
210 })
211}
212
/// Asynchronous source of per-bucket bitmaps, addressed by encoded dimension key.
///
/// Implementors must be `Clone + Send + 'static`. The returned [`BucketStream`] is a
/// boxed `'static` stream, so it cannot borrow from `self`.
pub trait BitmapBucketSource: Clone + Send + 'static {
    /// Opens a scan of `dimension_key`'s buckets for the id `range`, walked in
    /// `direction`. Each item is a `(bucket_id, bitmap)` pair or a [`BitmapScanError`].
    // NOTE(review): buckets are presumably expected in `direction` order (the test
    // sources and `dense_universe_buckets` produce them that way) — confirm the contract.
    fn scan_bucket_stream(
        &self,
        dimension_key: Vec<u8>,
        range: Range<u64>,
        direction: ScanDirection,
    ) -> BucketStream;
}
226
/// Synchronous counterpart of [`BitmapBucketSource`]: buckets are produced through an
/// iterator that may borrow data for `'a`.
pub trait BitmapBucketIteratorSource<'a>: Clone + 'a {
    /// Iterator returned by [`Self::scan_bucket_iter`].
    type Iter: Iterator<Item = BucketItem> + 'a;

    /// Synchronous equivalent of [`BitmapBucketSource::scan_bucket_stream`]: scans
    /// `dimension_key`'s buckets for the id `range`, walked in `direction`.
    fn scan_bucket_iter(
        &self,
        dimension_key: Vec<u8>,
        range: Range<u64>,
        direction: ScanDirection,
    ) -> Self::Iter;
}
243
/// A validated bitmap query: a non-empty list of [`BitmapTerm`]s.
// NOTE(review): terms are presumably OR-ed together by the evaluators (DNF) — confirm
// in `iter`/`stream`.
#[derive(Clone, Debug)]
pub struct BitmapQuery {
    terms: Vec<BitmapTerm>,
}

/// One term of a [`BitmapQuery`]: a list of include/exclude literals that contains at
/// least one include.
#[derive(Clone, Debug)]
pub struct BitmapTerm {
    literals: Vec<BitmapLiteral>,
}

/// A validated, encoded dimension key: a known `IndexDimension` tag byte followed by at
/// least one value byte.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct BitmapKey(Vec<u8>);

/// A single literal of a [`BitmapTerm`].
#[derive(Clone, Debug)]
pub enum BitmapLiteral {
    /// The term keeps only bits set in this key's bitmap.
    Include(BitmapKey),
    /// Bits set in this key's bitmap are removed from the term's result.
    Exclude(BitmapKey),
}
272
273impl BitmapKey {
274 pub fn new(bytes: Vec<u8>) -> Result<Self> {
275 if bytes.is_empty() {
276 bail!("bitmap dimension key must not be empty");
277 }
278 if bytes.len() == 1 {
279 bail!("bitmap dimension value must not be empty");
280 }
281 if IndexDimension::from_tag_byte(bytes[0]).is_none() {
282 bail!("unknown bitmap dimension tag {}", bytes[0]);
283 }
284 Ok(Self(bytes))
285 }
286
287 pub fn into_inner(self) -> Vec<u8> {
288 self.0
289 }
290
291 pub fn as_bytes(&self) -> &[u8] {
292 &self.0
293 }
294}
295
296impl TryFrom<Vec<u8>> for BitmapKey {
297 type Error = anyhow::Error;
298
299 fn try_from(value: Vec<u8>) -> Result<Self> {
300 Self::new(value)
301 }
302}
303
304impl BitmapLiteral {
305 pub fn include(dimension_key: Vec<u8>) -> Result<Self> {
306 Ok(Self::Include(BitmapKey::new(dimension_key)?))
307 }
308
309 pub fn exclude(dimension_key: Vec<u8>) -> Result<Self> {
310 Ok(Self::Exclude(BitmapKey::new(dimension_key)?))
311 }
312
313 pub fn key_bytes(&self) -> &[u8] {
314 match self {
315 BitmapLiteral::Include(k) | BitmapLiteral::Exclude(k) => k.as_bytes(),
316 }
317 }
318}
319
320impl BitmapQuery {
321 pub fn new(terms: Vec<BitmapTerm>) -> Result<Self> {
322 if terms.is_empty() {
323 bail!("bitmap query must contain at least one term");
324 }
325 Ok(Self { terms })
326 }
327
328 pub fn scan(dimension_key: Vec<u8>) -> Result<Self> {
329 Ok(Self {
330 terms: vec![BitmapTerm::new(vec![BitmapLiteral::include(
331 dimension_key,
332 )?])?],
333 })
334 }
335
336 pub fn unique_leaf_count(&self) -> usize {
342 self.terms
343 .iter()
344 .flat_map(|t| t.literals.iter().map(|l| l.key_bytes()))
345 .collect::<HashSet<_>>()
346 .len()
347 }
348
349 pub fn terms(&self) -> &[BitmapTerm] {
350 &self.terms
351 }
352}
353
354impl BitmapTerm {
355 pub fn new(literals: Vec<BitmapLiteral>) -> Result<Self> {
356 if !literals
357 .iter()
358 .any(|literal| matches!(literal, BitmapLiteral::Include(_)))
359 {
360 bail!("bitmap query term must contain at least one include literal");
361 }
362 Ok(Self { literals })
363 }
364
365 pub fn literals(&self) -> &[BitmapLiteral] {
366 &self.literals
367 }
368}
369
/// A query's terms rewritten over a deduplicated list of leaf keys, as produced by
/// `build_term_specs`.
pub(crate) struct DedupedQuery {
    /// Distinct dimension keys in first-appearance order; a key's position is its leaf
    /// index.
    pub(crate) keys: Vec<Vec<u8>>,
    /// One spec per input term, in input order, referring to leaves by index into `keys`.
    pub(crate) terms: Vec<TermSpec>,
}
377
378pub(crate) fn build_term_specs(terms: Vec<BitmapTerm>) -> DedupedQuery {
385 let mut key_to_idx: HashMap<Vec<u8>, usize> = HashMap::new();
386 let mut keys: Vec<Vec<u8>> = Vec::new();
387 let mut specs: Vec<TermSpec> = Vec::with_capacity(terms.len());
388 for term in terms {
389 let mut include_idx = Vec::with_capacity(term.literals.len());
390 let mut exclude_idx = Vec::with_capacity(term.literals.len());
391 for literal in term.literals {
392 let (push_target, key) = match literal {
393 BitmapLiteral::Include(k) => (&mut include_idx, k.into_inner()),
394 BitmapLiteral::Exclude(k) => (&mut exclude_idx, k.into_inner()),
395 };
396 let idx = match key_to_idx.get(&key) {
397 Some(&i) => i,
398 None => {
399 let i = keys.len();
400 key_to_idx.insert(key.clone(), i);
401 keys.push(key);
402 i
403 }
404 };
405 push_target.push(idx);
406 }
407 specs.push(TermSpec {
408 includes: include_idx,
409 excludes: exclude_idx,
410 unsatisfiable: false,
411 });
412 }
413 DedupedQuery { keys, terms: specs }
414}
415
416fn bound_in_direction(a: u64, b: u64, direction: ScanDirection) -> u64 {
420 match direction {
421 ScanDirection::Ascending => a.min(b),
422 ScanDirection::Descending => a.max(b),
423 }
424}
425
426fn frontier_advanced(prev: Option<u64>, next: u64, direction: ScanDirection) -> bool {
430 match prev {
431 None => true,
432 Some(prev) => match direction {
433 ScanDirection::Ascending => next > prev,
434 ScanDirection::Descending => next < prev,
435 },
436 }
437}
438
439pub(crate) fn bucket_edges(
446 bucket: u64,
447 bucket_size: u64,
448 range: &Range<u64>,
449 direction: ScanDirection,
450) -> (u64, u64) {
451 let start = bucket.saturating_mul(bucket_size);
452 let end = start.saturating_add(bucket_size);
453 match direction {
454 ScanDirection::Ascending => (start.max(range.start), end.min(range.end)),
455 ScanDirection::Descending => (end.min(range.end), start.max(range.start)),
456 }
457}
458
459pub(crate) fn take_snapshot_bitmap(
465 snapshot: &mut [Option<RoaringBitmap>],
466 remaining_refs: &mut [usize],
467 on_floor: &[bool],
468 i: usize,
469) -> Option<RoaringBitmap> {
470 if !on_floor[i] {
471 return None;
472 }
473 if remaining_refs[i] > 1 {
474 remaining_refs[i] -= 1;
475 snapshot[i].clone()
476 } else {
477 remaining_refs[i] = remaining_refs[i].saturating_sub(1);
478 snapshot[i].take()
479 }
480}
481
482pub(crate) fn count_on_floor_refs(terms: &[TermSpec], on_floor: &[bool]) -> Vec<usize> {
486 let mut refs = vec![0usize; on_floor.len()];
487 for term in terms {
488 if term.unsatisfiable {
489 continue;
490 }
491 for &i in term.includes.iter().chain(term.excludes.iter()) {
492 if on_floor[i] {
493 refs[i] += 1;
494 }
495 }
496 }
497 refs
498}
499
500pub(crate) fn recompute_unreferenced(
509 terms: &[TermSpec],
510 class: &[Option<LeafHead>],
511 unreferenced: &mut [bool],
512) {
513 let leaf_count = unreferenced.len();
514 let mut referenced = vec![false; leaf_count];
515 for term in terms {
516 if term.unsatisfiable {
517 continue;
518 }
519 for &i in term.includes.iter().chain(term.excludes.iter()) {
520 if !unreferenced[i] && !matches!(class[i], Some(LeafHead::Eof)) {
521 referenced[i] = true;
522 }
523 }
524 }
525 for i in 0..leaf_count {
526 if !referenced[i] {
527 unreferenced[i] = true;
528 }
529 }
530}
531
532pub(crate) fn eval_term_at_bucket(
538 includes: Vec<Option<RoaringBitmap>>,
539 excludes: Vec<Option<RoaringBitmap>>,
540) -> Option<RoaringBitmap> {
541 let mut acc: Option<RoaringBitmap> = None;
542 for include in includes {
543 let bitmap = include?;
545 acc = Some(match acc {
546 None => bitmap,
547 Some(a) => a & bitmap,
548 });
549 }
550 let mut acc = acc?;
552 for exclude in excludes.into_iter().flatten() {
553 acc -= exclude;
554 }
555 (!acc.is_empty()).then_some(acc)
556}
557
/// One query term expressed over leaf indices (positions in `DedupedQuery::keys`).
pub(crate) struct TermSpec {
    /// Leaf indices of the term's include literals, in literal order.
    pub(crate) includes: Vec<usize>,
    /// Leaf indices of the term's exclude literals, in literal order.
    pub(crate) excludes: Vec<usize>,
    /// When set, `count_on_floor_refs` and `recompute_unreferenced` skip this term.
    /// `build_term_specs` always initializes it to `false`.
    // NOTE(review): presumably set by the evaluators once the term can no longer match.
    pub(crate) unsatisfiable: bool,
}

/// State of a leaf's bucket scan head.
// NOTE(review): variant meanings below are inferred from names and from
// `recompute_unreferenced`; confirm against the evaluators in `iter`/`stream`.
pub(crate) enum LeafHead {
    /// The head is positioned at this bucket id.
    Bucket(u64),
    /// The scan is exhausted; `recompute_unreferenced` stops counting the leaf as referenced.
    Eof,
    /// The scan produced an error.
    Error,
}
574
#[cfg(test)]
pub(crate) mod test_utils {
    use std::collections::BTreeMap;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;

    use futures::StreamExt;
    use futures::stream;

    use super::*;

    /// Bucket width shared by every fixture source in this module.
    pub(crate) const BUCKET_SIZE: u64 = 100_000;
    /// Fixture data: encoded dimension key -> `(bucket_id, set bits)` pairs.
    ///
    /// Pairs are expected in ascending bucket order; descending scans reverse them.
    pub(crate) type TestBuckets = BTreeMap<Vec<u8>, Vec<(u64, Vec<u32>)>>;

    /// Materializes the items one fixture scan yields. Shared by `TestBucketSource` and
    /// `CountingBucketSource` so the two cannot drift apart.
    ///
    /// The universe key is synthesized densely by `dense_universe_buckets`. Any other key
    /// yields its fixture buckets (none for an unknown key), reversed for descending
    /// scans. `range` is applied only to the universe key; fixture buckets are returned
    /// in full.
    fn fixture_bucket_items(
        buckets: &TestBuckets,
        dimension_key: &[u8],
        range: Range<u64>,
        direction: ScanDirection,
    ) -> Vec<BucketItem> {
        if dimension_key == universe_key() {
            return dense_universe_buckets(range, BUCKET_SIZE, direction)
                .map(Ok)
                .collect();
        }
        // Build items straight from the borrowed fixture instead of cloning it first.
        let mut items: Vec<BucketItem> = buckets
            .get(dimension_key)
            .map(|entries| {
                entries
                    .iter()
                    .map(|(bucket_id, bits)| Ok((*bucket_id, make_bitmap(bits))))
                    .collect::<Vec<BucketItem>>()
            })
            .unwrap_or_default();
        if matches!(direction, ScanDirection::Descending) {
            items.reverse();
        }
        items
    }

    /// In-memory bucket source backed by [`TestBuckets`].
    #[derive(Clone)]
    pub(crate) struct TestBucketSource {
        pub(crate) buckets: Arc<TestBuckets>,
    }

    impl BitmapBucketSource for TestBucketSource {
        fn scan_bucket_stream(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> BucketStream {
            stream::iter(self.bucket_items(&dimension_key, range, direction)).boxed()
        }
    }

    impl<'a> BitmapBucketIteratorSource<'a> for TestBucketSource {
        type Iter = std::vec::IntoIter<BucketItem>;

        fn scan_bucket_iter(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Self::Iter {
            self.bucket_items(&dimension_key, range, direction)
                .into_iter()
        }
    }

    impl TestBucketSource {
        /// Items a scan of `dimension_key` yields (see `fixture_bucket_items`).
        pub(crate) fn bucket_items(
            &self,
            dimension_key: &[u8],
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Vec<BucketItem> {
            fixture_bucket_items(&self.buckets, dimension_key, range, direction)
        }
    }

    /// Builds a bitmap containing exactly `bits`.
    pub(crate) fn make_bitmap(bits: &[u32]) -> RoaringBitmap {
        bits.iter().copied().collect()
    }

    /// Encodes `value` as a `Sender` dimension key.
    pub(crate) fn test_key(value: &[u8]) -> Vec<u8> {
        crate::dimensions::encode_dimension_key(crate::dimensions::IndexDimension::Sender, value)
    }

    /// Encoded key of the transaction-universe dimension.
    pub(crate) fn universe_key() -> Vec<u8> {
        crate::dimensions::encode_dimension_key(
            crate::dimensions::IndexDimension::TxUniverse,
            crate::dimensions::TX_UNIVERSE_VALUE,
        )
    }

    /// `Include` literal for the `Sender` key of `value`.
    pub(crate) fn include(value: &[u8]) -> BitmapLiteral {
        BitmapLiteral::include(test_key(value)).unwrap()
    }

    /// `Exclude` literal for the `Sender` key of `value`.
    pub(crate) fn exclude(value: &[u8]) -> BitmapLiteral {
        BitmapLiteral::exclude(test_key(value)).unwrap()
    }

    /// `Include` literal for the universe key.
    pub(crate) fn include_universe() -> BitmapLiteral {
        BitmapLiteral::include(universe_key()).unwrap()
    }

    /// Bitmap with every offset `0..BUCKET_SIZE` set.
    pub(crate) fn full_bucket() -> RoaringBitmap {
        let mut bm = RoaringBitmap::new();
        bm.insert_range(0..BUCKET_SIZE as u32);
        bm
    }

    /// Fixture source that also counts how many scans were opened per dimension key.
    ///
    /// Clones share the same counter through the `Arc`.
    #[derive(Clone)]
    pub(crate) struct CountingBucketSource {
        pub(crate) buckets: Arc<TestBuckets>,
        scan_counts: Arc<Mutex<HashMap<Vec<u8>, usize>>>,
    }

    impl CountingBucketSource {
        pub(crate) fn new(buckets: TestBuckets) -> Self {
            Self {
                buckets: Arc::new(buckets),
                scan_counts: Arc::new(Mutex::new(HashMap::new())),
            }
        }

        /// Number of scans (stream or iterator) opened for `key` so far.
        pub(crate) fn scan_count(&self, key: &[u8]) -> usize {
            self.scan_counts
                .lock()
                .unwrap()
                .get(key)
                .copied()
                .unwrap_or(0)
        }

        fn record(&self, key: &[u8]) {
            *self
                .scan_counts
                .lock()
                .unwrap()
                .entry(key.to_vec())
                .or_default() += 1;
        }

        fn bucket_items(
            &self,
            dimension_key: &[u8],
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Vec<BucketItem> {
            fixture_bucket_items(&self.buckets, dimension_key, range, direction)
        }
    }

    impl BitmapBucketSource for CountingBucketSource {
        fn scan_bucket_stream(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> BucketStream {
            self.record(&dimension_key);
            stream::iter(self.bucket_items(&dimension_key, range, direction)).boxed()
        }
    }

    impl<'a> BitmapBucketIteratorSource<'a> for CountingBucketSource {
        type Iter = std::vec::IntoIter<BucketItem>;

        fn scan_bucket_iter(
            &self,
            dimension_key: Vec<u8>,
            range: Range<u64>,
            direction: ScanDirection,
        ) -> Self::Iter {
            self.record(&dimension_key);
            self.bucket_items(&dimension_key, range, direction)
                .into_iter()
        }
    }
}
769
#[cfg(test)]
mod tests {
    use super::test_utils::exclude;
    use super::test_utils::include;
    use super::test_utils::make_bitmap;
    use super::*;

    #[test]
    fn bitmap_query_validation_rejects_empty_shapes() {
        assert!(BitmapQuery::new(Vec::new()).is_err());
        assert!(BitmapLiteral::include(Vec::new()).is_err());
        assert!(
            BitmapLiteral::include(vec![crate::dimensions::IndexDimension::Sender.tag_byte()])
                .is_err()
        );
        assert!(BitmapLiteral::include(vec![0xff, 0x00]).is_err());
        assert!(BitmapTerm::new(vec![exclude(b"neg")]).is_err());
    }

    #[test]
    fn dense_universe_buckets_covers_range_in_direction_order() {
        use super::test_utils::BUCKET_SIZE;

        let asc: Vec<u64> =
            dense_universe_buckets(150_000..350_001, BUCKET_SIZE, ScanDirection::Ascending)
                .map(|(bucket, bitmap)| {
                    assert_eq!(bitmap.len(), BUCKET_SIZE);
                    bucket
                })
                .collect();
        assert_eq!(asc, vec![1, 2, 3]);

        let desc: Vec<u64> =
            dense_universe_buckets(150_000..350_001, BUCKET_SIZE, ScanDirection::Descending)
                .map(|(bucket, _)| bucket)
                .collect();
        assert_eq!(desc, vec![3, 2, 1]);

        let single: Vec<u64> = dense_universe_buckets(5..6, BUCKET_SIZE, ScanDirection::Ascending)
            .map(|(bucket, _)| bucket)
            .collect();
        assert_eq!(single, vec![0]);

        assert_eq!(
            dense_universe_buckets(7..7, BUCKET_SIZE, ScanDirection::Ascending).count(),
            0
        );
    }

    #[test]
    fn unique_leaf_count_counts_distinct_keys_across_terms() {
        let query = BitmapQuery::new(vec![
            BitmapTerm::new(vec![include(b"a"), include(b"b"), include(b"c")]).unwrap(),
            BitmapTerm::new(vec![include(b"a"), include(b"b"), include(b"d")]).unwrap(),
        ])
        .unwrap();
        assert_eq!(query.unique_leaf_count(), 4);
    }

    #[test]
    fn build_term_specs_collapses_duplicate_keys_to_one_leaf() {
        let terms = vec![
            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
            BitmapTerm::new(vec![include(b"b"), exclude(b"a")]).unwrap(),
        ];
        let DedupedQuery { keys, terms: specs } = build_term_specs(terms);
        assert_eq!(keys.len(), 2, "only `a` and `b` are unique");
        assert_eq!(specs[0].includes, vec![0, 1]);
        assert!(specs[0].excludes.is_empty());
        assert_eq!(specs[1].includes, vec![1]);
        assert_eq!(specs[1].excludes, vec![0]);
    }

    #[test]
    fn collapse_prefers_faults_over_scan_limit_over_cancelled() {
        assert!(matches!(
            BitmapScanError::collapse(vec![BitmapScanError::Cancelled]),
            BitmapScanError::Cancelled
        ));
        assert!(matches!(
            BitmapScanError::collapse(vec![
                BitmapScanError::Cancelled,
                BitmapScanError::ScanLimit,
            ]),
            BitmapScanError::ScanLimit
        ));

        // A single fault is passed through unchanged, regardless of other outcomes.
        match BitmapScanError::collapse(vec![
            BitmapScanError::ScanLimit,
            BitmapScanError::Source(anyhow::anyhow!("boom")),
        ]) {
            BitmapScanError::Source(err) => assert_eq!(err.to_string(), "boom"),
            other => panic!("expected Source, got {other:?}"),
        }

        // Several faults are merged into one message that lists each of them.
        let message = BitmapScanError::collapse(vec![
            BitmapScanError::Source(anyhow::anyhow!("first")),
            BitmapScanError::Cancelled,
            BitmapScanError::Source(anyhow::anyhow!("second")),
        ])
        .to_string();
        assert!(message.starts_with("2 concurrent bitmap scan faults:"), "{message}");
        assert!(message.contains("[0] first"), "{message}");
        assert!(message.contains("[1] second"), "{message}");
    }

    #[test]
    fn bucket_edges_clip_to_range_in_scan_order() {
        let range = 150_000..350_001;
        let asc = ScanDirection::Ascending;
        let desc = ScanDirection::Descending;
        assert_eq!(bucket_edges(1, 100_000, &range, asc), (150_000, 200_000));
        assert_eq!(bucket_edges(1, 100_000, &range, desc), (200_000, 150_000));
        assert_eq!(bucket_edges(2, 100_000, &range, asc), (200_000, 300_000));
        assert_eq!(bucket_edges(3, 100_000, &range, asc), (300_000, 350_001));
        assert_eq!(bucket_edges(3, 100_000, &range, desc), (350_001, 300_000));
    }

    #[test]
    fn frontier_and_bound_follow_scan_direction() {
        let asc = ScanDirection::Ascending;
        let desc = ScanDirection::Descending;
        assert!(frontier_advanced(None, 5, asc));
        assert!(frontier_advanced(None, 5, desc));
        assert!(frontier_advanced(Some(4), 5, asc));
        assert!(!frontier_advanced(Some(5), 5, asc));
        assert!(frontier_advanced(Some(6), 5, desc));
        assert!(!frontier_advanced(Some(4), 5, desc));
        assert_eq!(bound_in_direction(3, 7, asc), 3);
        assert_eq!(bound_in_direction(3, 7, desc), 7);
    }

    #[test]
    fn take_snapshot_bitmap_clones_until_last_reference() {
        let mut snapshot = vec![Some(make_bitmap(&[1])), Some(make_bitmap(&[2]))];
        let mut refs = vec![2, 1];
        let on_floor = [true, false];

        assert_eq!(
            take_snapshot_bitmap(&mut snapshot, &mut refs, &on_floor, 0),
            Some(make_bitmap(&[1]))
        );
        assert_eq!(refs[0], 1);
        assert!(snapshot[0].is_some(), "shared leaf is cloned while refs remain");

        assert_eq!(
            take_snapshot_bitmap(&mut snapshot, &mut refs, &on_floor, 0),
            Some(make_bitmap(&[1]))
        );
        assert_eq!(refs[0], 0);
        assert!(snapshot[0].is_none(), "last reference moves the bitmap out");

        assert_eq!(take_snapshot_bitmap(&mut snapshot, &mut refs, &on_floor, 1), None);
        assert_eq!(refs[1], 1, "off-floor leaves are left untouched");
        assert!(snapshot[1].is_some());
    }

    #[test]
    fn eval_term_at_bucket_intersects_includes_and_subtracts_excludes() {
        let result = eval_term_at_bucket(
            vec![Some(make_bitmap(&[1, 2, 3, 4])), Some(make_bitmap(&[2, 3, 4, 5]))],
            vec![Some(make_bitmap(&[3])), None],
        );
        assert_eq!(result, Some(make_bitmap(&[2, 4])));

        // Any missing include means the term has no match in this bucket.
        assert_eq!(
            eval_term_at_bucket(vec![Some(make_bitmap(&[1])), None], Vec::new()),
            None
        );
        // An empty result is reported as `None`, not `Some(empty)`.
        assert_eq!(
            eval_term_at_bucket(vec![Some(make_bitmap(&[1]))], vec![Some(make_bitmap(&[1]))]),
            None
        );
        assert_eq!(eval_term_at_bucket(Vec::new(), Vec::new()), None);
    }
}