sui_inverted_index/bitmap_query/
iter.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Synchronous iterator evaluator for DNF bitmap queries.
5//!
6//! A single flat driver merge-joins every leaf scan against one shared *floor*
7//! (the slowest leaf's position). At the floor bucket it evaluates the query —
8//! intersect each term's included dimensions, subtract its excluded ones, then
9//! union across terms — and emits a watermark at the floor. Because leaves only
10//! ever advance at the floor (peeked one bucket ahead), no branch can run ahead
11//! of the others: the resume cursor is always within one sparse read of every
12//! leaf, and there is no windowing/parking machinery to get wrong. Mirrors the
13//! async [`super::stream`] evaluator, which shares the per-bucket evaluation
14//! ([`eval_term_at_bucket`]) and is cross-checked against this one in tests.
15//!
16//! Budget accounting lives in the request layer for the iterator path (each
17//! backend leaf iterator charges its own per-request budget and yields an error
18//! on exhaustion), so this evaluator only propagates those errors.
19
20use std::collections::VecDeque;
21use std::iter::Peekable as IterPeekable;
22use std::ops::Range;
23
24use roaring::RoaringBitmap;
25
26use super::BitmapBucketIteratorSource;
27use super::BitmapQuery;
28use super::BitmapScanError;
29use super::DedupedQuery;
30use super::LeafHead;
31use super::ScanDirection;
32use super::Watermarked;
33use super::WatermarkedBucket;
34use super::bound_in_direction;
35use super::bucket_edges;
36use super::build_term_specs;
37use super::count_on_floor_refs;
38use super::eval_term_at_bucket;
39use super::frontier_advanced;
40use super::recompute_unreferenced;
41use super::take_snapshot_bitmap;
42
43/// Evaluate a DNF `BitmapQuery` as an ordered iterator of marked bucket bitmaps.
44/// Output emits `Watermarked::Item((bucket_id, bitmap))` interleaved with
45/// `Watermarked::Watermark(p)` derived from the slowest leaf's progress.
46pub fn eval_bitmap_query_bucket_iter<'a, S>(
47    source: S,
48    query: BitmapQuery,
49    range: Range<u64>,
50    bucket_size: u64,
51    direction: ScanDirection,
52) -> impl Iterator<Item = WatermarkedBucket> + 'a
53where
54    S: BitmapBucketIteratorSource<'a>,
55{
56    // One peekable leaf per *unique* dimension key — terms reference them by
57    // index. Identical keys across literals share a single backend scan; see
58    // [`build_term_specs`]. Each leaf iterator borrows the backend's `'a`
59    // store, not `source`, so the thin `source` handle is dropped once the
60    // leaves are built.
61    let DedupedQuery {
62        keys: unique_keys,
63        mut terms,
64    } = build_term_specs(query.terms);
65    let mut leaves: Vec<IterPeekable<S::Iter>> = Vec::with_capacity(unique_keys.len());
66    for key in unique_keys {
67        leaves.push(
68            source
69                .scan_bucket_iter(key, range.clone(), direction)
70                .peekable(),
71        );
72    }
73
74    let leaf_count = leaves.len();
75    let terminus = if direction.is_ascending() {
76        range.end
77    } else {
78        range.start
79    };
80    let request_floor = if direction.is_ascending() {
81        range.start
82    } else {
83        range.end
84    };
85    // `unreferenced[i]`: leaf is retired — either no satisfiable term still
86    // points at it, or its bucket stream is at EOF (a spent exclude).
87    let mut unreferenced = vec![false; leaf_count];
88    // `front[i]`: clamped position each leaf has provably scanned to. Bounds the
89    // resume cursor when a leaf errors before it can advance.
90    let mut front = vec![request_floor; leaf_count];
91    let mut last_emitted: Option<u64> = None;
92    let mut done = false;
93    let mut pending: VecDeque<WatermarkedBucket> = VecDeque::new();
94
95    std::iter::from_fn(move || {
96        loop {
97            if let Some(out) = pending.pop_front() {
98                return Some(out);
99            }
100            if done {
101                return None;
102            }
103
104            // Peek every active leaf (non-consuming); record its head and the
105            // position it has now scanned to.
106            let mut class: Vec<Option<LeafHead>> = (0..leaf_count).map(|_| None).collect();
107            for i in 0..leaf_count {
108                if unreferenced[i] {
109                    continue;
110                }
111                match leaves[i].peek() {
112                    Some(Ok((bucket, _))) => {
113                        let (pre, _post) = bucket_edges(*bucket, bucket_size, &range, direction);
114                        front[i] = pre;
115                        class[i] = Some(LeafHead::Bucket(*bucket));
116                    }
117                    None => {
118                        front[i] = terminus;
119                        class[i] = Some(LeafHead::Eof);
120                    }
121                    // Budget exhaustion: leave `front[i]` at its last scanned
122                    // position so the resume cursor cannot claim past it.
123                    Some(Err(_)) => class[i] = Some(LeafHead::Error),
124                }
125            }
126
127            // An include at EOF makes its term unsatisfiable (the intersection
128            // is permanently empty). With dedup, an EOF'd leaf may be an
129            // include for several terms; all of them become unsatisfiable.
130            for term in terms.iter_mut() {
131                if !term.unsatisfiable
132                    && term
133                        .includes
134                        .iter()
135                        .any(|&i| matches!(class[i], Some(LeafHead::Eof)))
136                {
137                    term.unsatisfiable = true;
138                }
139            }
140            // Recompute leaf liveness from current term state. A leaf may be
141            // shared across terms (include for one, exclude for another), so it
142            // can only be retired when no satisfiable term still references it.
143            recompute_unreferenced(&terms, &class, &mut unreferenced);
144
145            // Consume any budget-error frame so the error surfaces (after the
146            // floor watermark below).
147            let mut errors: Vec<BitmapScanError> = Vec::new();
148            for i in 0..leaf_count {
149                if !unreferenced[i] && matches!(class[i], Some(LeafHead::Error)) {
150                    match leaves[i].next() {
151                        Some(Err(e)) => errors.push(e),
152                        _ => unreachable!("peek classified Error"),
153                    }
154                }
155            }
156
157            let active: Vec<usize> = (0..leaf_count).filter(|&i| !unreferenced[i]).collect();
158            if active.is_empty() {
159                // Every term retired naturally: cap the scan at the range
160                // terminus so the client learns it covered the whole range.
161                done = true;
162                if frontier_advanced(last_emitted, terminus, direction) {
163                    return Some(Ok(Watermarked::Watermark(terminus)));
164                }
165                return None;
166            }
167
168            // The floor is the slowest active leaf's scanned-to position; it is
169            // the merged "every source has scanned past here" watermark.
170            let floor_pos = active
171                .iter()
172                .map(|&i| front[i])
173                .reduce(|a, b| bound_in_direction(a, b, direction))
174                .expect("active non-empty");
175            if frontier_advanced(last_emitted, floor_pos, direction) {
176                pending.push_back(Ok(Watermarked::Watermark(floor_pos)));
177                last_emitted = Some(floor_pos);
178            }
179
180            // Budget exhausted: the floor watermark above is the resume cursor;
181            // everything below it was fully evaluated in prior rounds.
182            if !errors.is_empty() {
183                done = true;
184                pending.push_back(Err(BitmapScanError::collapse(errors)));
185                continue;
186            }
187
188            // Evaluate the DNF at the nearest bucket any active leaf sits on.
189            let floor_bucket = active
190                .iter()
191                .filter_map(|&i| match class[i] {
192                    Some(LeafHead::Bucket(b)) => Some(b),
193                    _ => None,
194                })
195                .reduce(|a, b| match direction {
196                    ScanDirection::Ascending => a.min(b),
197                    ScanDirection::Descending => a.max(b),
198                })
199                .expect("active leaves carry buckets when there is no error");
200            let (_pre, post) = bucket_edges(floor_bucket, bucket_size, &range, direction);
201
202            // Snapshot the bitmaps of leaves sitting on `floor_bucket` —
203            // each unique leaf consumed exactly once — then distribute to
204            // every referencing term. See [`build_term_specs`] for why
205            // dedup matters: a leaf shared across terms can only be
206            // pulled once before its iterator moves on.
207            let mut snapshot: Vec<Option<RoaringBitmap>> = (0..leaf_count).map(|_| None).collect();
208            let mut on_floor = vec![false; leaf_count];
209            for i in 0..leaf_count {
210                if !unreferenced[i]
211                    && matches!(class[i], Some(LeafHead::Bucket(b)) if b == floor_bucket)
212                {
213                    on_floor[i] = true;
214                    front[i] = post;
215                    snapshot[i] = match leaves[i].next() {
216                        Some(Ok((_, bitmap))) => Some(bitmap),
217                        _ => None,
218                    };
219                }
220            }
221            let mut remaining_refs = count_on_floor_refs(&terms, &on_floor);
222
223            let mut result: Option<RoaringBitmap> = None;
224            for term in &terms {
225                if term.unsatisfiable {
226                    continue;
227                }
228                let includes: Vec<Option<RoaringBitmap>> = term
229                    .includes
230                    .iter()
231                    .map(|&i| {
232                        take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
233                    })
234                    .collect();
235                let excludes: Vec<Option<RoaringBitmap>> = term
236                    .excludes
237                    .iter()
238                    .map(|&i| {
239                        take_snapshot_bitmap(&mut snapshot, &mut remaining_refs, &on_floor, i)
240                    })
241                    .collect();
242                if let Some(bitmap) = eval_term_at_bucket(includes, excludes) {
243                    result = Some(match result {
244                        None => bitmap,
245                        Some(acc) => acc | bitmap,
246                    });
247                }
248            }
249
250            if let Some(bitmap) = result {
251                pending.push_back(Ok(Watermarked::Item((floor_bucket, bitmap))));
252            }
253            if frontier_advanced(last_emitted, post, direction) {
254                pending.push_back(Ok(Watermarked::Watermark(post)));
255                last_emitted = Some(post);
256            }
257        }
258    })
259}
260
261#[cfg(test)]
262mod tests {
263    use std::collections::BTreeMap;
264    use std::sync::Arc;
265
266    use futures::StreamExt;
267
268    use super::*;
269    use crate::bitmap_query::BitmapScanBudget;
270    use crate::bitmap_query::BitmapTerm;
271    use crate::bitmap_query::eval_bitmap_query_bucket_stream;
272    use crate::bitmap_query::test_utils::*;
273
274    /// Collect a marked sequence into a comparable `(bucket_id, bits)` /
275    /// watermark form.
276    fn collect_marked(items: Vec<WatermarkedBucket>) -> Vec<Watermarked<(u64, Vec<u32>)>> {
277        items
278            .into_iter()
279            .map(|r| r.unwrap().map_item(|(b, bm)| (b, bm.iter().collect())))
280            .collect()
281    }
282
283    fn items_only(marked: &[Watermarked<(u64, Vec<u32>)>]) -> Vec<(u64, Vec<u32>)> {
284        marked
285            .iter()
286            .filter_map(|w| match w {
287                Watermarked::Item(it) => Some(it.clone()),
288                Watermarked::Watermark(_) => None,
289            })
290            .collect()
291    }
292
293    #[test]
294    fn eval_bitmap_query_bucket_iter_uses_iterator_source() {
295        let source = TestBucketSource {
296            buckets: Arc::new(BTreeMap::from([
297                (test_key(b"a"), vec![(0, vec![1, 2, 3]), (1, vec![5])]),
298                (test_key(b"b"), vec![(0, vec![2, 3]), (1, vec![5])]),
299                (test_key(b"c"), vec![(0, vec![3])]),
300            ])),
301        };
302        let query = BitmapQuery::new(vec![
303            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
304        ])
305        .unwrap();
306
307        let out = eval_bitmap_query_bucket_iter(
308            source,
309            query,
310            0..200_000,
311            BUCKET_SIZE,
312            ScanDirection::Ascending,
313        )
314        .collect::<Vec<_>>();
315        let out = items_only(&collect_marked(out));
316
317        assert_eq!(out, vec![(0, vec![2]), (1, vec![5])]);
318    }
319
320    /// Two terms share the same include `a`. The iter evaluator must collapse
321    /// them to a single backend scan and distribute its per-bucket bitmap to
322    /// both terms. Mirrors the stream-side
323    /// `shared_include_across_terms_scans_dimension_once` test so the dedup
324    /// invariant is exercised on both evaluators.
325    #[test]
326    fn shared_include_across_terms_scans_dimension_once() {
327        use crate::bitmap_query::test_utils::CountingBucketSource;
328
329        let source = CountingBucketSource::new(BTreeMap::from([
330            (test_key(b"a"), vec![(0, vec![1, 2, 3])]),
331            (test_key(b"b"), vec![(0, vec![1])]),
332            (test_key(b"c"), vec![(0, vec![2])]),
333        ]));
334        let query = BitmapQuery::new(vec![
335            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
336            BitmapTerm::new(vec![include(b"a"), include(b"c")]).unwrap(),
337        ])
338        .unwrap();
339
340        let out = items_only(&collect_marked(
341            eval_bitmap_query_bucket_iter(
342                source.clone(),
343                query,
344                0..200_000,
345                BUCKET_SIZE,
346                ScanDirection::Ascending,
347            )
348            .collect(),
349        ));
350
351        // Bucket 0: term1 = a∩b = {1}; term2 = a∩c = {2}; union = {1, 2}.
352        assert_eq!(out, vec![(0, vec![1, 2])]);
353        assert_eq!(source.scan_count(&test_key(b"a")), 1);
354        assert_eq!(source.scan_count(&test_key(b"b")), 1);
355        assert_eq!(source.scan_count(&test_key(b"c")), 1);
356    }
357
358    /// The iterator evaluator must produce the exact same `Watermarked` sequence
359    /// — items AND progress watermarks — as the stream evaluator for the same
360    /// query, since both share the per-bucket DNF evaluation and floor logic.
361    #[tokio::test]
362    async fn eval_bitmap_query_bucket_iter_matches_stream_for_or_terms() {
363        let source = TestBucketSource {
364            buckets: Arc::new(BTreeMap::from([
365                (
366                    test_key(b"a"),
367                    vec![(0, vec![1, 2, 3]), (1, vec![5, 6]), (2, vec![9])],
368                ),
369                (
370                    test_key(b"b"),
371                    vec![(0, vec![2, 3]), (1, vec![6]), (2, vec![9, 10])],
372                ),
373                (test_key(b"c"), vec![(0, vec![3]), (2, vec![9])]),
374                (test_key(b"d"), vec![(1, vec![1, 8]), (2, vec![7])]),
375                (test_key(b"e"), vec![(1, vec![8])]),
376            ])),
377        };
378        let query = BitmapQuery::new(vec![
379            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
380            BitmapTerm::new(vec![include(b"d"), exclude(b"e")]).unwrap(),
381        ])
382        .unwrap();
383
384        for direction in [ScanDirection::Ascending, ScanDirection::Descending] {
385            let stream_out: Vec<_> = eval_bitmap_query_bucket_stream(
386                source.clone(),
387                query.clone(),
388                0..300_000,
389                BUCKET_SIZE,
390                direction,
391                BitmapScanBudget::new(1_000_000),
392            )
393            .collect()
394            .await;
395            let iter_out: Vec<_> = eval_bitmap_query_bucket_iter(
396                source.clone(),
397                query.clone(),
398                0..300_000,
399                BUCKET_SIZE,
400                direction,
401            )
402            .collect();
403
404            assert_eq!(
405                collect_marked(stream_out),
406                collect_marked(iter_out),
407                "iter and stream marked sequences diverged for {direction:?}"
408            );
409        }
410    }
411
412    /// Parity holds even when buckets are spread far apart (sparse gaps, leaves
413    /// leapfrogging) — the regime where a naive merge could drift between the two
414    /// evaluators.
415    #[tokio::test]
416    async fn eval_bitmap_query_bucket_iter_matches_stream_over_sparse_gaps() {
417        let source = TestBucketSource {
418            buckets: Arc::new(BTreeMap::from([
419                (
420                    test_key(b"a"),
421                    vec![(0, vec![1, 2, 3]), (5, vec![5, 6]), (9, vec![9])],
422                ),
423                (
424                    test_key(b"b"),
425                    vec![(0, vec![2, 3]), (5, vec![6]), (9, vec![9, 10])],
426                ),
427                (test_key(b"c"), vec![(0, vec![3]), (9, vec![9])]),
428                (test_key(b"d"), vec![(3, vec![1, 8]), (7, vec![7])]),
429                (test_key(b"e"), vec![(3, vec![8])]),
430            ])),
431        };
432        let query = BitmapQuery::new(vec![
433            BitmapTerm::new(vec![include(b"a"), include(b"b"), exclude(b"c")]).unwrap(),
434            BitmapTerm::new(vec![include(b"d"), exclude(b"e")]).unwrap(),
435        ])
436        .unwrap();
437
438        for direction in [ScanDirection::Ascending, ScanDirection::Descending] {
439            let stream_out: Vec<_> = eval_bitmap_query_bucket_stream(
440                source.clone(),
441                query.clone(),
442                0..(10 * BUCKET_SIZE),
443                BUCKET_SIZE,
444                direction,
445                BitmapScanBudget::new(1_000_000),
446            )
447            .collect()
448            .await;
449            let iter_out: Vec<_> = eval_bitmap_query_bucket_iter(
450                source.clone(),
451                query.clone(),
452                0..(10 * BUCKET_SIZE),
453                BUCKET_SIZE,
454                direction,
455            )
456            .collect();
457
458            assert_eq!(
459                collect_marked(stream_out),
460                collect_marked(iter_out),
461                "iter and stream diverged over sparse gaps for {direction:?}"
462            );
463        }
464    }
465
466    /// A sparse intersection that matches nothing in a gap must still emit
467    /// watermarks that advance the frontier, and cap with the range terminus.
468    #[test]
469    fn intersect_emits_coalesced_watermarks_over_sparse_gap() {
470        let source = TestBucketSource {
471            buckets: Arc::new(BTreeMap::from([
472                (test_key(b"a"), vec![(0, vec![1]), (2, vec![5])]),
473                (test_key(b"b"), vec![(0, vec![1]), (2, vec![9])]),
474            ])),
475        };
476        let query = BitmapQuery::new(vec![
477            BitmapTerm::new(vec![include(b"a"), include(b"b")]).unwrap(),
478        ])
479        .unwrap();
480
481        let marked = collect_marked(
482            eval_bitmap_query_bucket_iter(
483                source,
484                query,
485                0..300_000,
486                BUCKET_SIZE,
487                ScanDirection::Ascending,
488            )
489            .collect(),
490        );
491
492        // Only bucket 0 intersects (member 1); bucket 2 disjoint -> dropped.
493        assert_eq!(items_only(&marked), vec![(0, vec![1])]);
494
495        // Watermarks must be non-decreasing and reach the range terminus.
496        let watermarks: Vec<u64> = marked
497            .iter()
498            .filter_map(|w| match w {
499                Watermarked::Watermark(p) => Some(*p),
500                Watermarked::Item(_) => None,
501            })
502            .collect();
503        assert!(
504            watermarks.windows(2).all(|w| w[0] <= w[1]),
505            "ascending watermarks must be non-decreasing: {watermarks:?}"
506        );
507        assert_eq!(
508            watermarks.last().copied(),
509            Some(300_000),
510            "final watermark must reach the range terminus"
511        );
512    }
513
514    /// An unanchored term (`NOT x`, anchored on the synthesized universe leaf)
515    /// emits the complement at exclude-occupied buckets, full bitmaps at gap
516    /// buckets, and keeps emitting full buckets after the exclude leaf EOFs.
517    #[test]
518    fn unanchored_term_emits_complement_over_gaps_and_past_exclude_eof() {
519        let source = TestBucketSource {
520            buckets: Arc::new(BTreeMap::from([(
521                test_key(b"x"),
522                vec![(0, vec![1, 2]), (2, vec![5])],
523            )])),
524        };
525        let query = BitmapQuery::new(vec![
526            BitmapTerm::new(vec![include_universe(), exclude(b"x")]).unwrap(),
527        ])
528        .unwrap();
529
530        let items: Vec<(u64, RoaringBitmap)> = eval_bitmap_query_bucket_iter(
531            source,
532            query,
533            0..(4 * BUCKET_SIZE),
534            BUCKET_SIZE,
535            ScanDirection::Ascending,
536        )
537        .filter_map(|r| match r.unwrap() {
538            Watermarked::Item(it) => Some(it),
539            Watermarked::Watermark(_) => None,
540        })
541        .collect();
542
543        let complement = |bits: &[u32]| {
544            let mut bm = full_bucket();
545            for &b in bits {
546                bm.remove(b);
547            }
548            bm
549        };
550        let expected = vec![
551            (0, complement(&[1, 2])),
552            (1, full_bucket()),
553            (2, complement(&[5])),
554            (3, full_bucket()),
555        ];
556        assert_eq!(items, expected);
557    }
558
559    /// Iter/stream parity for a mixed DNF with an unanchored term, in both
560    /// directions — the universe leaf forces dense bucket coverage while the
561    /// anchored term stays sparse.
562    #[tokio::test]
563    async fn eval_bitmap_query_bucket_iter_matches_stream_for_unanchored_terms() {
564        let source = TestBucketSource {
565            buckets: Arc::new(BTreeMap::from([
566                (test_key(b"a"), vec![(1, vec![7])]),
567                (test_key(b"x"), vec![(0, vec![1, 2]), (3, vec![5])]),
568            ])),
569        };
570        let query = BitmapQuery::new(vec![
571            BitmapTerm::new(vec![include(b"a")]).unwrap(),
572            BitmapTerm::new(vec![include_universe(), exclude(b"x")]).unwrap(),
573        ])
574        .unwrap();
575
576        for direction in [ScanDirection::Ascending, ScanDirection::Descending] {
577            let stream_out: Vec<_> = eval_bitmap_query_bucket_stream(
578                source.clone(),
579                query.clone(),
580                0..(5 * BUCKET_SIZE),
581                BUCKET_SIZE,
582                direction,
583                BitmapScanBudget::new(1_000_000),
584            )
585            .collect()
586            .await;
587            let iter_out: Vec<_> = eval_bitmap_query_bucket_iter(
588                source.clone(),
589                query.clone(),
590                0..(5 * BUCKET_SIZE),
591                BUCKET_SIZE,
592                direction,
593            )
594            .collect();
595
596            assert_eq!(
597                collect_marked(stream_out),
598                collect_marked(iter_out),
599                "iter and stream diverged on unanchored terms for {direction:?}"
600            );
601        }
602    }
603
604    /// Budget exhaustion mid-dense-scan surfaces `BitmapScanError::ScanLimit`
605    /// after a floor watermark, and resuming from that watermark covers the
606    /// remaining buckets exactly once.
607    #[tokio::test]
608    async fn unanchored_budget_exhaustion_resumes_at_watermark() {
609        use crate::bitmap_query::BitmapScanError;
610
611        let source = TestBucketSource {
612            buckets: Arc::new(BTreeMap::new()),
613        };
614        let query = BitmapQuery::new(vec![
615            BitmapTerm::new(vec![include_universe(), exclude(b"x")]).unwrap(),
616        ])
617        .unwrap();
618
619        let first: Vec<_> = eval_bitmap_query_bucket_stream(
620            source.clone(),
621            query.clone(),
622            0..(10 * BUCKET_SIZE),
623            BUCKET_SIZE,
624            ScanDirection::Ascending,
625            BitmapScanBudget::new(3),
626        )
627        .collect()
628        .await;
629
630        let mut covered: Vec<u64> = Vec::new();
631        let mut last_watermark = 0;
632        let mut limit_hit = false;
633        for item in first {
634            match item {
635                Ok(Watermarked::Item((bucket, bitmap))) => {
636                    assert_eq!(bitmap, full_bucket());
637                    covered.push(bucket);
638                }
639                Ok(Watermarked::Watermark(p)) => last_watermark = p,
640                Err(e) => {
641                    assert!(matches!(e, BitmapScanError::ScanLimit));
642                    limit_hit = true;
643                }
644            }
645        }
646        assert!(limit_hit, "3-bucket budget cannot cover 10 dense buckets");
647        assert_eq!(covered, vec![0, 1, 2]);
648        assert_eq!(last_watermark, 3 * BUCKET_SIZE);
649
650        let resumed: Vec<_> = eval_bitmap_query_bucket_stream(
651            source,
652            query,
653            last_watermark..(10 * BUCKET_SIZE),
654            BUCKET_SIZE,
655            ScanDirection::Ascending,
656            BitmapScanBudget::new(1_000_000),
657        )
658        .collect()
659        .await;
660        for item in resumed {
661            if let Watermarked::Item((bucket, _)) = item.unwrap() {
662                covered.push(bucket);
663            }
664        }
665        assert_eq!(covered, (0..10).collect::<Vec<_>>());
666    }
667}