sui_rpc_api/ledger_history/
watermark.rs1use sui_inverted_index::ScanDirection;
27
28use sui_rpc::proto::sui::rpc::v2alpha::QueryEndReason;
29use sui_rpc::proto::sui::rpc::v2alpha::Watermark;
30
31use crate::ledger_history::query_options::QueryOptions;
32
33fn set_checkpoint_bound(wm: &mut Watermark, options: &QueryOptions, boundary: Option<u64>) {
37 if options.is_ascending() {
38 wm.checkpoint_hi = boundary;
39 } else {
40 wm.checkpoint_lo = boundary;
41 }
42}
43
44pub fn advance_checkpoint_boundary(
52 prev: Option<u64>,
53 candidate: u64,
54 options: &QueryOptions,
55) -> Option<u64> {
56 Some(match prev {
57 None => candidate,
58 Some(p) if options.is_ascending() => p.max(candidate),
59 Some(p) => p.min(candidate),
60 })
61}
62
63pub fn advance_boundary_excluding_cp(
73 prev: Option<u64>,
74 cp: u64,
75 options: &QueryOptions,
76) -> Option<u64> {
77 let candidate = if options.is_ascending() {
78 cp.checked_sub(1)
79 } else {
80 cp.checked_add(1)
81 };
82 match candidate {
83 Some(c) => advance_checkpoint_boundary(prev, c, options),
84 None => prev,
85 }
86}
87
88pub fn item_watermark(
94 options: &QueryOptions,
95 cp: u64,
96 position: u64,
97 boundary: Option<u64>,
98) -> Watermark {
99 let mut wm = Watermark::default();
100 wm.cursor = Some(options.cursor_for_item(cp, position));
101 set_checkpoint_bound(&mut wm, options, boundary);
102 wm
103}
104
105pub fn boundary_watermark(
111 options: &QueryOptions,
112 cursor_cp: u64,
113 position: u64,
114 boundary: Option<u64>,
115) -> Watermark {
116 let mut wm = Watermark::default();
117 wm.cursor = Some(options.cursor_for_boundary(cursor_cp, position));
118 set_checkpoint_bound(&mut wm, options, boundary);
119 wm
120}
121
122pub fn boundary_cursor_cp(cp: u64, direction: ScanDirection) -> u64 {
129 if direction.is_ascending() {
130 cp
131 } else {
132 cp.saturating_add(1)
133 }
134}
135
136pub fn terminal_boundary_watermark(
144 options: &QueryOptions,
145 end_checkpoint: u64,
146 end_position: u64,
147) -> Watermark {
148 let boundary = if options.is_ascending() {
149 end_checkpoint.checked_sub(1)
150 } else {
151 Some(end_checkpoint)
152 };
153 let mut wm = Watermark::default();
154 wm.cursor = Some(options.cursor_for_boundary(end_checkpoint, end_position));
155 set_checkpoint_bound(&mut wm, options, boundary);
156 wm
157}
158
159pub fn reached_range_end(reason: QueryEndReason) -> bool {
164 matches!(
165 reason,
166 QueryEndReason::LedgerTip | QueryEndReason::CheckpointBound
167 )
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ledger_history::query_options::QueryType;
    use sui_rpc::proto::sui::rpc::v2alpha as proto;

    /// Builds transaction-query options with the requested ordering.
    fn options(ascending: bool) -> QueryOptions {
        let ordering = if ascending {
            proto::Ordering::Ascending
        } else {
            proto::Ordering::Descending
        };
        let mut request = proto::QueryOptions::default();
        request.ordering = ordering as i32;

        let no_filter: Option<&proto::TransactionFilter> = None;
        QueryOptions::from_proto(
            Some(&request),
            100,
            100,
            QueryType::Transactions,
            no_filter,
        )
        .unwrap()
    }

    #[test]
    fn advance_checkpoint_boundary_keeps_most_advanced_in_direction() {
        // Each case is (prev, candidate, expected).
        let asc = options(true);
        for (prev, candidate, expected) in [
            (None, 5, Some(5)),
            (Some(5), 9, Some(9)),
            (Some(9), 5, Some(9)),
        ] {
            assert_eq!(
                advance_checkpoint_boundary(prev, candidate, &asc),
                expected,
                "ascending: prev={prev:?}, candidate={candidate}"
            );
        }

        let desc = options(false);
        for (prev, candidate, expected) in [
            (None, 9, Some(9)),
            (Some(9), 5, Some(5)),
            (Some(5), 9, Some(5)),
        ] {
            assert_eq!(
                advance_checkpoint_boundary(prev, candidate, &desc),
                expected,
                "descending: prev={prev:?}, candidate={candidate}"
            );
        }
    }

    #[test]
    fn advance_boundary_excluding_cp_adjusts_by_one() {
        // Each case is (prev, cp, expected).
        let asc = options(true);
        for (prev, cp, expected) in [(None, 10, Some(9)), (Some(9), 12, Some(11))] {
            assert_eq!(
                advance_boundary_excluding_cp(prev, cp, &asc),
                expected,
                "ascending: prev={prev:?}, cp={cp}"
            );
        }

        let desc = options(false);
        for (prev, cp, expected) in [(None, 10, Some(11)), (Some(11), 8, Some(9))] {
            assert_eq!(
                advance_boundary_excluding_cp(prev, cp, &desc),
                expected,
                "descending: prev={prev:?}, cp={cp}"
            );
        }
    }

    #[test]
    fn advance_boundary_excluding_cp_preserves_prev_on_overflow() {
        let asc = options(true);
        let desc = options(false);

        // Whatever the previous boundary was, it must come back untouched
        // when the adjacent checkpoint is not representable.
        for prev in [Some(4), None] {
            assert_eq!(advance_boundary_excluding_cp(prev, 0, &asc), prev);
            assert_eq!(advance_boundary_excluding_cp(prev, u64::MAX, &desc), prev);
        }
    }

    #[test]
    fn boundary_cursor_cp_bumps_descending_only() {
        let ascending = boundary_cursor_cp(10, ScanDirection::Ascending);
        assert_eq!(ascending, 10);

        let descending = boundary_cursor_cp(10, ScanDirection::Descending);
        assert_eq!(descending, 11);

        // The bump saturates instead of overflowing.
        let saturated = boundary_cursor_cp(u64::MAX, ScanDirection::Descending);
        assert_eq!(saturated, u64::MAX);
    }

    #[test]
    fn item_watermark_sets_direction_matching_bound() {
        let asc = options(true);
        let expected_cursor = asc.cursor_for_item(9, 42);
        let wm = item_watermark(&asc, 9, 42, Some(8));
        assert_eq!((wm.checkpoint_hi, wm.checkpoint_lo), (Some(8), None));
        assert_eq!(wm.cursor.as_ref(), Some(&expected_cursor));

        let desc = options(false);
        let wm = item_watermark(&desc, 9, 42, Some(10));
        assert_eq!((wm.checkpoint_hi, wm.checkpoint_lo), (None, Some(10)));
    }

    #[test]
    fn terminal_boundary_watermark_claims_final_checkpoint() {
        let asc = options(true);
        let expected_cursor = asc.cursor_for_boundary(10, 100);
        let wm = terminal_boundary_watermark(&asc, 10, 100);
        assert_eq!((wm.checkpoint_hi, wm.checkpoint_lo), (Some(9), None));
        assert_eq!(wm.cursor.as_ref(), Some(&expected_cursor));

        let desc = options(false);
        let expected_cursor = desc.cursor_for_boundary(10, 100);
        let wm = terminal_boundary_watermark(&desc, 10, 100);
        assert_eq!((wm.checkpoint_lo, wm.checkpoint_hi), (Some(10), None));
        assert_eq!(wm.cursor.as_ref(), Some(&expected_cursor));
    }

    #[test]
    fn reached_range_end_only_for_natural_completion() {
        for reason in [QueryEndReason::LedgerTip, QueryEndReason::CheckpointBound] {
            assert!(reached_range_end(reason));
        }
        for reason in [QueryEndReason::ScanLimit, QueryEndReason::ItemLimit] {
            assert!(!reached_range_end(reason));
        }
    }
}