sui_rpc_api/ledger_history/
watermark.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Shared `Watermark` construction for the v2alpha list APIs.
5//!
6//! Both ledger-history backends — the fullnode (`sui-rpc-api`) and bigtable
7//! (`sui-kv-rpc`) — and all three list handlers (`list_transactions`,
8//! `list_events`, `list_checkpoints`) emit the same wire `Watermark`: a resume
9//! cursor plus a direction-matching completion boundary (`checkpoint_hi`
10//! ascending / `checkpoint_lo` descending). The cursor encoding and the
11//! boundary bookkeeping are identical; what differs per API is how a scan
12//! position resolves into a completion-boundary candidate:
13//!
14//! - `list_transactions` / `list_events` scan within a checkpoint, so an
15//!   item at cp `C` does NOT prove `C` complete (more matches may sit at
16//!   higher/lower tx_seqs / event_seqs). Their boundary candidate is
17//!   `C ∓ 1` — see [`advance_boundary_excluding_cp`].
18//! - `list_checkpoints` dedupes cp_seq, so "cp `C` emitted" ≡ "cp `C`
19//!   complete." It feeds `C` straight into [`advance_checkpoint_boundary`]
20//!   for items, and translates its scan frontier into a cp-space candidate
21//!   itself before doing the same.
22//!
23//! This module owns the shared pieces; each handler keeps only its
24//! API-specific frontier-to-candidate adapter.
25
26use sui_inverted_index::ScanDirection;
27
28use sui_rpc::proto::sui::rpc::v2alpha::QueryEndReason;
29use sui_rpc::proto::sui::rpc::v2alpha::Watermark;
30
31use crate::ledger_history::query_options::QueryOptions;
32
33/// Populate the direction-matching field of a `Watermark` from the
34/// per-scan boundary value. Exactly one of `checkpoint_hi` /
35/// `checkpoint_lo` is set, never both.
36fn set_checkpoint_bound(wm: &mut Watermark, options: &QueryOptions, boundary: Option<u64>) {
37    if options.is_ascending() {
38        wm.checkpoint_hi = boundary;
39    } else {
40        wm.checkpoint_lo = boundary;
41    }
42}
43
44/// Fold an already-resolved completion-boundary `candidate` into the
45/// accumulated boundary, keeping the most-advanced value in scan direction:
46/// the max ascending, the min descending.
47///
48/// Callers resolve `candidate` for their scan domain first — `list_checkpoints`
49/// passes the item's cp directly (dedup makes it complete), while the
50/// per-checkpoint scanners use [`advance_boundary_excluding_cp`].
51pub fn advance_checkpoint_boundary(
52    prev: Option<u64>,
53    candidate: u64,
54    options: &QueryOptions,
55) -> Option<u64> {
56    Some(match prev {
57        None => candidate,
58        Some(p) if options.is_ascending() => p.max(candidate),
59        Some(p) => p.min(candidate),
60    })
61}
62
63/// Fold a cp whose own checkpoint is NOT proven complete into the
64/// accumulated boundary (`list_transactions` / `list_events`: cp `C` may
65/// still hold further matches at other tx_seqs / event_seqs; and any scan
66/// frontier, which lands partway through the checkpoint it resolves to). The
67/// boundary excludes `C` itself: `C - 1` ascending / `C + 1` descending.
68///
69/// When that adjusted candidate would overflow (`C == 0` ascending or
70/// `u64::MAX` descending) the previously accumulated boundary is preserved
71/// rather than collapsed back to `None`.
72pub fn advance_boundary_excluding_cp(
73    prev: Option<u64>,
74    cp: u64,
75    options: &QueryOptions,
76) -> Option<u64> {
77    let candidate = if options.is_ascending() {
78        cp.checked_sub(1)
79    } else {
80        cp.checked_add(1)
81    };
82    match candidate {
83        Some(c) => advance_checkpoint_boundary(prev, c, options),
84        None => prev,
85    }
86}
87
88/// Build the embedded `Watermark` for an item: the cursor encodes this
89/// item's position (so the next request's `after`/`before` resumes past it)
90/// plus the current direction-matching checkpoint boundary. `cp` /
91/// `position` are the item's cursor coordinates (`list_checkpoints` passes
92/// its cp_seq for both).
93pub fn item_watermark(
94    options: &QueryOptions,
95    cp: u64,
96    position: u64,
97    boundary: Option<u64>,
98) -> Watermark {
99    let mut wm = Watermark::default();
100    wm.cursor = Some(options.cursor_for_item(cp, position));
101    set_checkpoint_bound(&mut wm, options, boundary);
102    wm
103}
104
105/// Build a standalone scan-frontier `Watermark`. `cursor_cp` / `position`
106/// are the boundary cursor coordinates the caller has already resolved for
107/// its scan domain (see [`boundary_cursor_cp`] for the per-checkpoint
108/// scanners' direction adjustment); `boundary` is the accumulated
109/// completion boundary.
110pub fn boundary_watermark(
111    options: &QueryOptions,
112    cursor_cp: u64,
113    position: u64,
114    boundary: Option<u64>,
115) -> Watermark {
116    let mut wm = Watermark::default();
117    wm.cursor = Some(options.cursor_for_boundary(cursor_cp, position));
118    set_checkpoint_bound(&mut wm, options, boundary);
119    wm
120}
121
122/// Resolve the boundary-cursor checkpoint coordinate for a `list_transactions`
123/// / `list_events` scan frontier. The cursor encoding is asymmetric:
124/// ascending `Boundary` cursors advance the cp-range start, so the frontier
125/// cp is used directly; descending `Boundary` cursors treat the cp
126/// coordinate as an EXCLUSIVE upper bound, so `cp + 1` is needed to keep
127/// `cp` itself included on resume.
128pub fn boundary_cursor_cp(cp: u64, direction: ScanDirection) -> u64 {
129    if direction.is_ascending() {
130        cp
131    } else {
132        cp.saturating_add(1)
133    }
134}
135
136/// Boundary watermark emitted once a scan has drained its entire resolved
137/// range under natural completion. Unlike per-item watermarks it can claim
138/// the range's final checkpoint complete — `end_checkpoint - 1` ascending
139/// (the exclusive cp upper) or `end_checkpoint` descending (the inclusive cp
140/// lower) — because no further items exist in it within the requested range.
141/// The `(end_checkpoint, end_position)` cursor resumes exactly past the
142/// scanned range.
143pub fn terminal_boundary_watermark(
144    options: &QueryOptions,
145    end_checkpoint: u64,
146    end_position: u64,
147) -> Watermark {
148    let boundary = if options.is_ascending() {
149        end_checkpoint.checked_sub(1)
150    } else {
151        Some(end_checkpoint)
152    };
153    let mut wm = Watermark::default();
154    wm.cursor = Some(options.cursor_for_boundary(end_checkpoint, end_position));
155    set_checkpoint_bound(&mut wm, options, boundary);
156    wm
157}
158
159/// Whether the scan reached the natural end of the requested range (the
160/// ledger tip or a requested `end_checkpoint`) rather than being truncated
161/// by an item or scan limit, or bounded by a client cursor. Only natural
162/// completion proves the range's final checkpoint complete.
163pub fn reached_range_end(reason: QueryEndReason) -> bool {
164    matches!(
165        reason,
166        QueryEndReason::LedgerTip | QueryEndReason::CheckpointBound
167    )
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ledger_history::query_options::QueryType;
    use sui_rpc::proto::sui::rpc::v2alpha as proto;

    /// Build transaction-query `QueryOptions` with the requested ordering
    /// and no filter, going through the same `from_proto` path the handlers
    /// use.
    fn options(ascending: bool) -> QueryOptions {
        let ordering = if ascending {
            proto::Ordering::Ascending
        } else {
            proto::Ordering::Descending
        };
        let mut request = proto::QueryOptions::default();
        request.ordering = ordering as i32;
        QueryOptions::from_proto(
            Some(&request),
            100,
            100,
            QueryType::Transactions,
            Option::<&proto::TransactionFilter>::None,
        )
        .unwrap()
    }

    #[test]
    fn advance_checkpoint_boundary_keeps_most_advanced_in_direction() {
        let asc = options(true);
        assert_eq!(advance_checkpoint_boundary(None, 5, &asc), Some(5));
        assert_eq!(advance_checkpoint_boundary(Some(5), 9, &asc), Some(9));
        assert_eq!(advance_checkpoint_boundary(Some(9), 5, &asc), Some(9));

        let desc = options(false);
        assert_eq!(advance_checkpoint_boundary(None, 9, &desc), Some(9));
        assert_eq!(advance_checkpoint_boundary(Some(9), 5, &desc), Some(5));
        assert_eq!(advance_checkpoint_boundary(Some(5), 9, &desc), Some(5));
    }

    /// The per-checkpoint scanners exclude the item's own cp: `C - 1`
    /// ascending, `C + 1` descending.
    #[test]
    fn advance_boundary_excluding_cp_adjusts_by_one() {
        let asc = options(true);
        assert_eq!(advance_boundary_excluding_cp(None, 10, &asc), Some(9));
        assert_eq!(advance_boundary_excluding_cp(Some(9), 12, &asc), Some(11));

        let desc = options(false);
        assert_eq!(advance_boundary_excluding_cp(None, 10, &desc), Some(11));
        assert_eq!(advance_boundary_excluding_cp(Some(11), 8, &desc), Some(9));
    }

    /// Overflow at the range edge (`cp 0` ascending, `u64::MAX` descending)
    /// preserves the previously accumulated boundary instead of dropping it.
    #[test]
    fn advance_boundary_excluding_cp_preserves_prev_on_overflow() {
        let asc = options(true);
        assert_eq!(advance_boundary_excluding_cp(Some(4), 0, &asc), Some(4));
        assert_eq!(advance_boundary_excluding_cp(None, 0, &asc), None);

        let desc = options(false);
        assert_eq!(
            advance_boundary_excluding_cp(Some(4), u64::MAX, &desc),
            Some(4)
        );
        assert_eq!(advance_boundary_excluding_cp(None, u64::MAX, &desc), None);
    }

    #[test]
    fn boundary_cursor_cp_bumps_descending_only() {
        assert_eq!(boundary_cursor_cp(10, ScanDirection::Ascending), 10);
        assert_eq!(boundary_cursor_cp(10, ScanDirection::Descending), 11);
        assert_eq!(
            boundary_cursor_cp(u64::MAX, ScanDirection::Descending),
            u64::MAX
        );
    }

    /// Ascending stores the boundary in `checkpoint_hi`; descending in
    /// `checkpoint_lo`. A client reads the direction-correct bound off the
    /// wire frame without knowing the request's ordering.
    #[test]
    fn item_watermark_sets_direction_matching_bound() {
        let asc = options(true);
        let wm = item_watermark(&asc, 9, 42, Some(8));
        assert_eq!(wm.checkpoint_hi, Some(8));
        assert_eq!(wm.checkpoint_lo, None);
        assert_eq!(wm.cursor.as_ref(), Some(&asc.cursor_for_item(9, 42)));

        let desc = options(false);
        let wm = item_watermark(&desc, 9, 42, Some(10));
        assert_eq!(wm.checkpoint_hi, None);
        assert_eq!(wm.checkpoint_lo, Some(10));
        assert_eq!(wm.cursor.as_ref(), Some(&desc.cursor_for_item(9, 42)));
    }

    /// With no accumulated boundary neither bound is populated, in either
    /// direction; the cursor is still present.
    #[test]
    fn item_watermark_without_boundary_leaves_both_bounds_unset() {
        for ascending in [true, false] {
            let opts = options(ascending);
            let wm = item_watermark(&opts, 9, 42, None);
            assert_eq!(wm.checkpoint_hi, None);
            assert_eq!(wm.checkpoint_lo, None);
            assert_eq!(wm.cursor.as_ref(), Some(&opts.cursor_for_item(9, 42)));
        }
    }

    /// Standalone frontier frames pick the bound field the same way item
    /// frames do, but encode their cursor with `cursor_for_boundary`.
    #[test]
    fn boundary_watermark_sets_direction_matching_bound() {
        let asc = options(true);
        let wm = boundary_watermark(&asc, 10, 100, Some(9));
        assert_eq!(wm.checkpoint_hi, Some(9));
        assert_eq!(wm.checkpoint_lo, None);
        assert_eq!(wm.cursor.as_ref(), Some(&asc.cursor_for_boundary(10, 100)));

        let desc = options(false);
        let wm = boundary_watermark(&desc, 10, 100, Some(11));
        assert_eq!(wm.checkpoint_hi, None);
        assert_eq!(wm.checkpoint_lo, Some(11));
        assert_eq!(wm.cursor.as_ref(), Some(&desc.cursor_for_boundary(10, 100)));
    }

    /// On natural completion the terminal frame claims the range's final
    /// checkpoint complete: ascending uses `end_checkpoint - 1` and resumes
    /// from `(end_checkpoint, end_position)`; descending stores the range's
    /// lowest checkpoint (inclusive) in `checkpoint_lo`.
    #[test]
    fn terminal_boundary_watermark_claims_final_checkpoint() {
        let asc = options(true);
        let wm = terminal_boundary_watermark(&asc, 10, 100);
        assert_eq!(wm.checkpoint_hi, Some(9));
        assert_eq!(wm.checkpoint_lo, None);
        assert_eq!(wm.cursor.as_ref(), Some(&asc.cursor_for_boundary(10, 100)));

        let desc = options(false);
        let wm = terminal_boundary_watermark(&desc, 10, 100);
        assert_eq!(wm.checkpoint_lo, Some(10));
        assert_eq!(wm.checkpoint_hi, None);
        assert_eq!(wm.cursor.as_ref(), Some(&desc.cursor_for_boundary(10, 100)));
    }

    #[test]
    fn reached_range_end_only_for_natural_completion() {
        assert!(reached_range_end(QueryEndReason::LedgerTip));
        assert!(reached_range_end(QueryEndReason::CheckpointBound));
        assert!(!reached_range_end(QueryEndReason::ScanLimit));
        assert!(!reached_range_end(QueryEndReason::ItemLimit));
    }
}
287}