1use std::collections::VecDeque;
21use std::iter::Peekable as IterPeekable;
22use std::ops::Range;
23
24use roaring::RoaringBitmap;
25
26use super::BitmapBucketIteratorSource;
27use super::BitmapQuery;
28use super::BitmapScanError;
29use super::DedupedQuery;
30use super::LeafHead;
31use super::ScanDirection;
32use super::Watermarked;
33use super::WatermarkedBucket;
34use super::bound_in_direction;
35use super::bucket_edges;
36use super::build_term_specs;
37use super::count_on_floor_refs;
38use super::eval_term_at_bucket;
39use super::frontier_advanced;
40use super::recompute_unreferenced;
41use super::take_snapshot_bitmap;
42
43pub fn eval_bitmap_query_bucket_iter<'a, S>(
47 source: S,
48 query: BitmapQuery,
49 range: Range<u64>,
50 bucket_size: u64,
51 direction: ScanDirection,
52) -> impl Iterator<Item = WatermarkedBucket> + 'a
53where
54 S: BitmapBucketIteratorSource<'a>,
55{
56 let DedupedQuery {
62 keys: unique_keys,
63 mut terms,
64 } = build_term_specs(query.terms);
65 let mut leaves: Vec<IterPeekable<S::Iter>> = Vec::with_capacity(unique_keys.len());
66 for key in unique_keys {
67 leaves.push(
68 source
69 .scan_bucket_iter(key, range.clone(), direction)
70 .peekable(),
71 );
72 }
73
74 let leaf_count = leaves.len();
75 let terminus = if direction.is_ascending() {
76 range.end
77 } else {
78 range.start
79 };
80 let request_floor = if direction.is_ascending() {
81 range.start
82 } else {
83 range.end
84 };
85 let mut unreferenced = vec![false; leaf_count];
88 let mut front = vec![request_floor; leaf_count];
91 let mut last_emitted: Option<u64> = None;
92 let mut done = false;
93 let mut pending: VecDeque<WatermarkedBucket> = VecDeque::new();
94
95 std::iter::from_fn(move || {
96 loop {
97 if let Some(out) = pending.pop_front() {
98 return Some(out);
99 }
100 if done {
101 return None;
102 }
103
104 let mut class: Vec<Option<LeafHead>> = (0..leaf_count).map(|_| None).collect();
107 for i in 0..leaf_count {
108 if unreferenced[i] {
109 continue;
110 }
111 match leaves[i].peek() {
112 Some(Ok((bucket, _))) => {
113 let (pre, _post) = bucket_edges(*bucket, bucket_size, &range, direction);
114 front[i] = pre;
115 class[i] = Some(LeafHead::Bucket(*bucket));
116 }
117 None => {
118 front[i] = terminus;
119 class[i] = Some(LeafHead::Eof);
120 }
121 Some(Err(_)) => class[i] = Some(LeafHead::Error),
124 }
125 }
126
127 for term in terms.iter_mut() {
131 if !term.unsatisfiable
132 && term
133 .includes
134 .iter()
135 .any(|&i| matches!(class[i], Some(LeafHead::Eof)))
136 {
137 term.unsatisfiable = true;
138 }
139 }
140 recompute_unreferenced(&terms, &class, &mut unreferenced);
144
145 let mut errors: Vec<BitmapScanError> = Vec::new();
148 for i in 0..leaf_count {
149 if !unreferenced[i] && matches!(class[i], Some(LeafHead::Error)) {
150 match leaves[i].next() {
151 Some(Err(e)) => errors.push(e),
152 _ => unreachable!("peek classified Error"),
153 }
154 }
155 }
156
157 let active: Vec<usize> = (0..leaf_count).filter(|&i| !unreferenced[i]).collect();
158 if active.is_empty() {
159 done = true;
162 if frontier_advanced(last_emitted, terminus, direction) {
163 return Some(Ok(Watermarked::Watermark(terminus)));
164 }
165 return None;
166 }
167
168 let floor_pos = active
171 .iter()
172 .map(|&i| front[i])
173 .reduce(|a, b| bound_in_direction(a, b, direction))
174 .expect("active non-empty");
175 if frontier_advanced(last_emitted, floor_pos, direction) {
176 pending.push_back(Ok(Watermarked::Watermark(floor_pos)));
177 last_emitted = Some(floor_pos);
178 }
179
180 if !errors.is_empty() {
183 done = true;
184 pending.push_back(Err(BitmapScanError::collapse(errors)));
185 continue;
186 }
187
188 let floor_bucket = active
190 .iter()
191 .filter_map(|&i| match class[i] {
192 Some(LeafHead::Bucket(b)) => Some(b),
193 _ => None,
194 })
195 .reduce(|a, b| match direction {
196 ScanDirection::Ascending => a.min(b),
197 ScanDirection::Descending => a.max(b),
198 })
199 .expect("active leaves carry buckets when there is no error");
200 let (_pre, post) = bucket_edges(floor_bucket, bucket_size, &range, direction);
201
202 let mut snapshot: Vec<Option<RoaringBitmap>> = (0..leaf_count).map(|_| None).collect();
208 let mut on_floor = vec![false; leaf_count];
209 for i in 0..leaf_count {
210 if !unreferenced[i]
211 && matches!(class[i], Some(LeafHead::Bucket(b)) if b == floor_bucket)
212 {
213 on_floor[i] = true;
214 front[i] = post;
215 snapshot[i] = match leaves[i].next() {
216 Some(Ok((_, bitmap))) => Some(bitmap),
217 _ => None,
218 };
219 }
220 }
221 let mut remaining_refs = count_on_floor_refs(&terms, &on_floor);
222
223 let mut result: Option<RoaringBitmap> = None;
224 for term in &terms {
225 if term.unsatisfiable {
226 continue;
227 }
228 let includes: Vec<Option<RoaringBitmap>> = term
229 .includes
230 .iter()
231 .map(|&i| {
232 take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
233 })
234 .collect();
235 let excludes: Vec<Option<RoaringBitmap>> = term
236 .excludes
237 .iter()
238 .map(|&i| {
239 take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
240 })
241 .collect();
242 if let Some(bitmap) = eval_term_at_bucket(includes, excludes) {
243 result = Some(match result {
244 None => bitmap,
245 Some(acc) => acc | bitmap,
246 });
247 }
248 }
249
250 if let Some(bitmap) = result {
251 pending.push_back(Ok(Watermarked::Item((floor_bucket, bitmap))));
252 }
253 if frontier_advanced(last_emitted, post, direction) {
254 pending.push_back(Ok(Watermarked::Watermark(post)));
255 last_emitted = Some(post);
256 }
257 }
258 })
259}
260
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use futures::StreamExt;

    use super::*;
    use crate::bitmap_query::BitmapScanBudget;
    use crate::bitmap_query::BitmapTerm;
    use crate::bitmap_query::eval_bitmap_query_bucket_stream;
    use crate::bitmap_query::test_utils::*;

    /// Evaluator output with every bitmap flattened into its sorted members.
    type Marked = Watermarked<(u64, Vec<u32>)>;

    /// Unwraps every result (panicking on `Err`) and converts item bitmaps
    /// into plain `Vec<u32>` so sequences can be compared and printed.
    fn collect_marked(items: Vec<WatermarkedBucket>) -> Vec<Watermarked<(u64, Vec<u32>)>> {
        let mut marked: Vec<Marked> = Vec::with_capacity(items.len());
        for entry in items {
            let entry = entry.unwrap();
            marked.push(entry.map_item(|(bucket, bitmap)| (bucket, bitmap.iter().collect())));
        }
        marked
    }

    /// Keeps only the `Item` payloads, dropping every watermark.
    fn items_only(marked: &[Watermarked<(u64, Vec<u32>)>]) -> Vec<(u64, Vec<u32>)> {
        let mut items = Vec::new();
        for entry in marked {
            if let Watermarked::Item(item) = entry {
                items.push(item.clone());
            }
        }
        items
    }

    /// Runs the same query through the stream evaluator (with a generous
    /// budget) and then the iterator evaluator, returning both marked
    /// sequences as `(stream, iter)`.
    async fn marked_from_both(
        source: &TestBucketSource,
        query: &BitmapQuery,
        range: std::ops::Range<u64>,
        direction: ScanDirection,
    ) -> (Vec<Marked>, Vec<Marked>) {
        let stream_out: Vec<_> = eval_bitmap_query_bucket_stream(
            source.clone(),
            query.clone(),
            range.clone(),
            BUCKET_SIZE,
            direction,
            BitmapScanBudget::new(1_000_000),
        )
        .collect()
        .await;
        let iter_out: Vec<_> = eval_bitmap_query_bucket_iter(
            source.clone(),
            query.clone(),
            range,
            BUCKET_SIZE,
            direction,
        )
        .collect();
        (collect_marked(stream_out), collect_marked(iter_out))
    }

    /// A single `a AND b AND NOT c` term evaluated over two buckets.
    #[test]
    fn eval_bitmap_query_bucket_iter_uses_iterator_source() {
        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::from([
                (test_key(b"a"), vec![(0, vec![1, 2, 3]), (1, vec![5])]),
                (test_key(b"b"), vec![(0, vec![2, 3]), (1, vec![5])]),
                (test_key(b"c"), vec![(0, vec![3])]),
            ])),
        };
        let term = BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap();
        let query = BitmapQuery::new(vec![term]).unwrap();

        let raw: Vec<_> = eval_bitmap_query_bucket_iter(
            source,
            query,
            0..200_000,
            BUCKET_SIZE,
            ScanDirection::Ascending,
        )
        .collect();
        let items = items_only(&collect_marked(raw));

        assert_eq!(items, vec![(0, vec![2]), (1, vec![5])]);
    }

    /// Key `a` appears in both terms but must be scanned exactly once.
    #[test]
    fn shared_include_across_terms_scans_dimension_once() {
        use crate::bitmap_query::test_utils::CountingBucketSource;

        let source = CountingBucketSource::new(BTreeMap::from([
            (test_key(b"a"), vec![(0, vec![1, 2, 3])]),
            (test_key(b"b"), vec![(0, vec![1])]),
            (test_key(b"c"), vec![(0, vec![2])]),
        ]));
        let a_and_b = BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap();
        let a_and_c = BitmapTerm::new(vec![include(b"a"), include(b"c")]).unwrap();
        let query = BitmapQuery::new(vec![a_and_b, a_and_c]).unwrap();

        let raw: Vec<_> = eval_bitmap_query_bucket_iter(
            source.clone(),
            query,
            0..200_000,
            BUCKET_SIZE,
            ScanDirection::Ascending,
        )
        .collect();
        let items = items_only(&collect_marked(raw));

        assert_eq!(items, vec![(0, vec![1, 2])]);
        for key in [b"a", b"b", b"c"] {
            assert_eq!(source.scan_count(&test_key(key)), 1);
        }
    }

    /// Two OR-ed terms over contiguous buckets: iterator and stream
    /// evaluators must produce the same marked sequence in both directions.
    #[tokio::test]
    async fn eval_bitmap_query_bucket_iter_matches_stream_for_or_terms() {
        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::from([
                (
                    test_key(b"a"),
                    vec![(0, vec![1, 2, 3]), (1, vec![5, 6]), (2, vec![9])],
                ),
                (
                    test_key(b"b"),
                    vec![(0, vec![2, 3]), (1, vec![6]), (2, vec![9, 10])],
                ),
                (test_key(b"c"), vec![(0, vec![3]), (2, vec![9])]),
                (test_key(b"d"), vec![(1, vec![1, 8]), (2, vec![7])]),
                (test_key(b"e"), vec![(1, vec![8])]),
            ])),
        };
        let first_term =
            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap();
        let second_term = BitmapTerm::new(vec![include(b"d"), exclude(b"e")]).unwrap();
        let query = BitmapQuery::new(vec![first_term, second_term]).unwrap();

        for direction in [ScanDirection::Ascending, ScanDirection::Descending] {
            let (from_stream, from_iter) =
                marked_from_both(&source, &query, 0..300_000, direction).await;

            assert_eq!(
                from_stream, from_iter,
                "iter and stream marked sequences diverged for {direction:?}"
            );
        }
    }

    /// Same query as above, but the populated buckets are separated by gaps.
    #[tokio::test]
    async fn eval_bitmap_query_bucket_iter_matches_stream_over_sparse_gaps() {
        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::from([
                (
                    test_key(b"a"),
                    vec![(0, vec![1, 2, 3]), (5, vec![5, 6]), (9, vec![9])],
                ),
                (
                    test_key(b"b"),
                    vec![(0, vec![2, 3]), (5, vec![6]), (9, vec![9, 10])],
                ),
                (test_key(b"c"), vec![(0, vec![3]), (9, vec![9])]),
                (test_key(b"d"), vec![(3, vec![1, 8]), (7, vec![7])]),
                (test_key(b"e"), vec![(3, vec![8])]),
            ])),
        };
        let first_term =
            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap();
        let second_term = BitmapTerm::new(vec![include(b"d"), exclude(b"e")]).unwrap();
        let query = BitmapQuery::new(vec![first_term, second_term]).unwrap();

        for direction in [ScanDirection::Ascending, ScanDirection::Descending] {
            let (from_stream, from_iter) =
                marked_from_both(&source, &query, 0..(10 * BUCKET_SIZE), direction).await;

            assert_eq!(
                from_stream, from_iter,
                "iter and stream diverged over sparse gaps for {direction:?}"
            );
        }
    }

    /// Watermarks of an ascending intersect scan never go backwards and the
    /// last one lands on the end of the requested range.
    #[test]
    fn intersect_emits_coalesced_watermarks_over_sparse_gap() {
        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::from([
                (test_key(b"a"), vec![(0, vec![1]), (2, vec![5])]),
                (test_key(b"b"), vec![(0, vec![1]), (2, vec![9])]),
            ])),
        };
        let term = BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap();
        let query = BitmapQuery::new(vec![term]).unwrap();

        let raw: Vec<_> = eval_bitmap_query_bucket_iter(
            source,
            query,
            0..300_000,
            BUCKET_SIZE,
            ScanDirection::Ascending,
        )
        .collect();
        let marked = collect_marked(raw);

        assert_eq!(items_only(&marked), vec![(0, vec![1])]);

        let mut watermarks: Vec<u64> = Vec::new();
        for entry in &marked {
            if let Watermarked::Watermark(position) = entry {
                watermarks.push(*position);
            }
        }
        assert!(
            watermarks.windows(2).all(|pair| pair[0] <= pair[1]),
            "ascending watermarks must be non-decreasing: {watermarks:?}"
        );
        assert_eq!(
            watermarks.last().copied(),
            Some(300_000),
            "final watermark must reach the range terminus"
        );
    }

    /// A `universe AND NOT x` term yields the complement of `x` in every
    /// bucket of the range, including buckets `x` has no entry for.
    #[test]
    fn unanchored_term_emits_complement_over_gaps_and_past_exclude_eof() {
        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::from([(
                test_key(b"x"),
                vec![(0, vec![1, 2]), (2, vec![5])],
            )])),
        };
        let term = BitmapTerm::new(vec![include_universe(), exclude(b"x")]).unwrap();
        let query = BitmapQuery::new(vec![term]).unwrap();

        let mut items: Vec<(u64, RoaringBitmap)> = Vec::new();
        for entry in eval_bitmap_query_bucket_iter(
            source,
            query,
            0..(4 * BUCKET_SIZE),
            BUCKET_SIZE,
            ScanDirection::Ascending,
        ) {
            if let Watermarked::Item(item) = entry.unwrap() {
                items.push(item);
            }
        }

        // A full bucket with the listed bits cleared.
        let complement = |bits: &[u32]| {
            let mut bitmap = full_bucket();
            bits.iter().for_each(|&bit| {
                bitmap.remove(bit);
            });
            bitmap
        };
        let expected = vec![
            (0, complement(&[1, 2])),
            (1, full_bucket()),
            (2, complement(&[5])),
            (3, full_bucket()),
        ];
        assert_eq!(items, expected);
    }

    /// Mixing an anchored term with a `universe AND NOT x` term: iterator
    /// and stream evaluators must still agree in both directions.
    #[tokio::test]
    async fn eval_bitmap_query_bucket_iter_matches_stream_for_unanchored_terms() {
        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::from([
                (test_key(b"a"), vec![(1, vec![7])]),
                (test_key(b"x"), vec![(0, vec![1, 2]), (3, vec![5])]),
            ])),
        };
        let anchored = BitmapTerm::new(vec![include(b"a")]).unwrap();
        let unanchored = BitmapTerm::new(vec![include_universe(), exclude(b"x")]).unwrap();
        let query = BitmapQuery::new(vec![anchored, unanchored]).unwrap();

        for direction in [ScanDirection::Ascending, ScanDirection::Descending] {
            let (from_stream, from_iter) =
                marked_from_both(&source, &query, 0..(5 * BUCKET_SIZE), direction).await;

            assert_eq!(
                from_stream, from_iter,
                "iter and stream diverged on unanchored terms for {direction:?}"
            );
        }
    }

    /// With a budget of 3 the stream evaluator stops with `ScanLimit`;
    /// restarting from the last watermark it reported covers the remaining
    /// buckets without gaps or repeats.
    #[tokio::test]
    async fn unanchored_budget_exhaustion_resumes_at_watermark() {
        use crate::bitmap_query::BitmapScanError;

        let source = TestBucketSource {
            buckets: Arc::new(BTreeMap::new()),
        };
        let term = BitmapTerm::new(vec![include_universe(), exclude(b"x")]).unwrap();
        let query = BitmapQuery::new(vec![term]).unwrap();

        let first_pass: Vec<_> = eval_bitmap_query_bucket_stream(
            source.clone(),
            query.clone(),
            0..(10 * BUCKET_SIZE),
            BUCKET_SIZE,
            ScanDirection::Ascending,
            BitmapScanBudget::new(3),
        )
        .collect()
        .await;

        let mut covered: Vec<u64> = Vec::new();
        let mut last_watermark = 0;
        let mut limit_hit = false;
        for outcome in first_pass {
            match outcome {
                Err(error) => {
                    assert!(matches!(error, BitmapScanError::ScanLimit));
                    limit_hit = true;
                }
                Ok(Watermarked::Watermark(position)) => last_watermark = position,
                Ok(Watermarked::Item((bucket, bitmap))) => {
                    assert_eq!(bitmap, full_bucket());
                    covered.push(bucket);
                }
            }
        }
        assert!(limit_hit, "3-bucket budget cannot cover 10 dense buckets");
        assert_eq!(covered, vec![0, 1, 2]);
        assert_eq!(last_watermark, 3 * BUCKET_SIZE);

        let second_pass: Vec<_> = eval_bitmap_query_bucket_stream(
            source,
            query,
            last_watermark..(10 * BUCKET_SIZE),
            BUCKET_SIZE,
            ScanDirection::Ascending,
            BitmapScanBudget::new(1_000_000),
        )
        .collect()
        .await;
        covered.extend(
            second_pass
                .into_iter()
                .filter_map(|outcome| match outcome.unwrap() {
                    Watermarked::Item((bucket, _)) => Some(bucket),
                    Watermarked::Watermark(_) => None,
                }),
        );
        assert_eq!(covered, (0..10).collect::<Vec<_>>());
    }
}