// sui_inverted_index/bitmap_query/stream.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Async stream evaluator for DNF bitmap queries.
//!
//! A single flat driver merge-joins every leaf scan against one shared *floor*
//! (the slowest leaf's position). At the floor bucket it evaluates the query —
//! intersect each term's included dimensions, subtract its excluded ones, then
//! union across terms — and emits a watermark at the floor. Leaves only ever
//! advance at the floor (peeked one bucket ahead, polled concurrently), so no
//! branch can run ahead of the others: the resume cursor stays within one sparse
//! read of every leaf, and there is no windowing/parking to get wrong. Consumed
//! by streaming backends such as BigTable; the synchronous
//! [`super::eval_bitmap_query_bucket_iter`] mirrors it and shares the per-bucket
//! evaluation ([`eval_term_at_bucket`]).

use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use futures::Stream;
use futures::StreamExt;
use futures::stream::BoxStream;
use futures::stream::Peekable;
use roaring::RoaringBitmap;

use super::BitmapBucketSource;
use super::BitmapQuery;
use super::BitmapScanError;
use super::BitmapScanResult;
use super::BucketItem;
use super::BucketStream;
use super::DedupedQuery;
use super::LeafHead;
use super::ScanDirection;
use super::Watermarked;
use super::WatermarkedBucketStream;
use super::bound_in_direction;
use super::bucket_edges;
use super::build_term_specs;
use super::count_on_floor_refs;
use super::eval_term_at_bucket;
use super::frontier_advanced;
use super::recompute_unreferenced;
use super::take_snapshot_bitmap;

/// Per-request bucket-scan accounting, delivered via the `on_metrics`
/// callback passed to `eval_bitmap_query_stream`. Fires once when the
/// eval pipeline is dropped (natural end, error, or consumer cancel).
/// The sole exception is the budget-misconfig early-out, which errors
/// before any scan is set up and emits nothing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BitmapScanMetrics {
    /// Buckets actually evaluated (charged against the per-request
    /// budget). At exhaustion each leaf may have fetched one extra
    /// bucket that was discarded rather than evaluated, so observed
    /// backend reads can exceed this by up to `BitmapQuery::unique_leaf_count()`.
    /// Conversely, a leaf's reserved first bucket taken after the shared
    /// pool was already drained is not counted (see
    /// `BitmapScanBudget::take_first`).
    pub buckets_evaluated: u64,
}

/// Per-request evaluated-bucket budget shared across all dimension
/// streams of one eval. Charges are post-poll — see
/// `budgeted_bucket_stream`.
///
/// Clones share one pool: `remaining` is an `Arc`, so every clone handed
/// to a leaf stream draws from the same counter.
#[derive(Clone)]
pub(crate) struct BitmapScanBudget {
    /// Pool size at construction; `buckets_evaluated` is measured against it.
    initial: u64,
    /// Buckets still available to charge, shared by all clones.
    remaining: Arc<AtomicU64>,
}

impl BitmapScanBudget {
    /// Create a pool holding `initial` chargeable buckets.
    pub(crate) fn new(initial: u64) -> Self {
        Self {
            initial,
            remaining: Arc::new(AtomicU64::new(initial)),
        }
    }

    /// Charge one bucket. Returns false on underflow.
    fn try_take(&self) -> bool {
        self.remaining
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |b| {
                if b == 0 { None } else { Some(b - 1) }
            })
            .is_ok()
    }

    /// Charge a leaf's mandatory first bucket: decrements the shared pool
    /// when it can, but ALWAYS succeeds. The runtime guards `budget >=
    /// unique_leaf_count`, but a *shared* atomic with concurrent leaves
    /// gives no ordering guarantee — a sparse term can drain the pool
    /// before a slower sibling leaf charges its first bucket, leaving that
    /// leaf unable to report its first position (a cursorless `SCAN_LIMIT`).
    /// Reserving the first bucket per leaf makes the `unique_leaf_count`
    /// floor's promise — "every leaf reaches its first bucket" — hold.
    /// Charging-when-possible keeps `buckets_evaluated` accurate in the
    /// common `budget >> leaves` case; it only undercounts a first bucket
    /// taken after the pool was already exhausted by other leaves.
    fn take_first(&self) {
        // The closure never returns `None`, so `fetch_update` cannot fail;
        // the result is ignored on purpose.
        let _ = self
            .remaining
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |b| {
                Some(b.saturating_sub(1))
            });
    }

    /// Buckets charged so far (`initial - remaining`, saturating at zero).
    fn buckets_evaluated(&self) -> u64 {
        self.initial
            .saturating_sub(self.remaining.load(Ordering::SeqCst))
    }
}

114/// RAII guard: fires `on_metrics` exactly once on drop with the final
115/// `BitmapScanMetrics`. Held inside the boxed eval stream so the callback
116/// fires on natural end, error, or consumer cancel.
117struct ObserveOnDrop<F: FnOnce(BitmapScanMetrics) + Send + 'static> {
118    budget: BitmapScanBudget,
119    callback: Option<F>,
120}
121
122impl<F: FnOnce(BitmapScanMetrics) + Send + 'static> Drop for ObserveOnDrop<F> {
123    fn drop(&mut self) {
124        if let Some(cb) = self.callback.take() {
125            cb(BitmapScanMetrics {
126                buckets_evaluated: self.budget.buckets_evaluated(),
127            });
128        }
129    }
130}
131
132/// Evaluate a DNF `BitmapQuery` against a backend-provided bitmap source.
133///
134/// `budget` caps evaluated buckets across all dimension scans (see
135/// [`BitmapScanError::ScanLimit`] and [`BitmapScanMetrics`]). `on_metrics`
136/// fires exactly once when the eval stream is dropped.
137///
138/// Output emits `Watermarked::Item(absolute_member_id)` interleaved with
139/// `Watermarked::Watermark(p)` derived from the slowest leaf — sparse scans
140/// that match nothing still report progress at the rate sources advance.
141pub fn eval_bitmap_query_stream<S, F>(
142    source: S,
143    query: BitmapQuery,
144    range: Range<u64>,
145    bucket_size: u64,
146    direction: ScanDirection,
147    budget: u64,
148    on_metrics: F,
149) -> BoxStream<'static, BitmapScanResult<Watermarked<u64>>>
150where
151    S: BitmapBucketSource,
152    F: FnOnce(BitmapScanMetrics) + Send + 'static,
153{
154    let leaves = query.unique_leaf_count();
155    if (budget as usize) < leaves {
156        // Misconfig guard: short-circuit before any scan setup. No
157        // `on_metrics` here — there's no scan to account for, and the
158        // error surfaces on its own. `on_metrics` is dropped uncalled.
159        return async_stream::stream! {
160            // Misconfiguration is a genuine fault, not a `ScanLimit` stop.
161            yield Err(BitmapScanError::Source(anyhow::anyhow!(
162                "bitmap scan budget {budget} is insufficient for {leaves} leaf streams; \
163                 server is misconfigured"
164            )));
165        }
166        .boxed();
167    }
168    let budget = BitmapScanBudget::new(budget);
169    let bucket_stream = eval_bitmap_query_bucket_stream(
170        source,
171        query,
172        range.clone(),
173        bucket_size,
174        direction,
175        budget.clone(),
176    );
177    let inner = flatten_watermarked_buckets(bucket_stream, range.clone(), bucket_size, direction);
178    // Wrapping the guard inside `async_stream::stream!` keeps it alive
179    // for the stream's full lifetime; the callback fires when the
180    // consumer drops the returned `BoxStream`.
181    let guard = ObserveOnDrop {
182        budget,
183        callback: Some(on_metrics),
184    };
185    async_stream::stream! {
186        let _guard = guard;
187        futures::pin_mut!(inner);
188        while let Some(item) = inner.next().await {
189            yield item;
190        }
191    }
192    .boxed()
193}
194
195/// Non-consuming peek of one leaf, paired with its index and the position it has
196/// now scanned to (`None` for an error head, which leaves the prior position).
197async fn peek_leaf<S>(
198    mut leaf: Pin<&mut Peekable<S>>,
199    idx: usize,
200    bucket_size: u64,
201    range: &Range<u64>,
202    direction: ScanDirection,
203    terminus: u64,
204) -> (usize, LeafHead, Option<u64>)
205where
206    S: Stream<Item = BucketItem>,
207{
208    match leaf.as_mut().peek().await {
209        Some(Ok((bucket, _))) => {
210            let (pre, _post) = bucket_edges(*bucket, bucket_size, range, direction);
211            (idx, LeafHead::Bucket(*bucket), Some(pre))
212        }
213        None => (idx, LeafHead::Eof, Some(terminus)),
214        Some(Err(_)) => (idx, LeafHead::Error, None),
215    }
216}
217
218/// Evaluate a DNF `BitmapQuery` as an ordered `WatermarkedBucketStream`.
219///
220/// The flat driver: each round peeks every active leaf concurrently, takes the
221/// slowest leaf's position as the floor (the merged watermark), evaluates the
222/// whole DNF at the floor bucket, and advances only the leaves sitting there.
223/// No leaf runs more than one peeked bucket ahead of the floor.
224pub(crate) fn eval_bitmap_query_bucket_stream<S>(
225    source: S,
226    query: BitmapQuery,
227    range: Range<u64>,
228    bucket_size: u64,
229    direction: ScanDirection,
230    budget: BitmapScanBudget,
231) -> WatermarkedBucketStream
232where
233    S: BitmapBucketSource,
234{
235    // One peekable leaf per *unique* dimension key — terms reference them by
236    // index. Identical keys across literals share a single backend scan, so
237    // `(sender=A AND module=X) OR (sender=A AND type=Y)` reads `sender=A`
238    // once. Budgeted bucket streams are `'static`, so `source` is only
239    // borrowed while building.
240    let DedupedQuery {
241        keys: unique_keys,
242        mut terms,
243    } = build_term_specs(query.terms);
244    let mut leaves: Vec<Peekable<BucketStream>> = Vec::with_capacity(unique_keys.len());
245    for key in unique_keys {
246        let raw = source.scan_bucket_stream(key, range.clone(), direction);
247        leaves.push(
248            budgeted_bucket_stream(raw, budget.clone())
249                .boxed()
250                .peekable(),
251        );
252    }
253
254    let leaf_count = leaves.len();
255    let terminus = if direction.is_ascending() {
256        range.end
257    } else {
258        range.start
259    };
260    let request_floor = if direction.is_ascending() {
261        range.start
262    } else {
263        range.end
264    };
265
266    async_stream::try_stream! {
267        // `unreferenced[i]`: leaf is retired — either no satisfiable term still
268        // points at it, or its bucket stream is at EOF (a spent exclude).
269        let mut unreferenced = vec![false; leaf_count];
270        // `front[i]`: clamped position each leaf has provably scanned to. Bounds
271        // the resume cursor when a leaf errors before it can advance.
272        let mut front = vec![request_floor; leaf_count];
273        let mut last_emitted: Option<u64> = None;
274
275        loop {
276            // Peek every active leaf concurrently (preserves cross-scan
277            // parallelism), recording each head and the position it scanned to.
278            let mut peeks = Vec::new();
279            for (i, leaf) in leaves.iter_mut().enumerate() {
280                if !unreferenced[i] {
281                    peeks.push(peek_leaf(
282                        Pin::new(leaf),
283                        i,
284                        bucket_size,
285                        &range,
286                        direction,
287                        terminus,
288                    ));
289                }
290            }
291            let results = futures::future::join_all(peeks).await;
292            let mut class: Vec<Option<LeafHead>> = (0..leaf_count).map(|_| None).collect();
293            for (i, head, scanned_to) in results {
294                if let Some(p) = scanned_to {
295                    front[i] = p;
296                }
297                class[i] = Some(head);
298            }
299
300            // An include at EOF makes its term unsatisfiable (the intersection
301            // is permanently empty). With dedup, an EOF'd leaf may be an
302            // include for several terms; all of them become unsatisfiable.
303            for term in terms.iter_mut() {
304                if !term.unsatisfiable
305                    && term
306                        .includes
307                        .iter()
308                        .any(|&i| matches!(class[i], Some(LeafHead::Eof)))
309                {
310                    term.unsatisfiable = true;
311                }
312            }
313            // Recompute leaf liveness from current term state. A leaf may be
314            // shared across terms (include for one, exclude for another), so it
315            // can only be retired when no satisfiable term still references
316            // it — or when its head is at EOF.
317            recompute_unreferenced(&terms, &class, &mut unreferenced);
318
319            // Consume any budget-error frame so the error surfaces (after the
320            // floor watermark below).
321            let mut errors: Vec<BitmapScanError> = Vec::new();
322            for i in 0..leaf_count {
323                if !unreferenced[i] && matches!(class[i], Some(LeafHead::Error)) {
324                    match Pin::new(&mut leaves[i]).next().await {
325                        Some(Err(e)) => errors.push(e),
326                        _ => unreachable!("peek classified Error"),
327                    }
328                }
329            }
330
331            let active: Vec<usize> = (0..leaf_count).filter(|&i| !unreferenced[i]).collect();
332            if active.is_empty() {
333                // Every term retired naturally: cap at the range terminus so the
334                // client learns the scan covered the whole range.
335                if frontier_advanced(last_emitted, terminus, direction) {
336                    yield Watermarked::Watermark(terminus);
337                }
338                return;
339            }
340
341            // The floor is the slowest active leaf's scanned-to position: the
342            // merged "every source has scanned past here" watermark.
343            let floor_pos = active
344                .iter()
345                .map(|&i| front[i])
346                .reduce(|a, b| bound_in_direction(a, b, direction))
347                .expect("active non-empty");
348            if frontier_advanced(last_emitted, floor_pos, direction) {
349                yield Watermarked::Watermark(floor_pos);
350                last_emitted = Some(floor_pos);
351            }
352
353            // Budget exhausted: the floor watermark above is the resume cursor;
354            // everything below it was fully evaluated in prior rounds.
355            if !errors.is_empty() {
356                Err(BitmapScanError::collapse(errors))?;
357            }
358
359            // Evaluate the DNF at the nearest bucket any active leaf sits on.
360            let floor_bucket = active
361                .iter()
362                .filter_map(|&i| match class[i] {
363                    Some(LeafHead::Bucket(b)) => Some(b),
364                    _ => None,
365                })
366                .reduce(|a, b| match direction {
367                    ScanDirection::Ascending => a.min(b),
368                    ScanDirection::Descending => a.max(b),
369                })
370                .expect("active leaves carry buckets when there is no error");
371            let (_pre, post) = bucket_edges(floor_bucket, bucket_size, &range, direction);
372
373            // Snapshot the bitmaps of leaves sitting on `floor_bucket` —
374            // each unique leaf consumed exactly once, regardless of how many
375            // terms reference it — then distribute. Without dedup, a leaf
376            // shared across multiple terms would otherwise be polled once per
377            // term, each call advancing past the bucket so siblings see
378            // nothing. The single-consume + distribute keeps storage reads
379            // proportional to unique keys, not literal occurrences.
380            let mut snapshot: Vec<Option<RoaringBitmap>> =
381                (0..leaf_count).map(|_| None).collect();
382            let mut on_floor = vec![false; leaf_count];
383            for i in 0..leaf_count {
384                if !unreferenced[i]
385                    && matches!(class[i], Some(LeafHead::Bucket(b)) if b == floor_bucket)
386                {
387                    on_floor[i] = true;
388                    front[i] = post;
389                    snapshot[i] = match Pin::new(&mut leaves[i]).next().await {
390                        Some(Ok((_, bitmap))) => Some(bitmap),
391                        _ => None,
392                    };
393                }
394            }
395            let mut remaining_refs = count_on_floor_refs(&terms, &on_floor);
396
397            let mut result: Option<RoaringBitmap> = None;
398            for term in &terms {
399                if term.unsatisfiable {
400                    continue;
401                }
402                let includes: Vec<Option<RoaringBitmap>> = term
403                    .includes
404                    .iter()
405                    .map(|&i| {
406                        take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
407                    })
408                    .collect();
409                let excludes: Vec<Option<RoaringBitmap>> = term
410                    .excludes
411                    .iter()
412                    .map(|&i| {
413                        take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
414                    })
415                    .collect();
416                if let Some(bitmap) = eval_term_at_bucket(includes, excludes) {
417                    result = Some(match result {
418                        None => bitmap,
419                        Some(acc) => acc | bitmap,
420                    });
421                }
422            }
423
424            if let Some(bitmap) = result {
425                yield Watermarked::Item((floor_bucket, bitmap));
426            }
427            if frontier_advanced(last_emitted, post, direction) {
428                yield Watermarked::Watermark(post);
429                last_emitted = Some(post);
430            }
431        }
432    }
433    .boxed()
434}
435
436/// Wrap a raw per-dimension bucket stream with the shared scan budget: charge
437/// one bucket per pull (the first via `take_first`, the rest via `try_take`),
438/// yielding [`BitmapScanError::ScanLimit`] when the pool is empty. Never a silent
439/// EOF — the driver must see the error to truncate at the floor.
440fn budgeted_bucket_stream<S>(
441    inner: S,
442    budget: BitmapScanBudget,
443) -> impl Stream<Item = BucketItem> + Send + 'static
444where
445    S: Stream<Item = BucketItem> + Send + 'static,
446{
447    async_stream::try_stream! {
448        futures::pin_mut!(inner);
449        let mut first = true;
450        while let Some(item) = inner.next().await {
451            let item = item?;
452            if first {
453                budget.take_first();
454                first = false;
455            } else if !budget.try_take() {
456                Err(BitmapScanError::ScanLimit)?;
457            }
458            yield item;
459        }
460    }
461}
462
463/// Convenience adapter: wrap a single raw `BucketStream` into a
464/// `WatermarkedBucketStream` with one `Watermark(post_bucket)` after each
465/// bucket plus one final at the range terminus on EOF.
466///
467/// The DNF eval pipeline budgets and merges leaves itself; this helper is for
468/// backend-side consumers (e.g. RocksDB single-dimension scans) that want
469/// bucket-level output without the full eval machinery.
470pub fn buckets_with_watermarks<S>(
471    stream: S,
472    range: Range<u64>,
473    bucket_size: u64,
474    direction: ScanDirection,
475) -> impl Stream<Item = BitmapScanResult<Watermarked<(u64, RoaringBitmap)>>> + Send + 'static
476where
477    S: Stream<Item = BucketItem> + Send + 'static,
478{
479    async_stream::try_stream! {
480        if range.is_empty() {
481            return;
482        }
483        futures::pin_mut!(stream);
484        let mut last_emitted: Option<u64> = None;
485        while let Some(item) = stream.next().await {
486            let (bucket_id, bitmap) = item?;
487            yield Watermarked::Item((bucket_id, bitmap));
488            // Ascending = just past this bucket. Descending = this
489            // bucket's low edge. Clamp to the request bounds — cursors
490            // round-trip into subsequent requests with different ranges.
491            let bucket_start = bucket_id.saturating_mul(bucket_size);
492            let watermark = if direction.is_ascending() {
493                bucket_start.saturating_add(bucket_size).min(range.end)
494            } else {
495                bucket_start.max(range.start)
496            };
497            last_emitted = Some(watermark);
498            yield Watermarked::Watermark(watermark);
499        }
500        // Natural EOF: cap with a watermark at the range boundary so
501        // handlers get an explicit "scan covered the range" signal.
502        // Skip if a per-bucket watermark already exceeded it.
503        let range_end = if direction.is_ascending() {
504            range.end
505        } else {
506            range.start
507        };
508        let should_emit = match last_emitted {
509            None => true,
510            Some(prev) => {
511                if direction.is_ascending() {
512                    range_end > prev
513                } else {
514                    range_end < prev
515                }
516            }
517        };
518        if should_emit {
519            yield Watermarked::Watermark(range_end);
520        }
521    }
522}
523
524/// Flatten marked bucket bitmaps into absolute member ids with
525/// edge-bucket trimming against `range`. Watermarks pass through
526/// unchanged.
527pub fn flatten_watermarked_buckets<S>(
528    stream: S,
529    range: Range<u64>,
530    bucket_size: u64,
531    direction: ScanDirection,
532) -> impl Stream<Item = BitmapScanResult<Watermarked<u64>>> + Send + 'static
533where
534    S: Stream<Item = BitmapScanResult<Watermarked<(u64, RoaringBitmap)>>> + Send + 'static,
535{
536    async_stream::try_stream! {
537        if range.is_empty() {
538            return;
539        }
540        let start_bucket = range.start / bucket_size;
541        let end_bucket = (range.end - 1) / bucket_size;
542        futures::pin_mut!(stream);
543        while let Some(item) = stream.next().await {
544            match item? {
545                Watermarked::Watermark(p) => yield Watermarked::Watermark(p),
546                Watermarked::Item((bucket_id, bitmap)) => {
547                    let bucket_start = bucket_id * bucket_size;
548                    let is_first = bucket_id == start_bucket;
549                    let is_last = bucket_id == end_bucket;
550                    let lo = if is_first {
551                        (range.start - bucket_start) as u32
552                    } else {
553                        0
554                    };
555                    let hi = if is_last {
556                        ((range.end - bucket_start).min(bucket_size)) as u32
557                    } else {
558                        bucket_size as u32
559                    };
560
561                    if direction.is_ascending() {
562                        for bit in bitmap.iter() {
563                            if bit >= lo && bit < hi {
564                                yield Watermarked::Item(bucket_start + bit as u64);
565                            }
566                        }
567                    } else {
568                        for bit in bitmap.iter().rev() {
569                            if bit >= lo && bit < hi {
570                                yield Watermarked::Item(bucket_start + bit as u64);
571                            }
572                        }
573                    }
574                }
575            }
576        }
577    }
578}
579
580#[cfg(test)]
581mod tests {
582    use std::collections::BTreeMap;
583    use std::sync::Arc;
584
585    use futures::TryStreamExt;
586    use futures::stream;
587
588    use super::*;
589    use crate::bitmap_query::BitmapLiteral;
590    use crate::bitmap_query::BitmapTerm;
591    use crate::bitmap_query::BucketStream;
592
593    const BUCKET_SIZE: u64 = 100_000;
594    type TestBuckets = BTreeMap<Vec<u8>, Vec<(u64, Vec<u32>)>>;
595
596    #[derive(Clone)]
597    struct TestBucketSource {
598        buckets: Arc<TestBuckets>,
599    }
600
601    impl BitmapBucketSource for TestBucketSource {
602        fn scan_bucket_stream(
603            &self,
604            dimension_key: Vec<u8>,
605            _range: Range<u64>,
606            direction: ScanDirection,
607        ) -> BucketStream {
608            let mut buckets = self
609                .buckets
610                .get(&dimension_key)
611                .cloned()
612                .unwrap_or_default();
613            if matches!(direction, ScanDirection::Descending) {
614                buckets.reverse();
615            }
616            make_bucket_stream(
617                buckets
618                    .iter()
619                    .map(|(bucket_id, bits)| (*bucket_id, bits.as_slice()))
620                    .collect(),
621            )
622        }
623    }
624
625    fn make_bitmap(bits: &[u32]) -> RoaringBitmap {
626        let mut bm = RoaringBitmap::new();
627        for &b in bits {
628            bm.insert(b);
629        }
630        bm
631    }
632
633    fn make_bucket_stream(items: Vec<(u64, &[u32])>) -> BucketStream {
634        let items: Vec<BucketItem> = items
635            .into_iter()
636            .map(|(bid, bits)| Ok((bid, make_bitmap(bits))))
637            .collect();
638        stream::iter(items).boxed()
639    }
640
641    /// Drain into (items, watermarks) parallel vecs. Order between the
642    /// two is lost; for ordering checks collect `Watermarked` directly.
643    async fn drain_marked(
644        stream: BoxStream<'static, BitmapScanResult<Watermarked<u64>>>,
645    ) -> BitmapScanResult<(Vec<u64>, Vec<u64>)> {
646        let all: Vec<Watermarked<u64>> = stream.try_collect().await?;
647        let mut items = Vec::new();
648        let mut watermarks = Vec::new();
649        for m in all {
650            match m {
651                Watermarked::Item(v) => items.push(v),
652                Watermarked::Watermark(f) => watermarks.push(f),
653            }
654        }
655        Ok((items, watermarks))
656    }
657
658    fn test_key(value: &[u8]) -> Vec<u8> {
659        crate::dimensions::encode_dimension_key(crate::dimensions::IndexDimension::Sender, value)
660    }
661
662    fn include(value: &[u8]) -> BitmapLiteral {
663        BitmapLiteral::include(test_key(value)).unwrap()
664    }
665
666    fn exclude(value: &[u8]) -> BitmapLiteral {
667        BitmapLiteral::exclude(test_key(value)).unwrap()
668    }
669
670    /// Tightest-budget starvation. `term1 = (a AND b)` is disjoint (matches
671    /// nothing) and `term2 = c`'s only data sits far ahead of the request floor.
672    /// With `budget == unique_leaf_count`, round 1 still fetches every leaf's first
673    /// bucket, so the driver derives a real floor watermark before the budget
674    /// exhausts advancing `a`. The scan therefore ends with that forward-progress
675    /// watermark ahead of `SCAN_LIMIT` — never a cursorless error that would
676    /// livelock the client on retry.
677    #[tokio::test]
678    async fn nested_term_starvation_emits_frontier_before_scan_limit() {
679        let source = TestBucketSource {
680            buckets: Arc::new(BTreeMap::from([
681                (
682                    test_key(b"a"),
683                    vec![
684                        (0, vec![1]),
685                        (1, vec![1]),
686                        (2, vec![1]),
687                        (3, vec![1]),
688                        (4, vec![1]),
689                    ],
690                ),
691                (test_key(b"b"), vec![(50, vec![1])]),
692                (test_key(b"c"), vec![(40, vec![7])]),
693            ])),
694        };
695        let query = BitmapQuery::new(vec![
696            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
697            BitmapTerm::new(vec![include(b"c")]).unwrap(),
698        ])
699        .unwrap();
700
701        // Budget == unique_leaf_count: the runtime floor, the tightest starvation.
702        let stream = eval_bitmap_query_stream(
703            source,
704            query,
705            0..(60 * BUCKET_SIZE),
706            BUCKET_SIZE,
707            ScanDirection::Ascending,
708            3,
709            |_| {},
710        );
711        // Collect rather than try_collect: short-circuiting on Err would
712        // drop the pre-error watermark under test.
713        let all: Vec<BitmapScanResult<Watermarked<u64>>> = stream.collect().await;
714
715        let last_ok =
716            all.iter().rev().find_map(|r| r.as_ref().ok()).expect(
717                "a frontier watermark must surface before the (otherwise cursorless) error",
718            );
719        match last_ok {
720            Watermarked::Watermark(p) => assert!(
721                *p > 0,
722                "frontier must reflect real progress past the request floor (got {p})"
723            ),
724            Watermarked::Item(_) => panic!("disjoint intersect must not emit items here"),
725        }
726        let err = all
727            .last()
728            .expect("non-empty")
729            .as_ref()
730            .expect_err("scan must terminate with an error");
731        assert!(
732            matches!(err, BitmapScanError::ScanLimit),
733            "expected ScanLimit, got {err:?}"
734        );
735    }
736
737    /// Flush-on-error: a budget error truncates the scan at the floor, but
738    /// matches at or below the floor were already emitted in earlier rounds.
739    /// Here `c` matches in the FIRST bucket — below where `(a AND b)` exhausts
740    /// the budget — so its item must be DELIVERED, and the resume cursor must
741    /// advance to that death floor rather than be pinned at the request floor
742    /// (the livelock this guards against). The delivered item stays below the
743    /// final watermark, so resuming from that cursor will not re-emit it.
744    ///
745    /// The death floor is `post` of bucket 0 (one `BUCKET_SIZE`): every leaf's
746    /// first bucket is reserved (`take_first`), so the shared budget is spent
747    /// reaching bucket 0 across the leaves and `(a AND b)` errors before it can
748    /// advance to bucket 1.
749    #[tokio::test]
750    async fn flush_on_error_delivers_below_floor_sibling_result() {
751        let source = TestBucketSource {
752            buckets: Arc::new(BTreeMap::from([
753                (
754                    test_key(b"a"),
755                    vec![
756                        (0, vec![1]),
757                        (1, vec![1]),
758                        (2, vec![1]),
759                        (3, vec![1]),
760                        (4, vec![1]),
761                    ],
762                ),
763                (test_key(b"b"), vec![(50, vec![1])]),
764                (test_key(b"c"), vec![(0, vec![7])]),
765            ])),
766        };
767        let query = BitmapQuery::new(vec![
768            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
769            BitmapTerm::new(vec![include(b"c")]).unwrap(),
770        ])
771        .unwrap();
772
773        let stream = eval_bitmap_query_stream(
774            source,
775            query,
776            0..(60 * BUCKET_SIZE),
777            BUCKET_SIZE,
778            ScanDirection::Ascending,
779            3,
780            |_| {},
781        );
782        let all: Vec<BitmapScanResult<Watermarked<u64>>> = stream.collect().await;
783
784        // c's bucket-0 match (member id 7) is delivered despite term1 dying.
785        let items: Vec<u64> = all
786            .iter()
787            .filter_map(|r| match r {
788                Ok(Watermarked::Item(v)) => Some(*v),
789                _ => None,
790            })
791            .collect();
792        assert_eq!(items, vec![7], "c's below-floor match must be delivered");
793
794        // Resume frontier advances to term1's death floor (post of bucket 0),
795        // not stuck at the request floor 0.
796        let last_wm = all
797            .iter()
798            .rev()
799            .find_map(|r| match r {
800                Ok(Watermarked::Watermark(p)) => Some(*p),
801                _ => None,
802            })
803            .expect("a frontier watermark must surface");
804        assert_eq!(last_wm, BUCKET_SIZE, "frontier must advance past floor");
805        // The delivered item is below the resume cursor, so a resume won't
806        // re-emit it.
807        assert!(
808            items.iter().all(|&i| i < last_wm),
809            "items must be below the resume cursor"
810        );
811
812        let err = all
813            .last()
814            .expect("non-empty")
815            .as_ref()
816            .expect_err("scan must terminate with an error");
817        assert!(
818            matches!(err, BitmapScanError::ScanLimit),
819            "expected ScanLimit, got {err:?}"
820        );
821    }
822
823    #[test]
824    fn bitmap_query_validation_rejects_empty_shapes() {
825        assert!(BitmapQuery::new(Vec::new()).is_err());
826        assert!(BitmapLiteral::include(Vec::new()).is_err());
827        assert!(
828            BitmapLiteral::include(vec![crate::dimensions::IndexDimension::Sender.tag_byte()])
829                .is_err()
830        );
831        assert!(BitmapLiteral::include(vec![0xff, 0x00]).is_err());
832        assert!(BitmapTerm::new(vec![exclude(b"neg")]).is_err());
833    }
834
835    /// Two terms share the same include literal `a`. Dedup must collapse them
836    /// to a single backend scan of `a` and distribute its per-bucket bitmap to
837    /// both terms — otherwise term 2 would see `a` already consumed by term 1
838    /// at the floor bucket and silently drop matches.
839    #[tokio::test]
840    async fn shared_include_across_terms_scans_dimension_once() {
841        use crate::bitmap_query::test_utils::CountingBucketSource;
842
843        let source = CountingBucketSource::new(BTreeMap::from([
844            (test_key(b"a"), vec![(0, vec![1, 2, 3])]),
845            (test_key(b"b"), vec![(0, vec![1])]),
846            (test_key(b"c"), vec![(0, vec![2])]),
847        ]));
848        let query = BitmapQuery::new(vec![
849            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
850            BitmapTerm::new(vec![include(b"a"), include(b"c")]).unwrap(),
851        ])
852        .unwrap();
853
854        let stream = eval_bitmap_query_stream(
855            source.clone(),
856            query,
857            0..200_000,
858            BUCKET_SIZE,
859            ScanDirection::Ascending,
860            u64::MAX,
861            |_| {},
862        );
863        let (items, _watermarks) = drain_marked(stream).await.unwrap();
864
865        // Term 1: a ∩ b = {1}; term 2: a ∩ c = {2}; OR = {1, 2}. If `a` were
866        // not distributed to term 2, term 2 would be empty and items = [1].
867        assert_eq!(items, vec![1, 2]);
868        // The dedup property: `a` was scanned exactly once.
869        assert_eq!(source.scan_count(&test_key(b"a")), 1);
870        assert_eq!(source.scan_count(&test_key(b"b")), 1);
871        assert_eq!(source.scan_count(&test_key(b"c")), 1);
872    }
873
874    /// Same key appearing as include in one term and exclude in another:
875    /// dedup still collapses to one leaf, and snapshot-distribute clones so
876    /// both polarities see the bitmap.
877    #[tokio::test]
878    async fn shared_key_across_include_and_exclude_terms_scans_once() {
879        use crate::bitmap_query::test_utils::CountingBucketSource;
880
881        let source = CountingBucketSource::new(BTreeMap::from([
882            (test_key(b"a"), vec![(0, vec![1, 2])]),
883            (test_key(b"b"), vec![(0, vec![1, 2, 3])]),
884        ]));
885        // term 1: b AND NOT a -> {3}
886        // term 2: b AND a     -> {1, 2}
887        // OR -> {1, 2, 3}
888        let query = BitmapQuery::new(vec![
889            BitmapTerm::new(vec![include(b"b"), exclude(b"a")]).unwrap(),
890            BitmapTerm::new(vec![include(b"b"), include(b"a")]).unwrap(),
891        ])
892        .unwrap();
893
894        let stream = eval_bitmap_query_stream(
895            source.clone(),
896            query,
897            0..100_000,
898            BUCKET_SIZE,
899            ScanDirection::Ascending,
900            u64::MAX,
901            |_| {},
902        );
903        let (items, _watermarks) = drain_marked(stream).await.unwrap();
904
905        assert_eq!(items, vec![1, 2, 3]);
906        assert_eq!(source.scan_count(&test_key(b"a")), 1);
907        assert_eq!(source.scan_count(&test_key(b"b")), 1);
908    }
909
910    #[tokio::test]
911    async fn eval_bitmap_query_stream_uses_backend_source() {
912        let source = TestBucketSource {
913            buckets: Arc::new(BTreeMap::from([
914                (test_key(b"a"), vec![(0, vec![1, 2, 3]), (1, vec![5])]),
915                (test_key(b"b"), vec![(0, vec![2, 3]), (1, vec![5])]),
916                (test_key(b"c"), vec![(0, vec![3])]),
917            ])),
918        };
919        let query = BitmapQuery::new(vec![
920            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
921        ])
922        .unwrap();
923
924        let stream = eval_bitmap_query_stream(
925            source,
926            query,
927            0..200_000,
928            BUCKET_SIZE,
929            ScanDirection::Ascending,
930            u64::MAX,
931            |_| {},
932        );
933        let (items, _watermarks) = drain_marked(stream).await.unwrap();
934
935        assert_eq!(items, vec![2, BUCKET_SIZE + 5]);
936    }
937
938    #[tokio::test]
939    async fn eval_bitmap_query_stream_descending() {
940        let source = TestBucketSource {
941            buckets: Arc::new(BTreeMap::from([
942                (test_key(b"a"), vec![(0, vec![1, 2, 3]), (1, vec![5])]),
943                (test_key(b"b"), vec![(0, vec![2, 3]), (1, vec![5])]),
944                (test_key(b"c"), vec![(0, vec![3])]),
945            ])),
946        };
947        let query = BitmapQuery::new(vec![
948            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
949        ])
950        .unwrap();
951
952        let stream = eval_bitmap_query_stream(
953            source,
954            query,
955            0..200_000,
956            BUCKET_SIZE,
957            ScanDirection::Descending,
958            u64::MAX,
959            |_| {},
960        );
961        let (items, _watermarks) = drain_marked(stream).await.unwrap();
962
963        assert_eq!(items, vec![BUCKET_SIZE + 5, 2]);
964    }
965
966    /// End-to-end: `buckets_with_watermarks` injects watermarks, then
967    /// `flatten_watermarked_buckets` flattens items and passes watermarks through.
968    /// Verifies edge trimming + marker interleaving in one composed test.
969    #[tokio::test]
970    async fn buckets_with_watermarks_then_flatten_watermarked_buckets_ascending() {
971        let range = 50u64..(2 * BUCKET_SIZE + 50_001);
972        let items = stream::iter(vec![
973            // bucket 0: bit 10 trimmed (< 50); 50 and bucket_size-1 kept.
974            Ok((0u64, {
975                let mut bm = RoaringBitmap::new();
976                bm.insert(10);
977                bm.insert(50);
978                bm.insert((BUCKET_SIZE - 1) as u32);
979                bm
980            })),
981            // bucket 1: middle, full pass-through.
982            Ok((1u64, {
983                let mut bm = RoaringBitmap::new();
984                bm.insert(0);
985                bm.insert((BUCKET_SIZE - 1) as u32);
986                bm
987            })),
988            // bucket 2: bit 50_001 trimmed (>= hi=50_001 relative).
989            Ok((2u64, {
990                let mut bm = RoaringBitmap::new();
991                bm.insert(0);
992                bm.insert(50_000);
993                bm.insert(50_001);
994                bm
995            })),
996        ]);
997        let marked_buckets =
998            buckets_with_watermarks(items, range.clone(), BUCKET_SIZE, ScanDirection::Ascending);
999        let out: Vec<Watermarked<u64>> = flatten_watermarked_buckets(
1000            marked_buckets,
1001            range,
1002            BUCKET_SIZE,
1003            ScanDirection::Ascending,
1004        )
1005        .try_collect()
1006        .await
1007        .unwrap();
1008        // Items are interleaved with watermarks at each bucket boundary.
1009        // Watermark(p) is emitted AFTER the bucket's items so its arrival proves
1010        // those items also passed.
1011        assert_eq!(
1012            out,
1013            vec![
1014                Watermarked::Item(50),
1015                Watermarked::Item(BUCKET_SIZE - 1),
1016                Watermarked::Watermark(BUCKET_SIZE),
1017                Watermarked::Item(BUCKET_SIZE),
1018                Watermarked::Item(2 * BUCKET_SIZE - 1),
1019                Watermarked::Watermark(2 * BUCKET_SIZE),
1020                Watermarked::Item(2 * BUCKET_SIZE),
1021                Watermarked::Item(2 * BUCKET_SIZE + 50_000),
1022                // Edge bucket watermark is clamped to range.end so cursors
1023                // don't claim progress past the requested upper bound.
1024                Watermarked::Watermark(2 * BUCKET_SIZE + 50_001),
1025            ],
1026        );
1027    }
1028
1029    /// `buckets_with_watermarks` standalone: verify each bucket gets its own
1030    /// `Watermarked::Watermark` immediately after, with no flattening / trimming.
1031    /// This is the variant the rocksdb branch consumes directly.
1032    #[tokio::test]
1033    async fn buckets_with_watermarks_one_per_bucket_no_flatten() {
1034        let items = stream::iter(vec![
1035            Ok((0u64, {
1036                let mut bm = RoaringBitmap::new();
1037                bm.insert(1);
1038                bm.insert(2);
1039                bm
1040            })),
1041            Ok((3u64, {
1042                let mut bm = RoaringBitmap::new();
1043                bm.insert(5);
1044                bm
1045            })),
1046        ]);
1047        // Range extends past the last populated bucket (bucket 3 = positions
1048        // [3*BUCKET_SIZE, 4*BUCKET_SIZE)); the final natural-EOF watermark
1049        // caps at the request range boundary so resume cursors don't leave
1050        // the empty tail (4*BUCKET_SIZE..5*BUCKET_SIZE) un-acknowledged.
1051        let out: Vec<Watermarked<(u64, Vec<u32>)>> = buckets_with_watermarks(
1052            items,
1053            0..(5 * BUCKET_SIZE),
1054            BUCKET_SIZE,
1055            ScanDirection::Ascending,
1056        )
1057        .map_ok(|m| m.map_item(|(bid, bm)| (bid, bm.iter().collect::<Vec<_>>())))
1058        .try_collect()
1059        .await
1060        .unwrap();
1061        assert_eq!(
1062            out,
1063            vec![
1064                Watermarked::Item((0, vec![1, 2])),
1065                Watermarked::Watermark(BUCKET_SIZE),
1066                Watermarked::Item((3, vec![5])),
1067                Watermarked::Watermark(4 * BUCKET_SIZE),
1068                Watermarked::Watermark(5 * BUCKET_SIZE),
1069            ],
1070        );
1071    }
1072
1073    #[tokio::test]
1074    async fn scan_budget_below_unique_leaf_count_yields_misconfig_error() {
1075        // Defensive runtime guard: a per-request budget smaller than the
1076        // query's leaf count would produce a cursorless SCAN_LIMIT
1077        // (merged watermarks stay None until every child reports). The
1078        // eval surfaces this as a plain anyhow error — distinct from
1079        // BitmapScanError::ScanLimit — so the handler propagates it as
1080        // Internal rather than SCAN_LIMIT.
1081        let source = TestBucketSource {
1082            buckets: Arc::new(BTreeMap::from([(
1083                test_key(b"a"),
1084                vec![(0, vec![1, 2]), (1, vec![3])],
1085            )])),
1086        };
1087        let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1088
1089        let metrics = std::sync::Arc::new(std::sync::Mutex::new(None));
1090        let metrics_sink = metrics.clone();
1091        let stream = eval_bitmap_query_stream(
1092            source,
1093            query,
1094            0..200_000,
1095            BUCKET_SIZE,
1096            ScanDirection::Ascending,
1097            0,
1098            move |m| *metrics_sink.lock().unwrap() = Some(m),
1099        );
1100        let err = drain_marked(stream).await.unwrap_err();
1101
1102        assert!(
1103            matches!(err, BitmapScanError::Source(_)),
1104            "must be a Source fault (cursorless), never a clean ScanLimit end"
1105        );
1106        assert!(
1107            err.to_string().contains("insufficient for"),
1108            "expected misconfig error, got {err:?}"
1109        );
1110        // The misconfig guard short-circuits before any scan setup, so
1111        // no scan metrics are emitted (the callback is dropped uncalled).
1112        assert!(
1113            metrics.lock().unwrap().is_none(),
1114            "misconfig early-out should not emit scan metrics"
1115        );
1116    }
1117
1118    #[tokio::test]
1119    async fn scan_budget_shared_across_dimensions() {
1120        // Three include dimensions with several buckets each. Budget = 4
1121        // should be consumed across all per-dimension fetches before
1122        // `BitmapScanError::ScanLimit` surfaces from the merged eval stream.
1123        let source = TestBucketSource {
1124            buckets: Arc::new(BTreeMap::from([
1125                (
1126                    test_key(b"a"),
1127                    vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1128                ),
1129                (
1130                    test_key(b"b"),
1131                    vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1132                ),
1133                (
1134                    test_key(b"c"),
1135                    vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1136                ),
1137            ])),
1138        };
1139        let query = BitmapQuery::new(vec![
1140            BitmapTerm::new(vec![include(b"a"), include(b"b"), include(b"c")]).unwrap(),
1141        ])
1142        .unwrap();
1143
1144        let metrics = std::sync::Arc::new(std::sync::Mutex::new(None));
1145        let metrics_sink = metrics.clone();
1146        let stream = eval_bitmap_query_stream(
1147            source,
1148            query,
1149            0..300_000,
1150            BUCKET_SIZE,
1151            ScanDirection::Ascending,
1152            4,
1153            move |m| *metrics_sink.lock().unwrap() = Some(m),
1154        );
1155        let err = drain_marked(stream).await.unwrap_err();
1156
1157        assert!(
1158            matches!(err, BitmapScanError::ScanLimit),
1159            "expected ScanLimit, got {err:?}"
1160        );
1161        // All four buckets were evaluated through budgeted_bucket_stream
1162        // before the fifth try_take() failed and surfaced BitmapScanError::ScanLimit.
1163        assert_eq!(metrics.lock().unwrap().unwrap().buckets_evaluated, 4);
1164    }
1165
1166    /// Budget exhausting on an exclude leaf must NOT be mistaken for the
1167    /// exclude reaching its range terminus. With silent EOF semantics, includes
1168    /// past the exclude cutoff would leak unfiltered. With `BitmapScanError::ScanLimit`,
1169    /// the error propagates and the eval pipeline short-circuits cleanly.
1170    #[tokio::test]
1171    async fn scan_budget_exclude_side_exhaustion_does_not_leak_includes() {
1172        let source = TestBucketSource {
1173            buckets: Arc::new(BTreeMap::from([
1174                (
1175                    test_key(b"inc"),
1176                    vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1177                ),
1178                (
1179                    test_key(b"exc"),
1180                    vec![(0, vec![1]), (1, vec![2]), (2, vec![3])],
1181                ),
1182            ])),
1183        };
1184        let query = BitmapQuery::new(vec![
1185            BitmapTerm::new(vec![include(b"inc"), exclude(b"exc")]).unwrap(),
1186        ])
1187        .unwrap();
1188
1189        // Budget = leaf count gives every leaf one bucket fetch (the
1190        // minimum the runtime guard allows; see
1191        // `scan_budget_below_unique_leaf_count_yields_misconfig_error`). Once
1192        // the budget exhausts mid-scan, `BitmapScanError::ScanLimit` propagates
1193        // without the driver mistaking the exclude leaf's error for a
1194        // natural EOF.
1195        let stream = eval_bitmap_query_stream(
1196            source,
1197            query,
1198            0..300_000,
1199            BUCKET_SIZE,
1200            ScanDirection::Ascending,
1201            2,
1202            |_| {},
1203        );
1204        let result = drain_marked(stream).await;
1205
1206        // Must error, not return Ok with leaked include rows.
1207        let err = result.expect_err("must surface scan-limit, not silently emit includes");
1208        assert!(
1209            matches!(err, BitmapScanError::ScanLimit),
1210            "expected ScanLimit, got {err:?}"
1211        );
1212    }
1213
1214    /// Disjoint-intersect: the leaves advance but the term matches nothing, so
1215    /// no item is ever emitted. The driver's floor watermark must still surface
1216    /// real progress before the budget error; otherwise handlers fall back to
1217    /// the request lower bound and the client livelocks on retry.
1218    #[tokio::test]
1219    async fn sparse_intersect_emits_frontier_watermark_before_scan_limit() {
1220        // include "a" at buckets [0, 1, 2, ...], include "b" at bucket 100 —
1221        // disjoint, so the driver advances the floor through a's buckets one by
1222        // one and emits zero output. Budget=4 forces error mid-scan.
1223        let source = TestBucketSource {
1224            buckets: Arc::new(BTreeMap::from([
1225                (
1226                    test_key(b"a"),
1227                    vec![
1228                        (0, vec![1]),
1229                        (1, vec![1]),
1230                        (2, vec![1]),
1231                        (3, vec![1]),
1232                        (4, vec![1]),
1233                    ],
1234                ),
1235                (test_key(b"b"), vec![(100, vec![1])]),
1236            ])),
1237        };
1238        let query = BitmapQuery::new(vec![
1239            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
1240        ])
1241        .unwrap();
1242
1243        let stream = eval_bitmap_query_stream(
1244            source,
1245            query,
1246            0..(110 * BUCKET_SIZE),
1247            BUCKET_SIZE,
1248            ScanDirection::Ascending,
1249            4,
1250            |_| {},
1251        );
1252        // Don't try_collect — short-circuiting on Err would drop the
1253        // pre-error watermark we're verifying.
1254        let all: Vec<BitmapScanResult<Watermarked<u64>>> = stream.collect().await;
1255
1256        let last_ok = all
1257            .iter()
1258            .rev()
1259            .find_map(|r| r.as_ref().ok())
1260            .expect("expected a watermark item before the error");
1261        match last_ok {
1262            Watermarked::Watermark(p) => {
1263                assert!(
1264                    *p > 0,
1265                    "frontier watermark must reflect real progress (got {p})"
1266                );
1267            }
1268            Watermarked::Item(_) => panic!("disjoint intersect must not emit items"),
1269        }
1270        let err = all
1271            .last()
1272            .expect("non-empty")
1273            .as_ref()
1274            .expect_err("scan must terminate with an error");
1275        assert!(
1276            matches!(err, BitmapScanError::ScanLimit),
1277            "expected ScanLimit, got {err:?}"
1278        );
1279    }
1280
1281    #[tokio::test]
1282    async fn eval_emits_watermarks_at_bucket_boundaries_ascending() {
1283        let source = TestBucketSource {
1284            buckets: Arc::new(BTreeMap::from([(
1285                test_key(b"a"),
1286                vec![(0, vec![1]), (3, vec![2]), (7, vec![3])],
1287            )])),
1288        };
1289        let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1290
1291        let stream = eval_bitmap_query_stream(
1292            source,
1293            query,
1294            0..(8 * BUCKET_SIZE),
1295            BUCKET_SIZE,
1296            ScanDirection::Ascending,
1297            u64::MAX,
1298            |_| {},
1299        );
1300        let (items, watermarks) = drain_marked(stream).await.unwrap();
1301
1302        // Items at the bits within each of the three populated buckets.
1303        assert_eq!(items, vec![1, 3 * BUCKET_SIZE + 2, 7 * BUCKET_SIZE + 3]);
1304        // The flat driver emits the floor bucket's leading edge (pre) and
1305        // trailing edge (post) each round, so each populated bucket [0, 3, 7]
1306        // contributes both: pre/post = (0, bs), (3bs, 4bs), (7bs, 8bs). The
1307        // final post(7)=8bs is the range terminus.
1308        assert_eq!(
1309            watermarks,
1310            vec![
1311                0,
1312                BUCKET_SIZE,
1313                3 * BUCKET_SIZE,
1314                4 * BUCKET_SIZE,
1315                7 * BUCKET_SIZE,
1316                8 * BUCKET_SIZE,
1317            ]
1318        );
1319    }
1320
1321    #[tokio::test]
1322    async fn eval_emits_watermarks_at_bucket_boundaries_descending() {
1323        let source = TestBucketSource {
1324            buckets: Arc::new(BTreeMap::from([(
1325                test_key(b"a"),
1326                vec![(0, vec![1]), (3, vec![2]), (7, vec![3])],
1327            )])),
1328        };
1329        let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1330
1331        let stream = eval_bitmap_query_stream(
1332            source,
1333            query,
1334            0..(8 * BUCKET_SIZE),
1335            BUCKET_SIZE,
1336            ScanDirection::Descending,
1337            u64::MAX,
1338            |_| {},
1339        );
1340        let (items, watermarks) = drain_marked(stream).await.unwrap();
1341
1342        assert_eq!(items, vec![7 * BUCKET_SIZE + 3, 3 * BUCKET_SIZE + 2, 1]);
1343        // Descending pre/post per matched bucket [7, 3, 0]: pre is the high
1344        // edge, post the low edge — (8bs, 7bs), (4bs, 3bs), (1bs, 0). pre(7)=8bs
1345        // is range.end; post(0)=0 is the range terminus.
1346        assert_eq!(
1347            watermarks,
1348            vec![
1349                8 * BUCKET_SIZE,
1350                7 * BUCKET_SIZE,
1351                4 * BUCKET_SIZE,
1352                3 * BUCKET_SIZE,
1353                BUCKET_SIZE,
1354                0,
1355            ]
1356        );
1357    }
1358
1359    #[tokio::test]
1360    async fn eval_emits_per_source_watermarks_and_final_eof_when_no_bucket_yielded() {
1361        // Two include dimensions whose buckets never align -> the term
1362        // yields no Items. The floor watermark (pre+post per bucket) still
1363        // propagates the actual scan progress, and the driver caps the stream
1364        // with a final range_end watermark on natural EOF so clients see "scan
1365        // covered the range with no matches."
1366        let source = TestBucketSource {
1367            buckets: Arc::new(BTreeMap::from([
1368                (
1369                    test_key(b"a"),
1370                    vec![(0, vec![1]), (2, vec![3]), (4, vec![5])],
1371                ),
1372                (
1373                    test_key(b"b"),
1374                    vec![(1, vec![1]), (3, vec![3]), (5, vec![5])],
1375                ),
1376            ])),
1377        };
1378        let query = BitmapQuery::new(vec![
1379            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
1380        ])
1381        .unwrap();
1382
1383        let stream = eval_bitmap_query_stream(
1384            source,
1385            query,
1386            0..(6 * BUCKET_SIZE),
1387            BUCKET_SIZE,
1388            ScanDirection::Ascending,
1389            u64::MAX,
1390            |_| {},
1391        );
1392        let (items, watermarks) = drain_marked(stream).await.unwrap();
1393
1394        assert!(items.is_empty(), "disjoint intersect must not emit items");
1395        // Watermarks reflect real per-source progress as intersect drops
1396        // misaligned buckets, then the eval root adds the final range_end.
1397        assert!(
1398            !watermarks.is_empty(),
1399            "expected per-source watermarks to propagate, got none"
1400        );
1401        let mut prev = 0u64;
1402        for w in &watermarks {
1403            assert!(
1404                *w >= prev,
1405                "ascending watermarks must be monotonic, got {watermarks:?}"
1406            );
1407            assert!(
1408                *w <= 6 * BUCKET_SIZE,
1409                "watermark exceeds range.end ({watermarks:?})"
1410            );
1411            prev = *w;
1412        }
1413        assert_eq!(
1414            *watermarks.last().unwrap(),
1415            6 * BUCKET_SIZE,
1416            "final watermark must be range.end on natural EOF"
1417        );
1418    }
1419
1420    #[tokio::test]
1421    async fn eval_watermark_ordering_invariant_item_then_watermark() {
1422        // Critical invariant: for each bucket, all Items come BEFORE the
1423        // post-bucket watermark. This is what makes the watermark safe as
1424        // a resume cursor — its arrival downstream proves the dominated
1425        // items also arrived in the same stream order.
1426        let source = TestBucketSource {
1427            buckets: Arc::new(BTreeMap::from([(
1428                test_key(b"a"),
1429                vec![(0, vec![10, 20, 30]), (1, vec![40, 50])],
1430            )])),
1431        };
1432        let query = BitmapQuery::new(vec![BitmapTerm::new(vec![include(b"a")]).unwrap()]).unwrap();
1433
1434        let stream = eval_bitmap_query_stream(
1435            source,
1436            query,
1437            0..(2 * BUCKET_SIZE),
1438            BUCKET_SIZE,
1439            ScanDirection::Ascending,
1440            u64::MAX,
1441            |_| {},
1442        );
1443        let all: Vec<Watermarked<u64>> = stream.try_collect().await.unwrap();
1444
1445        // Per-source pre+post watermarks: each bucket emits Watermark(pre),
1446        // Item(s)…, Watermark(post). pre(0)=0, post(0)=BUCKET_SIZE,
1447        // pre(1)=BUCKET_SIZE (dup of post(0), filtered), post(1)=2*BUCKET_SIZE,
1448        // EOF=2*BUCKET_SIZE (dup, filtered).
1449        assert_eq!(
1450            all,
1451            vec![
1452                Watermarked::Watermark(0),
1453                Watermarked::Item(10),
1454                Watermarked::Item(20),
1455                Watermarked::Item(30),
1456                Watermarked::Watermark(BUCKET_SIZE),
1457                Watermarked::Item(BUCKET_SIZE + 40),
1458                Watermarked::Item(BUCKET_SIZE + 50),
1459                Watermarked::Watermark(2 * BUCKET_SIZE),
1460            ],
1461        );
1462    }
1463
1464    #[tokio::test]
1465    async fn buckets_with_watermarks_then_flatten_watermarked_buckets_descending() {
1466        let range = 50u64..(2 * BUCKET_SIZE + 50_001);
1467        let items = stream::iter(vec![
1468            Ok((2u64, {
1469                let mut bm = RoaringBitmap::new();
1470                bm.insert(0);
1471                bm.insert(50_000);
1472                bm.insert(50_001);
1473                bm
1474            })),
1475            Ok((1u64, {
1476                let mut bm = RoaringBitmap::new();
1477                bm.insert(0);
1478                bm.insert((BUCKET_SIZE - 1) as u32);
1479                bm
1480            })),
1481            Ok((0u64, {
1482                let mut bm = RoaringBitmap::new();
1483                bm.insert(10);
1484                bm.insert(50);
1485                bm.insert((BUCKET_SIZE - 1) as u32);
1486                bm
1487            })),
1488        ]);
1489        let marked_buckets =
1490            buckets_with_watermarks(items, range.clone(), BUCKET_SIZE, ScanDirection::Descending);
1491        let out: Vec<Watermarked<u64>> = flatten_watermarked_buckets(
1492            marked_buckets,
1493            range,
1494            BUCKET_SIZE,
1495            ScanDirection::Descending,
1496        )
1497        .try_collect()
1498        .await
1499        .unwrap();
1500        // Descending: watermark(p) = "all items >= p have been emitted." After
1501        // bucket 2 yields, frontier = 2 * BUCKET_SIZE (bucket 2's low edge).
1502        assert_eq!(
1503            out,
1504            vec![
1505                Watermarked::Item(2 * BUCKET_SIZE + 50_000),
1506                Watermarked::Item(2 * BUCKET_SIZE),
1507                Watermarked::Watermark(2 * BUCKET_SIZE),
1508                Watermarked::Item(2 * BUCKET_SIZE - 1),
1509                Watermarked::Item(BUCKET_SIZE),
1510                Watermarked::Watermark(BUCKET_SIZE),
1511                Watermarked::Item(BUCKET_SIZE - 1),
1512                Watermarked::Item(50),
1513                // Edge bucket watermark is clamped to range.start so cursors
1514                // don't claim progress past the requested lower bound (in
1515                // descending: lower position is "further past").
1516                Watermarked::Watermark(50),
1517            ],
1518        );
1519    }
1520
1521    /// Single-error `collapse` returns that error.
1522    #[test]
1523    fn collapse_single_scan_limit() {
1524        assert!(matches!(
1525            BitmapScanError::collapse(vec![BitmapScanError::ScanLimit]),
1526            BitmapScanError::ScanLimit
1527        ));
1528    }
1529
1530    /// All-`ScanLimit` aggregate collapses to a single `ScanLimit`.
1531    #[test]
1532    fn collapse_all_scan_limit() {
1533        assert!(matches!(
1534            BitmapScanError::collapse(
1535                vec![BitmapScanError::ScanLimit, BitmapScanError::ScanLimit,]
1536            ),
1537            BitmapScanError::ScanLimit
1538        ));
1539    }
1540
1541    /// A `Source` fault co-occurring with a `ScanLimit` must win: masking the
1542    /// fault as a clean `ScanLimit` end would silently corrupt results.
1543    #[test]
1544    fn collapse_fault_outranks_scan_limit() {
1545        let collapsed = BitmapScanError::collapse(vec![
1546            BitmapScanError::ScanLimit,
1547            BitmapScanError::Source(anyhow::anyhow!("storage boom")),
1548        ]);
1549        match collapsed {
1550            BitmapScanError::Source(e) => assert!(e.to_string().contains("storage boom")),
1551            other => panic!("expected Source fault to win, got {other:?}"),
1552        }
1553    }
1554
1555    /// A `Source` fault outranks both `ScanLimit` and `Cancelled`: a real fault
1556    /// must never be masked as a clean end or a cancel.
1557    #[test]
1558    fn collapse_source_outranks_scan_limit_and_cancelled() {
1559        let collapsed = BitmapScanError::collapse(vec![
1560            BitmapScanError::ScanLimit,
1561            BitmapScanError::Cancelled,
1562            BitmapScanError::Source(anyhow::anyhow!("storage boom")),
1563        ]);
1564        match collapsed {
1565            BitmapScanError::Source(e) => assert!(e.to_string().contains("storage boom")),
1566            other => panic!("expected Source to win, got {other:?}"),
1567        }
1568    }
1569
1570    /// `ScanLimit` outranks `Cancelled`: the scan-limit frontier is a usable
1571    /// continuation cursor, strictly better than ending on a cancel.
1572    #[test]
1573    fn collapse_scan_limit_outranks_cancelled() {
1574        assert!(matches!(
1575            BitmapScanError::collapse(
1576                vec![BitmapScanError::Cancelled, BitmapScanError::ScanLimit,]
1577            ),
1578            BitmapScanError::ScanLimit
1579        ));
1580    }
1581
1582    /// Several concurrent faults combine into one `Source` that keeps every
1583    /// leaf's message rather than dropping all but one.
1584    #[test]
1585    fn collapse_combines_concurrent_faults() {
1586        let collapsed = BitmapScanError::collapse(vec![
1587            BitmapScanError::Source(anyhow::anyhow!("boom one")),
1588            BitmapScanError::Source(anyhow::anyhow!("boom two")),
1589        ]);
1590        match collapsed {
1591            BitmapScanError::Source(e) => {
1592                let s = e.to_string();
1593                assert!(s.contains("boom one"), "missing first fault: {s}");
1594                assert!(s.contains("boom two"), "missing second fault: {s}");
1595            }
1596            other => panic!("expected combined Source, got {other:?}"),
1597        }
1598    }
1599
1600    /// `From<anyhow::Error>` is a fault funnel: every `anyhow` becomes `Source`.
1601    /// Terminal dispositions are decided at the source and never reconstructed
1602    /// here, so this never produces `ScanLimit`/`Cancelled`.
1603    #[test]
1604    fn from_anyhow_funnels_to_source() {
1605        let fault: anyhow::Error = anyhow::anyhow!("storage boom");
1606        match BitmapScanError::from(fault) {
1607            BitmapScanError::Source(e) => assert!(e.to_string().contains("storage boom")),
1608            other => panic!("expected Source, got {other:?}"),
1609        }
1610    }
1611}