1use std::ops::Range;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::sync::atomic::AtomicU64;
21use std::sync::atomic::Ordering;
22
23use futures::Stream;
24use futures::StreamExt;
25use futures::stream::BoxStream;
26use futures::stream::Peekable;
27use roaring::RoaringBitmap;
28
29use super::BitmapBucketSource;
30use super::BitmapQuery;
31use super::BitmapScanError;
32use super::BitmapScanResult;
33use super::BucketItem;
34use super::BucketStream;
35use super::DedupedQuery;
36use super::LeafHead;
37use super::ScanDirection;
38use super::Watermarked;
39use super::WatermarkedBucketStream;
40use super::bound_in_direction;
41use super::bucket_edges;
42use super::build_term_specs;
43use super::count_on_floor_refs;
44use super::eval_term_at_bucket;
45use super::frontier_advanced;
46use super::recompute_unreferenced;
47use super::take_snapshot_bitmap;
48
/// Counters describing how much work a single bitmap scan performed.
///
/// A value of this type is handed to the `on_metrics` callback of
/// `eval_bitmap_query_stream` when the stream it returned is dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BitmapScanMetrics {
    /// Number of bucket reads that were charged against the scan budget.
    pub buckets_evaluated: u64,
}
62
/// Cloneable allowance of bucket reads for one scan.
///
/// Clones share a single atomic counter, so every stream holding a clone
/// draws from the same pool.
#[derive(Clone)]
pub(crate) struct BitmapScanBudget {
    /// Allowance the budget was created with; never modified afterwards.
    initial: u64,
    /// Allowance still available, shared between all clones.
    remaining: Arc<AtomicU64>,
}

impl BitmapScanBudget {
    /// Creates a budget that permits `initial` bucket reads.
    pub(crate) fn new(initial: u64) -> Self {
        let remaining = Arc::new(AtomicU64::new(initial));
        Self { initial, remaining }
    }

    /// Consumes one unit if any is left.
    ///
    /// Returns `true` when a unit was taken, `false` when the budget was
    /// already at zero (the counter is left untouched in that case).
    fn try_take(&self) -> bool {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // skip the store and report failure.
        self.remaining
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
                left.checked_sub(1)
            })
            .is_ok()
    }

    /// Consumes one unit without ever failing: the counter is decremented
    /// when positive and stays at zero otherwise.
    fn take_first(&self) {
        let decrement = |left: u64| Some(left.saturating_sub(1));
        // The closure always returns `Some`, so the update cannot fail and
        // its result carries no information.
        let _ = self
            .remaining
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, decrement);
    }

    /// Number of units consumed so far: `initial - remaining`, floored at zero.
    fn buckets_evaluated(&self) -> u64 {
        let left = self.remaining.load(Ordering::SeqCst);
        self.initial.saturating_sub(left)
    }
}
113
114struct ObserveOnDrop<F: FnOnce(BitmapScanMetrics) + Send + 'static> {
118 budget: BitmapScanBudget,
119 callback: Option<F>,
120}
121
122impl<F: FnOnce(BitmapScanMetrics) + Send + 'static> Drop for ObserveOnDrop<F> {
123 fn drop(&mut self) {
124 if let Some(cb) = self.callback.take() {
125 cb(BitmapScanMetrics {
126 buckets_evaluated: self.budget.buckets_evaluated(),
127 });
128 }
129 }
130}
131
132pub fn eval_bitmap_query_stream<S, F>(
142 source: S,
143 query: BitmapQuery,
144 range: Range<u64>,
145 bucket_size: u64,
146 direction: ScanDirection,
147 budget: u64,
148 on_metrics: F,
149) -> BoxStream<'static, BitmapScanResult<Watermarked<u64>>>
150where
151 S: BitmapBucketSource,
152 F: FnOnce(BitmapScanMetrics) + Send + 'static,
153{
154 let leaves = query.unique_leaf_count();
155 if (budget as usize) < leaves {
156 return async_stream::stream! {
160 yield Err(BitmapScanError::Source(anyhow::anyhow!(
162 "bitmap scan budget {budget} is insufficient for {leaves} leaf streams; \
163 server is misconfigured"
164 )));
165 }
166 .boxed();
167 }
168 let budget = BitmapScanBudget::new(budget);
169 let bucket_stream = eval_bitmap_query_bucket_stream(
170 source,
171 query,
172 range.clone(),
173 bucket_size,
174 direction,
175 budget.clone(),
176 );
177 let inner = flatten_watermarked_buckets(bucket_stream, range.clone(), bucket_size, direction);
178 let guard = ObserveOnDrop {
182 budget,
183 callback: Some(on_metrics),
184 };
185 async_stream::stream! {
186 let _guard = guard;
187 futures::pin_mut!(inner);
188 while let Some(item) = inner.next().await {
189 yield item;
190 }
191 }
192 .boxed()
193}
194
195async fn peek_leaf<S>(
198 mut leaf: Pin<&mut Peekable<S>>,
199 idx: usize,
200 bucket_size: u64,
201 range: &Range<u64>,
202 direction: ScanDirection,
203 terminus: u64,
204) -> (usize, LeafHead, Option<u64>)
205where
206 S: Stream<Item = BucketItem>,
207{
208 match leaf.as_mut().peek().await {
209 Some(Ok((bucket, _))) => {
210 let (pre, _post) = bucket_edges(*bucket, bucket_size, range, direction);
211 (idx, LeafHead::Bucket(*bucket), Some(pre))
212 }
213 None => (idx, LeafHead::Eof, Some(terminus)),
214 Some(Err(_)) => (idx, LeafHead::Error, None),
215 }
216}
217
/// Evaluates `query` one bucket at a time and yields per-bucket result
/// bitmaps interleaved with progress watermarks.
///
/// One budgeted, peekable stream ("leaf") is opened per unique dimension key
/// of the query. Each loop iteration peeks every still-referenced leaf, picks
/// the leading bucket in scan direction (the "floor" bucket), consumes that
/// bucket from every leaf positioned on it, evaluates all satisfiable terms
/// against those bitmaps and ORs the term results together.
///
/// Watermarks are emitted only when `frontier_advanced` says the position
/// moved relative to the last emitted one. A watermark for the current
/// frontier is yielded *before* any pending leaf error is raised.
///
/// The stream ends normally once no leaf is referenced any more, and ends
/// with an error when a referenced leaf yields one (collapsed through
/// `BitmapScanError::collapse`).
pub(crate) fn eval_bitmap_query_bucket_stream<S>(
    source: S,
    query: BitmapQuery,
    range: Range<u64>,
    bucket_size: u64,
    direction: ScanDirection,
    budget: BitmapScanBudget,
) -> WatermarkedBucketStream
where
    S: BitmapBucketSource,
{
    // `terms` refer to leaves by index into `unique_keys` (they are used as
    // indices into the per-leaf vectors below).
    let DedupedQuery {
        keys: unique_keys,
        mut terms,
    } = build_term_specs(query.terms);
    // One stream per unique key; every stream draws from the same budget.
    let mut leaves: Vec<Peekable<BucketStream>> = Vec::with_capacity(unique_keys.len());
    for key in unique_keys {
        let raw = source.scan_bucket_stream(key, range.clone(), direction);
        leaves.push(
            budgeted_bucket_stream(raw, budget.clone())
                .boxed()
                .peekable(),
        );
    }

    let leaf_count = leaves.len();
    // Position at which the scan finishes, in scan direction.
    let terminus = if direction.is_ascending() {
        range.end
    } else {
        range.start
    };
    // Position at which the scan begins, in scan direction.
    let request_floor = if direction.is_ascending() {
        range.start
    } else {
        range.end
    };

    async_stream::try_stream! {
        // Leaves flagged here are no longer peeked or consumed. The flags are
        // maintained by `recompute_unreferenced` (defined elsewhere).
        let mut unreferenced = vec![false; leaf_count];
        // Per-leaf position reached so far; every leaf starts at the request floor.
        let mut front = vec![request_floor; leaf_count];
        // Last watermark yielded, if any; input to `frontier_advanced`.
        let mut last_emitted: Option<u64> = None;

        loop {
            // Peek all still-referenced leaves concurrently.
            let mut peeks = Vec::new();
            for (i, leaf) in leaves.iter_mut().enumerate() {
                if !unreferenced[i] {
                    peeks.push(peek_leaf(
                        Pin::new(leaf),
                        i,
                        bucket_size,
                        &range,
                        direction,
                        terminus,
                    ));
                }
            }
            let results = futures::future::join_all(peeks).await;
            // `class[i]` stays `None` for leaves that were not peeked this round.
            let mut class: Vec<Option<LeafHead>> = (0..leaf_count).map(|_| None).collect();
            for (i, head, scanned_to) in results {
                // An errored peek reports no position, so the leaf keeps its
                // previous front.
                if let Some(p) = scanned_to {
                    front[i] = p;
                }
                class[i] = Some(head);
            }

            // A term whose include leaf has reached EOF is marked unsatisfiable
            // and skipped during evaluation from now on.
            for term in terms.iter_mut() {
                if !term.unsatisfiable
                    && term
                        .includes
                        .iter()
                        .any(|&i| matches!(class[i], Some(LeafHead::Eof)))
                {
                    term.unsatisfiable = true;
                }
            }
            // NOTE(review): presumably flags leaves that no satisfiable term
            // needs any more — confirm against `recompute_unreferenced`.
            recompute_unreferenced(&terms, &class, &mut unreferenced);

            // Take the pending error out of every referenced leaf whose head is
            // an error. They are raised further down, after the watermark.
            let mut errors: Vec<BitmapScanError> = Vec::new();
            for i in 0..leaf_count {
                if !unreferenced[i] && matches!(class[i], Some(LeafHead::Error)) {
                    match Pin::new(&mut leaves[i]).next().await {
                        Some(Err(e)) => errors.push(e),
                        _ => unreachable!("peek classified Error"),
                    }
                }
            }

            let active: Vec<usize> = (0..leaf_count).filter(|&i| !unreferenced[i]).collect();
            if active.is_empty() {
                // Nothing left to scan: report the terminus (if that is an
                // advance) and finish. `last_emitted` is not updated because
                // the stream ends here.
                if frontier_advanced(last_emitted, terminus, direction) {
                    yield Watermarked::Watermark(terminus);
                }
                return;
            }

            // Combine the fronts of all active leaves into one frontier.
            // NOTE(review): `bound_in_direction` presumably keeps the
            // least-advanced position in scan direction — confirm.
            let floor_pos = active
                .iter()
                .map(|&i| front[i])
                .reduce(|a, b| bound_in_direction(a, b, direction))
                .expect("active non-empty");
            if frontier_advanced(last_emitted, floor_pos, direction) {
                yield Watermarked::Watermark(floor_pos);
                last_emitted = Some(floor_pos);
            }

            // Raise collected leaf errors only after the frontier watermark
            // above has been yielded.
            if !errors.is_empty() {
                Err(BitmapScanError::collapse(errors))?;
            }

            // Leading bucket among the active leaves' heads: the smallest id
            // when ascending, the largest when descending.
            let floor_bucket = active
                .iter()
                .filter_map(|&i| match class[i] {
                    Some(LeafHead::Bucket(b)) => Some(b),
                    _ => None,
                })
                .reduce(|a, b| match direction {
                    ScanDirection::Ascending => a.min(b),
                    ScanDirection::Descending => a.max(b),
                })
                .expect("active leaves carry buckets when there is no error");
            let (_pre, post) = bucket_edges(floor_bucket, bucket_size, &range, direction);

            // Consume the floor bucket from every referenced leaf positioned on
            // it and move that leaf's front to the second edge of the bucket.
            let mut snapshot: Vec<Option<RoaringBitmap>> =
                (0..leaf_count).map(|_| None).collect();
            let mut on_floor = vec![false; leaf_count];
            for i in 0..leaf_count {
                if !unreferenced[i]
                    && matches!(class[i], Some(LeafHead::Bucket(b)) if b == floor_bucket)
                {
                    on_floor[i] = true;
                    front[i] = post;
                    snapshot[i] = match Pin::new(&mut leaves[i]).next().await {
                        Some(Ok((_, bitmap))) => Some(bitmap),
                        _ => None,
                    };
                }
            }
            // NOTE(review): presumably per-leaf reference counts that let
            // `take_snapshot_bitmap` hand out each bitmap without unnecessary
            // clones — confirm against the helpers.
            let mut remaining_refs = count_on_floor_refs(&terms, &on_floor);

            // Evaluate every satisfiable term at the floor bucket and OR the
            // per-term results together.
            let mut result: Option<RoaringBitmap> = None;
            for term in &terms {
                if term.unsatisfiable {
                    continue;
                }
                let includes: Vec<Option<RoaringBitmap>> = term
                    .includes
                    .iter()
                    .map(|&i| {
                        take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
                    })
                    .collect();
                let excludes: Vec<Option<RoaringBitmap>> = term
                    .excludes
                    .iter()
                    .map(|&i| {
                        take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
                    })
                    .collect();
                if let Some(bitmap) = eval_term_at_bucket(includes, excludes) {
                    result = Some(match result {
                        None => bitmap,
                        Some(acc) => acc | bitmap,
                    });
                }
            }

            // The bucket's result (if any) goes out before the watermark that
            // marks the bucket as done.
            if let Some(bitmap) = result {
                yield Watermarked::Item((floor_bucket, bitmap));
            }
            if frontier_advanced(last_emitted, post, direction) {
                yield Watermarked::Watermark(post);
                last_emitted = Some(post);
            }
        }
    }
    .boxed()
}
435
436fn budgeted_bucket_stream<S>(
441 inner: S,
442 budget: BitmapScanBudget,
443) -> impl Stream<Item = BucketItem> + Send + 'static
444where
445 S: Stream<Item = BucketItem> + Send + 'static,
446{
447 async_stream::try_stream! {
448 futures::pin_mut!(inner);
449 let mut first = true;
450 while let Some(item) = inner.next().await {
451 let item = item?;
452 if first {
453 budget.take_first();
454 first = false;
455 } else if !budget.try_take() {
456 Err(BitmapScanError::ScanLimit)?;
457 }
458 yield item;
459 }
460 }
461}
462
463pub fn buckets_with_watermarks<S>(
471 stream: S,
472 range: Range<u64>,
473 bucket_size: u64,
474 direction: ScanDirection,
475) -> impl Stream<Item = BitmapScanResult<Watermarked<(u64, RoaringBitmap)>>> + Send + 'static
476where
477 S: Stream<Item = BucketItem> + Send + 'static,
478{
479 async_stream::try_stream! {
480 if range.is_empty() {
481 return;
482 }
483 futures::pin_mut!(stream);
484 let mut last_emitted: Option<u64> = None;
485 while let Some(item) = stream.next().await {
486 let (bucket_id, bitmap) = item?;
487 yield Watermarked::Item((bucket_id, bitmap));
488 let bucket_start = bucket_id.saturating_mul(bucket_size);
492 let watermark = if direction.is_ascending() {
493 bucket_start.saturating_add(bucket_size).min(range.end)
494 } else {
495 bucket_start.max(range.start)
496 };
497 last_emitted = Some(watermark);
498 yield Watermarked::Watermark(watermark);
499 }
500 let range_end = if direction.is_ascending() {
504 range.end
505 } else {
506 range.start
507 };
508 let should_emit = match last_emitted {
509 None => true,
510 Some(prev) => {
511 if direction.is_ascending() {
512 range_end > prev
513 } else {
514 range_end < prev
515 }
516 }
517 };
518 if should_emit {
519 yield Watermarked::Watermark(range_end);
520 }
521 }
522}
523
524pub fn flatten_watermarked_buckets<S>(
528 stream: S,
529 range: Range<u64>,
530 bucket_size: u64,
531 direction: ScanDirection,
532) -> impl Stream<Item = BitmapScanResult<Watermarked<u64>>> + Send + 'static
533where
534 S: Stream<Item = BitmapScanResult<Watermarked<(u64, RoaringBitmap)>>> + Send + 'static,
535{
536 async_stream::try_stream! {
537 if range.is_empty() {
538 return;
539 }
540 let start_bucket = range.start / bucket_size;
541 let end_bucket = (range.end - 1) / bucket_size;
542 futures::pin_mut!(stream);
543 while let Some(item) = stream.next().await {
544 match item? {
545 Watermarked::Watermark(p) => yield Watermarked::Watermark(p),
546 Watermarked::Item((bucket_id, bitmap)) => {
547 let bucket_start = bucket_id * bucket_size;
548 let is_first = bucket_id == start_bucket;
549 let is_last = bucket_id == end_bucket;
550 let lo = if is_first {
551 (range.start - bucket_start) as u32
552 } else {
553 0
554 };
555 let hi = if is_last {
556 ((range.end - bucket_start).min(bucket_size)) as u32
557 } else {
558 bucket_size as u32
559 };
560
561 if direction.is_ascending() {
562 for bit in bitmap.iter() {
563 if bit >= lo && bit < hi {
564 yield Watermarked::Item(bucket_start + bit as u64);
565 }
566 }
567 } else {
568 for bit in bitmap.iter().rev() {
569 if bit >= lo && bit < hi {
570 yield Watermarked::Item(bucket_start + bit as u64);
571 }
572 }
573 }
574 }
575 }
576 }
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use std::collections::BTreeMap;
583 use std::sync::Arc;
584
585 use futures::TryStreamExt;
586 use futures::stream;
587
588 use super::*;
589 use crate::bitmap_query::BitmapLiteral;
590 use crate::bitmap_query::BitmapTerm;
591 use crate::bitmap_query::BucketStream;
592
593 const BUCKET_SIZE: u64 = 100_000;
594 type TestBuckets = BTreeMap<Vec<u8>, Vec<(u64, Vec<u32>)>>;
595
596 #[derive(Clone)]
597 struct TestBucketSource {
598 buckets: Arc<TestBuckets>,
599 }
600
601 impl BitmapBucketSource for TestBucketSource {
602 fn scan_bucket_stream(
603 &self,
604 dimension_key: Vec<u8>,
605 _range: Range<u64>,
606 direction: ScanDirection,
607 ) -> BucketStream {
608 let mut buckets = self
609 .buckets
610 .get(&dimension_key)
611 .cloned()
612 .unwrap_or_default();
613 if matches!(direction, ScanDirection::Descending) {
614 buckets.reverse();
615 }
616 make_bucket_stream(
617 buckets
618 .iter()
619 .map(|(bucket_id, bits)| (*bucket_id, bits.as_slice()))
620 .collect(),
621 )
622 }
623 }
624
625 fn make_bitmap(bits: &[u32]) -> RoaringBitmap {
626 let mut bm = RoaringBitmap::new();
627 for &b in bits {
628 bm.insert(b);
629 }
630 bm
631 }
632
633 fn make_bucket_stream(items: Vec<(u64, &[u32])>) -> BucketStream {
634 let items: Vec<BucketItem> = items
635 .into_iter()
636 .map(|(bid, bits)| Ok((bid, make_bitmap(bits))))
637 .collect();
638 stream::iter(items).boxed()
639 }
640
641 async fn drain_marked(
644 stream: BoxStream<'static, BitmapScanResult<Watermarked<u64>>>,
645 ) -> BitmapScanResult<(Vec<u64>, Vec<u64>)> {
646 let all: Vec<Watermarked<u64>> = stream.try_collect().await?;
647 let mut items = Vec::new();
648 let mut watermarks = Vec::new();
649 for m in all {
650 match m {
651 Watermarked::Item(v) => items.push(v),
652 Watermarked::Watermark(f) => watermarks.push(f),
653 }
654 }
655 Ok((items, watermarks))
656 }
657
658 fn test_key(value: &[u8]) -> Vec<u8> {
659 crate::dimensions::encode_dimension_key(crate::dimensions::IndexDimension::Sender, value)
660 }
661
662 fn include(value: &[u8]) -> BitmapLiteral {
663 BitmapLiteral::include(test_key(value)).unwrap()
664 }
665
666 fn exclude(value: &[u8]) -> BitmapLiteral {
667 BitmapLiteral::exclude(test_key(value)).unwrap()
668 }
669
670 #[tokio::test]
678 async fn nested_term_starvation_emits_frontier_before_scan_limit() {
679 let source = TestBucketSource {
680 buckets: Arc::new(BTreeMap::from([
681 (
682 test_key(b"a"),
683 vec![
684 (0, vec![1]),
685 (1, vec![1]),
686 (2, vec![1]),
687 (3, vec![1]),
688 (4, vec![1]),
689 ],
690 ),
691 (test_key(b"b"), vec![(50, vec![1])]),
692 (test_key(b"c"), vec![(40, vec![7])]),
693 ])),
694 };
695 let query = BitmapQuery::new(vec![
696 BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
697 BitmapTerm::new(vec![include(b"c")]).unwrap(),
698 ])
699 .unwrap();
700
701 let stream = eval_bitmap_query_stream(
703 source,
704 query,
705 0..(60 * BUCKET_SIZE),
706 BUCKET_SIZE,
707 ScanDirection::Ascending,
708 3,
709 |_| {},
710 );
711 let all: Vec<BitmapScanResult<Watermarked<u64>>> = stream.collect().await;
714
715 let last_ok =
716 all.iter().rev().find_map(|r| r.as_ref().ok()).expect(
717 "a frontier watermark must surface before the (otherwise cursorless) error",
718 );
719 match last_ok {
720 Watermarked::Watermark(p) => assert!(
721 *p > 0,
722 "frontier must reflect real progress past the request floor (got {p})"
723 ),
724 Watermarked::Item(_) => panic!("disjoint intersect must not emit items here"),
725 }
726 let err = all
727 .last()
728 .expect("non-empty")
729 .as_ref()
730 .expect_err("scan must terminate with an error");
731 assert!(
732 matches!(err, BitmapScanError::ScanLimit),
733 "expected ScanLimit, got {err:?}"
734 );
735 }
736
737 #[tokio::test]
750 async fn flush_on_error_delivers_below_floor_sibling_result() {
751 let source = TestBucketSource {
752 buckets: Arc::new(BTreeMap::from([
753 (
754 test_key(b"a"),
755 vec![
756 (0, vec![1]),
757 (1, vec![1]),
758 (2, vec![1]),
759 (3, vec![1]),
760 (4, vec![1]),
761 ],
762 ),
763 (test_key(b"b"), vec![(50, vec![1])]),
764 (test_key(b"c"), vec![(0, vec![7])]),
765 ])),
766 };
767 let query = BitmapQuery::new(vec![
768 BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
769 BitmapTerm::new(vec![include(b"c")]).unwrap(),
770 ])
771 .unwrap();
772
773 let stream = eval_bitmap_query_stream(
774 source,
775 query,
776 0..(60 * BUCKET_SIZE),
777 BUCKET_SIZE,
778 ScanDirection::Ascending,
779 3,
780 |_| {},
781 );
782 let all: Vec<BitmapScanResult<Watermarked<u64>>> = stream.collect().await;
783
784 let items: Vec<u64> = all
786 .iter()
787 .filter_map(|r| match r {
788 Ok(Watermarked::Item(v)) => Some(*v),
789 _ => None,
790 })
791 .collect();
792 assert_eq!(items, vec![7], "c's below-floor match must be delivered");
793
794 let last_wm = all
797 .iter()
798 .rev()
799 .find_map(|r| match r {
800 Ok(Watermarked::Watermark(p)) => Some(*p),
801 _ => None,
802 })
803 .expect("a frontier watermark must surface");
804 assert_eq!(last_wm, BUCKET_SIZE, "frontier must advance past floor");
805 assert!(
808 items.iter().all(|&i| i < last_wm),
809 "items must be below the resume cursor"
810 );
811
812 let err = all
813 .last()
814 .expect("non-empty")
815 .as_ref()
816 .expect_err("scan must terminate with an error");
817 assert!(
818 matches!(err, BitmapScanError::ScanLimit),
819 "expected ScanLimit, got {err:?}"
820 );
821 }
822
823 #[test]
824 fn bitmap_query_validation_rejects_empty_shapes() {
825 assert!(BitmapQuery::new(Vec::new()).is_err());
826 assert!(BitmapLiteral::include(Vec::new()).is_err());
827 assert!(
828 BitmapLiteral::include(vec![crate::dimensions::IndexDimension::Sender.tag_byte()])
829 .is_err()
830 );
831 assert!(BitmapLiteral::include(vec![0xff, 0x00]).is_err());
832 assert!(BitmapTerm::new(vec![exclude(b"neg")]).is_err());
833 }
834
835 #[tokio::test]
840 async fn shared_include_across_terms_scans_dimension_once() {
841 use crate::bitmap_query::test_utils::CountingBucketSource;
842
843 let source = CountingBucketSource::new(BTreeMap::from([
844 (test_key(b"a"), vec![(0, vec![1, 2, 3])]),
845 (test_key(b"b"), vec![(0, vec![1])]),
846 (test_key(b"c"), vec![(0, vec![2])]),
847 ]));
848 let query = BitmapQuery::new(vec![
849 BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
850 BitmapTerm::new(vec![include(b"a"), include(b"c")]).unwrap(),
851 ])
852 .unwrap();
853
854 let stream = eval_bitmap_query_stream(
855 source.clone(),
856 query,
857 0..200_000,
858 BUCKET_SIZE,
859 ScanDirection::Ascending,
860 u64::MAX,
861 |_| {},
862 );
863 let (items, _watermarks) = drain_marked(stream).await.unwrap();
864
865 assert_eq!(items, vec![1, 2]);
868 assert_eq!(source.scan_count(&test_key(b"a")), 1);
870 assert_eq!(source.scan_count(&test_key(b"b")), 1);
871 assert_eq!(source.scan_count(&test_key(b"c")), 1);
872 }
873
874 #[tokio::test]
878 async fn shared_key_across_include_and_exclude_terms_scans_once() {
879 use crate::bitmap_query::test_utils::CountingBucketSource;
880
881 let source = CountingBucketSource::new(BTreeMap::from([
882 (test_key(b"a"), vec![(0, vec![1, 2])]),
883 (test_key(b"b"), vec![(0, vec![1, 2, 3])]),
884 ]));
885 let query = BitmapQuery::new(vec![
889 BitmapTerm::new(vec![include(b"b"), exclude(b"a")]).unwrap(),
890 BitmapTerm::new(vec![include(b"b"), include(b"a")]).unwrap(),
891 ])
892 .unwrap();
893
894 let stream = eval_bitmap_query_stream(
895 source.clone(),
896 query,
897 0..100_000,
898 BUCKET_SIZE,
899 ScanDirection::Ascending,
900 u64::MAX,
901 |_| {},
902 );
903 let (items, _watermarks) = drain_marked(stream).await.unwrap();
904
905 assert_eq!(items, vec![1, 2, 3]);
906 assert_eq!(source.scan_count(&test_key(b"a")), 1);
907 assert_eq!(source.scan_count(&test_key(b"b")), 1);
908 }
909
910 #[tokio::test]
911 async fn eval_bitmap_query_stream_uses_backend_source() {
912 let source = TestBucketSource {
913 buckets: Arc::new(BTreeMap::from([
914 (test_key(b"a"), vec![(0, vec![1, 2, 3]), (1, vec![5])]),
915 (test_key(b"b"), vec![(0, vec![2, 3]), (1, vec![5])]),
916 (test_key(b"c"), vec![(0, vec![3])]),
917 ])),
918 };
919 let query = BitmapQuery::new(vec![
920 BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
921 ])
922 .unwrap();
923
924 let stream = eval_bitmap_query_stream(
925 source,
926 query,
927 0..200_000,
928 BUCKET_SIZE,
929 ScanDirection::Ascending,
930 u64::MAX,
931 |_| {},
932 );
933 let (items, _watermarks) = drain_marked(stream).await.unwrap();
934
935 assert_eq!(items, vec![2, BUCKET_SIZE + 5]);
936 }
937
938 #[tokio::test]
939 async fn eval_bitmap_query_stream_descending() {
940 let source = TestBucketSource {
941 buckets: Arc::new(BTreeMap::from([
942 (test_key(b"a"), vec![(0, vec![1, 2, 3]), (1, vec![5])]),
943 (test_key(b"b"), vec![(0, vec![2, 3]), (1, vec![5])]),
944 (test_key(b"c"), vec![(0, vec![3])]),
945 ])),
946 };
947 let query = BitmapQuery::new(vec![
948 BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
949 ])
950 .unwrap();
951
952 let stream = eval_bitmap_query_stream(
953 source,
954 query,
955 0..200_000,
956 BUCKET_SIZE,
957 ScanDirection::Descending,
958 u64::MAX,
959 |_| {},
960 );
961 let (items, _watermarks) = drain_marked(stream).await.unwrap();
962
963 assert_eq!(items, vec![BUCKET_SIZE + 5, 2]);
964 }
965
966 #[tokio::test]
970 async fn buckets_with_watermarks_then_flatten_watermarked_buckets_ascending() {
971 let range = 50u64..(2 * BUCKET_SIZE + 50_001);
972 let items = stream::iter(vec![
973 Ok((0u64, {
975 let mut bm = RoaringBitmap::new();
976 bm.insert(10);
977 bm.insert(50);
978 bm.insert((BUCKET_SIZE - 1) as u32);
979 bm
980 })),
981 Ok((1u64, {
983 let mut bm = RoaringBitmap::new();
984 bm.insert(0);
985 bm.insert((BUCKET_SIZE - 1) as u32);
986 bm
987 })),
988 Ok((2u64, {
990 let mut bm = RoaringBitmap::new();
991 bm.insert(0);
992 bm.insert(50_000);
993 bm.insert(50_001);
994 bm
995 })),
996 ]);
997 let marked_buckets =
998 buckets_with_watermarks(items, range.clone(), BUCKET_SIZE, ScanDirection::Ascending);
999 let out: Vec<Watermarked<u64>> = flatten_watermarked_buckets(
1000 marked_buckets,
1001 range,
1002 BUCKET_SIZE,
1003 ScanDirection::Ascending,
1004 )
1005 .try_collect()
1006 .await
1007 .unwrap();
1008 assert_eq!(
1012 out,
1013 vec![
1014 Watermarked::Item(50),
1015 Watermarked::Item(BUCKET_SIZE - 1),
1016 Watermarked::Watermark(BUCKET_SIZE),
1017 Watermarked::Item(BUCKET_SIZE),
1018 Watermarked::Item(2 * BUCKET_SIZE - 1),
1019 Watermarked::Watermark(2 * BUCKET_SIZE),
1020 Watermarked::Item(2 * BUCKET_SIZE),
1021 Watermarked::Item(2 * BUCKET_SIZE + 50_000),
1022 Watermarked::Watermark(2 * BUCKET_SIZE + 50_001),
1025 ],
1026 );
1027 }
1028
1029 #[tokio::test]
1033 async fn buckets_with_watermarks_one_per_bucket_no_flatten() {
1034 let items = stream::iter(vec![
1035 Ok((0u64, {
1036 let mut bm = RoaringBitmap::new();
1037 bm.insert(1);
1038 bm.insert(2);
1039 bm
1040 })),
1041 Ok((3u64, {
1042 let mut bm = RoaringBitmap::new();
1043 bm.insert(5);
1044 bm
1045 })),
1046 ]);
1047 let out: Vec<Watermarked<(u64, Vec<u32>)>> = buckets_with_watermarks(
1052 items,
1053 0..(5 * BUCKET_SIZE),
1054 BUCKET_SIZE,
1055 ScanDirection::Ascending,
1056 )
1057 .map_ok(|m| m.map_item(|(bid, bm)| (bid, bm.iter().collect::<Vec<_>>())))
1058 .try_collect()
1059 .await
1060 .unwrap();
1061 assert_eq!(
1062 out,
1063 vec![
1064 Watermarked::Item((0, vec![1, 2])),
1065 Watermarked::Watermark(BUCKET_SIZE),
1066 Watermarked::Item((3, vec![5])),
1067 Watermarked::Watermark(4 * BUCKET_SIZE),
1068 Watermarked::Watermark(5 * BUCKET_SIZE),
1069 ],
1070 );
1071 }
1072
1073 #[tokio::test]
1074 async fn scan_budget_below_unique_leaf_count_yields_misconfig_error() {
1075 let source = TestBucketSource {
1082 buckets: Arc::new(BTreeMap::from([(
1083 test_key(b"a"),
1084 vec![(0, vec![1, 2]), (1, vec![3])],
1085 )])),
1086 };
1087 let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1088
1089 let metrics = std::sync::Arc::new(std::sync::Mutex::new(None));
1090 let metrics_sink = metrics.clone();
1091 let stream = eval_bitmap_query_stream(
1092 source,
1093 query,
1094 0..200_000,
1095 BUCKET_SIZE,
1096 ScanDirection::Ascending,
1097 0,
1098 move |m| *metrics_sink.lock().unwrap() = Some(m),
1099 );
1100 let err = drain_marked(stream).await.unwrap_err();
1101
1102 assert!(
1103 matches!(err, BitmapScanError::Source(_)),
1104 "must be a Source fault (cursorless), never a clean ScanLimit end"
1105 );
1106 assert!(
1107 err.to_string().contains("insufficient for"),
1108 "expected misconfig error, got {err:?}"
1109 );
1110 assert!(
1113 metrics.lock().unwrap().is_none(),
1114 "misconfig early-out should not emit scan metrics"
1115 );
1116 }
1117
1118 #[tokio::test]
1119 async fn scan_budget_shared_across_dimensions() {
1120 let source = TestBucketSource {
1124 buckets: Arc::new(BTreeMap::from([
1125 (
1126 test_key(b"a"),
1127 vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1128 ),
1129 (
1130 test_key(b"b"),
1131 vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1132 ),
1133 (
1134 test_key(b"c"),
1135 vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1136 ),
1137 ])),
1138 };
1139 let query = BitmapQuery::new(vec![
1140 BitmapTerm::new(vec![include(b"a"), include(b"b"), include(b"c")]).unwrap(),
1141 ])
1142 .unwrap();
1143
1144 let metrics = std::sync::Arc::new(std::sync::Mutex::new(None));
1145 let metrics_sink = metrics.clone();
1146 let stream = eval_bitmap_query_stream(
1147 source,
1148 query,
1149 0..300_000,
1150 BUCKET_SIZE,
1151 ScanDirection::Ascending,
1152 4,
1153 move |m| *metrics_sink.lock().unwrap() = Some(m),
1154 );
1155 let err = drain_marked(stream).await.unwrap_err();
1156
1157 assert!(
1158 matches!(err, BitmapScanError::ScanLimit),
1159 "expected ScanLimit, got {err:?}"
1160 );
1161 assert_eq!(metrics.lock().unwrap().unwrap().buckets_evaluated, 4);
1164 }
1165
1166 #[tokio::test]
1171 async fn scan_budget_exclude_side_exhaustion_does_not_leak_includes() {
1172 let source = TestBucketSource {
1173 buckets: Arc::new(BTreeMap::from([
1174 (
1175 test_key(b"inc"),
1176 vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1177 ),
1178 (
1179 test_key(b"exc"),
1180 vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1181 ),
1182 ])),
1183 };
1184 let query = BitmapQuery::new(vec![
1185 BitmapTerm::new(vec![include(b"inc"), exclude(b"exc")]).unwrap(),
1186 ])
1187 .unwrap();
1188
1189 let stream = eval_bitmap_query_stream(
1196 source,
1197 query,
1198 0..300_000,
1199 BUCKET_SIZE,
1200 ScanDirection::Ascending,
1201 2,
1202 |_| {},
1203 );
1204 let result = drain_marked(stream).await;
1205
1206 let err = result.expect_err("must surface scan-limit, not silently emit includes");
1208 assert!(
1209 matches!(err, BitmapScanError::ScanLimit),
1210 "expected ScanLimit, got {err:?}"
1211 );
1212 }
1213
1214 #[tokio::test]
1219 async fn sparse_intersect_emits_frontier_watermark_before_scan_limit() {
1220 let source = TestBucketSource {
1224 buckets: Arc::new(BTreeMap::from([
1225 (
1226 test_key(b"a"),
1227 vec![
1228 (0, vec![1]),
1229 (1, vec![1]),
1230 (2, vec![1]),
1231 (3, vec![1]),
1232 (4, vec![1]),
1233 ],
1234 ),
1235 (test_key(b"b"), vec![(100, vec![1])]),
1236 ])),
1237 };
1238 let query = BitmapQuery::new(vec![
1239 BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
1240 ])
1241 .unwrap();
1242
1243 let stream = eval_bitmap_query_stream(
1244 source,
1245 query,
1246 0..(110 * BUCKET_SIZE),
1247 BUCKET_SIZE,
1248 ScanDirection::Ascending,
1249 4,
1250 |_| {},
1251 );
1252 let all: Vec<BitmapScanResult<Watermarked<u64>>> = stream.collect().await;
1255
1256 let last_ok = all
1257 .iter()
1258 .rev()
1259 .find_map(|r| r.as_ref().ok())
1260 .expect("expected a watermark item before the error");
1261 match last_ok {
1262 Watermarked::Watermark(p) => {
1263 assert!(
1264 *p > 0,
1265 "frontier watermark must reflect real progress (got {p})"
1266 );
1267 }
1268 Watermarked::Item(_) => panic!("disjoint intersect must not emit items"),
1269 }
1270 let err = all
1271 .last()
1272 .expect("non-empty")
1273 .as_ref()
1274 .expect_err("scan must terminate with an error");
1275 assert!(
1276 matches!(err, BitmapScanError::ScanLimit),
1277 "expected ScanLimit, got {err:?}"
1278 );
1279 }
1280
1281 #[tokio::test]
1282 async fn eval_emits_watermarks_at_bucket_boundaries_ascending() {
1283 let source = TestBucketSource {
1284 buckets: Arc::new(BTreeMap::from([(
1285 test_key(b"a"),
1286 vec![(0, vec![1]), (3, vec![2]), (7, vec![3])],
1287 )])),
1288 };
1289 let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1290
1291 let stream = eval_bitmap_query_stream(
1292 source,
1293 query,
1294 0..(8 * BUCKET_SIZE),
1295 BUCKET_SIZE,
1296 ScanDirection::Ascending,
1297 u64::MAX,
1298 |_| {},
1299 );
1300 let (items, watermarks) = drain_marked(stream).await.unwrap();
1301
1302 assert_eq!(items, vec![1, 3 * BUCKET_SIZE + 2, 7 * BUCKET_SIZE + 3]);
1304 assert_eq!(
1309 watermarks,
1310 vec![
1311 0,
1312 BUCKET_SIZE,
1313 3 * BUCKET_SIZE,
1314 4 * BUCKET_SIZE,
1315 7 * BUCKET_SIZE,
1316 8 * BUCKET_SIZE,
1317 ]
1318 );
1319 }
1320
1321 #[tokio::test]
1322 async fn eval_emits_watermarks_at_bucket_boundaries_descending() {
1323 let source = TestBucketSource {
1324 buckets: Arc::new(BTreeMap::from([(
1325 test_key(b"a"),
1326 vec![(0, vec![1]), (3, vec![2]), (7, vec![3])],
1327 )])),
1328 };
1329 let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1330
1331 let stream = eval_bitmap_query_stream(
1332 source,
1333 query,
1334 0..(8 * BUCKET_SIZE),
1335 BUCKET_SIZE,
1336 ScanDirection::Descending,
1337 u64::MAX,
1338 |_| {},
1339 );
1340 let (items, watermarks) = drain_marked(stream).await.unwrap();
1341
1342 assert_eq!(items, vec![7 * BUCKET_SIZE + 3, 3 * BUCKET_SIZE + 2, 1]);
1343 assert_eq!(
1347 watermarks,
1348 vec![
1349 8 * BUCKET_SIZE,
1350 7 * BUCKET_SIZE,
1351 4 * BUCKET_SIZE,
1352 3 * BUCKET_SIZE,
1353 BUCKET_SIZE,
1354 0,
1355 ]
1356 );
1357 }
1358
1359 #[tokio::test]
1360 async fn eval_emits_per_source_watermarks_and_final_eof_when_no_bucket_yielded() {
1361 let source = TestBucketSource {
1367 buckets: Arc::new(BTreeMap::from([
1368 (
1369 test_key(b"a"),
1370 vec![(0, vec![1]), (2, vec![3]), (4, vec![5])],
1371 ),
1372 (
1373 test_key(b"b"),
1374 vec![(1, vec![1]), (3, vec![3]), (5, vec![5])],
1375 ),
1376 ])),
1377 };
1378 let query = BitmapQuery::new(vec![
1379 BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
1380 ])
1381 .unwrap();
1382
1383 let stream = eval_bitmap_query_stream(
1384 source,
1385 query,
1386 0..(6 * BUCKET_SIZE),
1387 BUCKET_SIZE,
1388 ScanDirection::Ascending,
1389 u64::MAX,
1390 |_| {},
1391 );
1392 let (items, watermarks) = drain_marked(stream).await.unwrap();
1393
1394 assert!(items.is_empty(), "disjoint intersect must not emit items");
1395 assert!(
1398 !watermarks.is_empty(),
1399 "expected per-source watermarks to propagate, got none"
1400 );
1401 let mut prev = 0u64;
1402 for w in &watermarks {
1403 assert!(
1404 *w >= prev,
1405 "ascending watermarks must be monotonic, got {watermarks:?}"
1406 );
1407 assert!(
1408 *w <= 6 * BUCKET_SIZE,
1409 "watermark exceeds range.end ({watermarks:?})"
1410 );
1411 prev = *w;
1412 }
1413 assert_eq!(
1414 *watermarks.last().unwrap(),
1415 6 * BUCKET_SIZE,
1416 "final watermark must be range.end on natural EOF"
1417 );
1418 }
1419
1420 #[tokio::test]
1421 async fn eval_watermark_ordering_invariant_item_then_watermark() {
1422 let source = TestBucketSource {
1427 buckets: Arc::new(BTreeMap::from([(
1428 test_key(b"a"),
1429 vec![(0, vec![10, 20, 30]), (1, vec![40, 50])],
1430 )])),
1431 };
1432 let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1433
1434 let stream = eval_bitmap_query_stream(
1435 source,
1436 query,
1437 0..(2 * BUCKET_SIZE),
1438 BUCKET_SIZE,
1439 ScanDirection::Ascending,
1440 u64::MAX,
1441 |_| {},
1442 );
1443 let all: Vec<Watermarked<u64>> = stream.try_collect().await.unwrap();
1444
1445 assert_eq!(
1450 all,
1451 vec![
1452 Watermarked::Watermark(0),
1453 Watermarked::Item(10),
1454 Watermarked::Item(20),
1455 Watermarked::Item(30),
1456 Watermarked::Watermark(BUCKET_SIZE),
1457 Watermarked::Item(BUCKET_SIZE + 40),
1458 Watermarked::Item(BUCKET_SIZE + 50),
1459 Watermarked::Watermark(2 * BUCKET_SIZE),
1460 ],
1461 );
1462 }
1463
1464 #[tokio::test]
1465 async fn buckets_with_watermarks_then_flatten_watermarked_buckets_descending() {
1466 let range = 50u64..(2 * BUCKET_SIZE + 50_001);
1467 let items = stream::iter(vec![
1468 Ok((2u64, {
1469 let mut bm = RoaringBitmap::new();
1470 bm.insert(0);
1471 bm.insert(50_000);
1472 bm.insert(50_001);
1473 bm
1474 })),
1475 Ok((1u64, {
1476 let mut bm = RoaringBitmap::new();
1477 bm.insert(0);
1478 bm.insert((BUCKET_SIZE - 1) as u32);
1479 bm
1480 })),
1481 Ok((0u64, {
1482 let mut bm = RoaringBitmap::new();
1483 bm.insert(10);
1484 bm.insert(50);
1485 bm.insert((BUCKET_SIZE - 1) as u32);
1486 bm
1487 })),
1488 ]);
1489 let marked_buckets =
1490 buckets_with_watermarks(items, range.clone(), BUCKET_SIZE, ScanDirection::Descending);
1491 let out: Vec<Watermarked<u64>> = flatten_watermarked_buckets(
1492 marked_buckets,
1493 range,
1494 BUCKET_SIZE,
1495 ScanDirection::Descending,
1496 )
1497 .try_collect()
1498 .await
1499 .unwrap();
1500 assert_eq!(
1503 out,
1504 vec![
1505 Watermarked::Item(2 * BUCKET_SIZE + 50_000),
1506 Watermarked::Item(2 * BUCKET_SIZE),
1507 Watermarked::Watermark(2 * BUCKET_SIZE),
1508 Watermarked::Item(2 * BUCKET_SIZE - 1),
1509 Watermarked::Item(BUCKET_SIZE),
1510 Watermarked::Watermark(BUCKET_SIZE),
1511 Watermarked::Item(BUCKET_SIZE - 1),
1512 Watermarked::Item(50),
1513 Watermarked::Watermark(50),
1517 ],
1518 );
1519 }
1520
1521 #[test]
1523 fn collapse_single_scan_limit() {
1524 assert!(matches!(
1525 BitmapScanError::collapse(vec![BitmapScanError::ScanLimit]),
1526 BitmapScanError::ScanLimit
1527 ));
1528 }
1529
1530 #[test]
1532 fn collapse_all_scan_limit() {
1533 assert!(matches!(
1534 BitmapScanError::collapse(
1535 vec![BitmapScanError::ScanLimit, BitmapScanError::ScanLimit,]
1536 ),
1537 BitmapScanError::ScanLimit
1538 ));
1539 }
1540
1541 #[test]
1544 fn collapse_fault_outranks_scan_limit() {
1545 let collapsed = BitmapScanError::collapse(vec![
1546 BitmapScanError::ScanLimit,
1547 BitmapScanError::Source(anyhow::anyhow!("storage boom")),
1548 ]);
1549 match collapsed {
1550 BitmapScanError::Source(e) => assert!(e.to_string().contains("storage boom")),
1551 other => panic!("expected Source fault to win, got {other:?}"),
1552 }
1553 }
1554
1555 #[test]
1558 fn collapse_source_outranks_scan_limit_and_cancelled() {
1559 let collapsed = BitmapScanError::collapse(vec![
1560 BitmapScanError::ScanLimit,
1561 BitmapScanError::Cancelled,
1562 BitmapScanError::Source(anyhow::anyhow!("storage boom")),
1563 ]);
1564 match collapsed {
1565 BitmapScanError::Source(e) => assert!(e.to_string().contains("storage boom")),
1566 other => panic!("expected Source to win, got {other:?}"),
1567 }
1568 }
1569
1570 #[test]
1573 fn collapse_scan_limit_outranks_cancelled() {
1574 assert!(matches!(
1575 BitmapScanError::collapse(
1576 vec![BitmapScanError::Cancelled, BitmapScanError::ScanLimit,]
1577 ),
1578 BitmapScanError::ScanLimit
1579 ));
1580 }
1581
1582 #[test]
1585 fn collapse_combines_concurrent_faults() {
1586 let collapsed = BitmapScanError::collapse(vec![
1587 BitmapScanError::Source(anyhow::anyhow!("boom one")),
1588 BitmapScanError::Source(anyhow::anyhow!("boom two")),
1589 ]);
1590 match collapsed {
1591 BitmapScanError::Source(e) => {
1592 let s = e.to_string();
1593 assert!(s.contains("boom one"), "missing first fault: {s}");
1594 assert!(s.contains("boom two"), "missing second fault: {s}");
1595 }
1596 other => panic!("expected combined Source, got {other:?}"),
1597 }
1598 }
1599
1600 #[test]
1604 fn from_anyhow_funnels_to_source() {
1605 let fault: anyhow::Error = anyhow::anyhow!("storage boom");
1606 match BitmapScanError::from(fault) {
1607 BitmapScanError::Source(e) => assert!(e.to_string().contains("storage boom")),
1608 other => panic!("expected Source, got {other:?}"),
1609 }
1610 }
1611}