1use std::ops::Range;
5
6use bytes::Bytes;
7use fastcrypto::hash::HashFunction;
8use prost::Message;
9use sui_inverted_index::ScanDirection;
10use sui_rpc::proto::sui::rpc::v2alpha::Ordering as ProtoOrdering;
11use sui_rpc::proto::sui::rpc::v2alpha::QueryEndReason;
12use sui_rpc::proto::sui::rpc::v2alpha::QueryOptions as ProtoQueryOptions;
13use sui_types::crypto::DefaultHash;
14
15use crate::ErrorReason;
16use crate::RpcError;
17use crate::proto::google::rpc::bad_request::FieldViolation;
18
/// `ProtoOrdering::Ascending` as a raw `i32`, so it can be used as a `match`
/// pattern against the integer `ordering` field of the proto options.
const ORDERING_ASCENDING: i32 = ProtoOrdering::Ascending as i32;
/// `ProtoOrdering::Descending` as a raw `i32`, so it can be used as a `match`
/// pattern against the integer `ordering` field of the proto options.
const ORDERING_DESCENDING: i32 = ProtoOrdering::Descending as i32;
21
/// Direction in which a query walks its range.
///
/// `QueryOptions::scan_direction` maps each variant to the matching
/// `ScanDirection`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Ordering {
    /// Walk from the low end of the range towards the high end.
    Ascending,
    /// Walk from the high end of the range towards the low end.
    Descending,
}
27
/// Kind of query a set of options (and every cursor minted from them) belongs to.
///
/// The value is serialized into cursor tokens and compared when a cursor is
/// parsed, so a cursor issued for one query type is rejected by another.
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum QueryType {
    Checkpoints,
    Transactions,
    Events,
}
34
/// Distinguishes what a cursor token points at.
///
/// The kind decides whether the cursor's own position/checkpoint is stepped
/// past when it is turned into a range bound (see `CursorToken`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
enum CursorKind {
    /// Points at a concrete item: resuming *after* it starts at `position + 1`,
    /// and as a *before* bound its checkpoint is still included (`checkpoint + 1`
    /// is the exclusive end).
    Item,
    /// Points at a boundary: `position` and `checkpoint` are used as bounds
    /// without any offset.
    Boundary,
}
40
/// Validated paging options for a single query, built by `QueryOptions::from_proto`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueryOptions {
    /// Query kind; stamped into every cursor produced from these options.
    query_type: QueryType,
    /// Item limit after defaulting and clamping in `from_proto`.
    pub limit_items: usize,
    /// Direction in which results are walked.
    pub ordering: Ordering,
    /// Decoded and validated `after` cursor, if the request carried one.
    after: Option<CursorToken>,
    /// Decoded and validated `before` cursor, if the request carried one.
    before: Option<CursorToken>,
    /// Digest of the request filter; cursors must carry the same digest to be
    /// accepted, and cursors minted here embed it.
    scope_digest: [u8; 32],
}
51
/// Result of `CheckpointRange::resolve`: the half-open checkpoint range to
/// query together with the reason it ends where it does.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResolvedCheckpointRange {
    /// Half-open checkpoint range (`start..end`).
    pub range: Range<u64>,
    /// Why the range terminates on the side the chosen ordering walks towards.
    pub end_reason: QueryEndReason,
}
57
/// A half-open position range plus the terminal information used to build
/// the end-of-range boundary cursor (see `ResolvedRange::end_cursor`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResolvedRange {
    /// Half-open position range (`start..end`).
    pub range: Range<u64>,
    /// Checkpoint encoded into the end cursor.
    pub end_checkpoint: u64,
    /// Position encoded into the end cursor.
    pub end_position: u64,
    /// Why the range terminates.
    pub end_reason: QueryEndReason,
}
65
/// Checkpoint bounds taken from a request, already clamped to the indexed tip
/// by `CheckpointRange::from_request`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CheckpointRange {
    /// Inclusive lower checkpoint bound (0 when the request gave none).
    start: u64,
    /// Exclusive upper checkpoint bound, never above `indexed_tip`.
    end: u64,
    /// Reason reported when the upper bound is what ends the query.
    high_reason: QueryEndReason,
    /// The exclusive upper checkpoint limit passed to `from_request`.
    indexed_tip: u64,
}
73
74impl QueryOptions {
75 pub fn from_proto(
76 request: Option<&ProtoQueryOptions>,
77 default_limit_items: u32,
78 max_limit_items: u32,
79 query_type: QueryType,
80 filter: Option<&impl Message>,
81 ) -> Result<Self, RpcError> {
82 let limit_items = request
83 .and_then(|options| options.limit_items)
84 .unwrap_or(default_limit_items)
85 .clamp(1, max_limit_items) as usize;
86
87 let ordering = match request.map(|options| options.ordering) {
88 None | Some(ORDERING_ASCENDING) => Ordering::Ascending,
89 Some(ORDERING_DESCENDING) => Ordering::Descending,
90 Some(_) => {
91 return Err(FieldViolation::new("options.ordering")
92 .with_description("invalid ordering")
93 .with_reason(ErrorReason::FieldInvalid)
94 .into());
95 }
96 };
97
98 let scope_digest = scope_digest(filter);
99 let after = parse_cursor(
100 "options.after",
101 request.and_then(|options| options.after.as_ref()),
102 query_type,
103 scope_digest,
104 )?;
105 let before = parse_cursor(
106 "options.before",
107 request.and_then(|options| options.before.as_ref()),
108 query_type,
109 scope_digest,
110 )?;
111
112 Ok(Self {
113 query_type,
114 limit_items,
115 ordering,
116 after,
117 before,
118 scope_digest,
119 })
120 }
121
122 pub fn scan_direction(&self) -> ScanDirection {
123 match self.ordering {
124 Ordering::Ascending => ScanDirection::Ascending,
125 Ordering::Descending => ScanDirection::Descending,
126 }
127 }
128
129 pub fn is_ascending(&self) -> bool {
130 matches!(self.ordering, Ordering::Ascending)
131 }
132
133 pub fn has_after_cursor(&self) -> bool {
140 self.after.is_some()
141 }
142
143 pub fn apply_cursor_bounds(&self, resolved: ResolvedRange) -> ResolvedRange {
144 if resolved.is_empty() {
145 return resolved;
146 }
147
148 let mut start = resolved.range.start;
149 let mut end = resolved.range.end;
150 let mut end_checkpoint = resolved.end_checkpoint;
151 let mut end_position = resolved.end_position;
152 let mut end_reason = resolved.end_reason;
153 let mut cursor_terminal = None;
154
155 if let Some(cursor) = &self.after {
156 let Some(after) = cursor.after_position_start() else {
157 return ResolvedRange::empty_at(
158 cursor.checkpoint,
159 cursor.position,
160 QueryEndReason::CursorBound,
161 );
162 };
163 if after >= start {
164 start = after;
165 if matches!(self.ordering, Ordering::Descending) || after >= end {
166 cursor_terminal = Some((cursor.checkpoint, after));
167 }
168 if matches!(self.ordering, Ordering::Descending) {
169 end_checkpoint = cursor.checkpoint;
170 end_position = after;
171 end_reason = QueryEndReason::CursorBound;
172 }
173 }
174 }
175
176 if let Some(cursor) = &self.before
177 && cursor.position <= end
178 {
179 end = cursor.position;
180 if matches!(self.ordering, Ordering::Ascending) || cursor.position <= start {
181 cursor_terminal = Some((cursor.checkpoint, cursor.position));
182 }
183 if matches!(self.ordering, Ordering::Ascending) {
184 end_checkpoint = cursor.checkpoint;
185 end_position = cursor.position;
186 end_reason = QueryEndReason::CursorBound;
187 }
188 }
189
190 if start >= end {
191 if let Some((checkpoint, position)) = cursor_terminal {
192 end_checkpoint = checkpoint;
193 end_position = position;
194 }
195 if self.after.is_some() || self.before.is_some() {
196 end_reason = QueryEndReason::CursorBound;
197 }
198 ResolvedRange::empty_at(end_checkpoint, end_position, end_reason)
199 } else {
200 ResolvedRange {
201 range: start..end,
202 end_checkpoint,
203 end_position,
204 end_reason,
205 }
206 }
207 }
208
209 pub fn cursor_for_item(&self, checkpoint: u64, position: u64) -> Bytes {
210 self.encode_cursor(CursorKind::Item, checkpoint, position)
211 }
212
213 pub fn cursor_for_boundary(&self, checkpoint: u64, position: u64) -> Bytes {
214 self.encode_cursor(CursorKind::Boundary, checkpoint, position)
215 }
216
217 fn encode_cursor(&self, kind: CursorKind, checkpoint: u64, position: u64) -> Bytes {
218 encode_cursor(CursorToken {
219 query_type: self.query_type,
220 kind,
221 checkpoint,
222 position,
223 scope_digest: self.scope_digest,
224 })
225 }
226}
227
228impl ResolvedCheckpointRange {
229 pub fn empty_at(checkpoint: u64, reason: QueryEndReason) -> Self {
230 Self {
231 range: checkpoint..checkpoint,
232 end_reason: reason,
233 }
234 }
235
236 pub fn is_empty(&self) -> bool {
237 self.range.is_empty()
238 }
239
240 pub fn terminal_checkpoint(&self, ordering: Ordering) -> u64 {
241 match ordering {
242 Ordering::Ascending => self.range.end,
243 Ordering::Descending => self.range.start,
244 }
245 }
246
247 pub fn with_range(self, range: Range<u64>, ordering: Ordering) -> ResolvedRange {
248 let end_position = match ordering {
249 Ordering::Ascending => range.end,
250 Ordering::Descending => range.start,
251 };
252 ResolvedRange {
253 range,
254 end_checkpoint: self.terminal_checkpoint(ordering),
255 end_position,
256 end_reason: self.end_reason,
257 }
258 }
259}
260
261impl ResolvedRange {
262 pub fn empty_at(end_checkpoint: u64, end_position: u64, end_reason: QueryEndReason) -> Self {
263 Self {
264 range: end_position..end_position,
265 end_checkpoint,
266 end_position,
267 end_reason,
268 }
269 }
270
271 pub fn is_empty(&self) -> bool {
272 self.range.is_empty()
273 }
274
275 pub fn end_cursor(&self, options: &QueryOptions) -> Bytes {
276 options.cursor_for_boundary(self.end_checkpoint, self.end_position)
277 }
278}
279
280impl CheckpointRange {
281 pub fn from_request(
282 start_checkpoint: Option<u64>,
283 end_checkpoint: Option<u64>,
284 checkpoint_hi_exclusive: u64,
285 ) -> Result<Self, RpcError> {
286 let start = start_checkpoint.unwrap_or(0);
287 if let Some(end) = end_checkpoint
288 && end < start
289 {
290 return Err(FieldViolation::new("end_checkpoint")
291 .with_description(
292 "end_checkpoint must be greater than or equal to start_checkpoint",
293 )
294 .with_reason(ErrorReason::FieldInvalid)
295 .into());
296 }
297
298 let requested_end = end_checkpoint.unwrap_or(checkpoint_hi_exclusive);
299 let high_reason = if end_checkpoint.is_none() || requested_end > checkpoint_hi_exclusive {
300 QueryEndReason::LedgerTip
301 } else {
302 QueryEndReason::CheckpointBound
303 };
304 let end = requested_end.min(checkpoint_hi_exclusive);
305
306 Ok(Self {
307 start,
308 end,
309 high_reason,
310 indexed_tip: checkpoint_hi_exclusive,
311 })
312 }
313
314 pub fn resolve(self, options: &QueryOptions) -> ResolvedCheckpointRange {
315 let mut start = self.start;
316 let mut end = self.end;
317 let mut low_reason = QueryEndReason::CheckpointBound;
318 let mut high_reason = self.high_reason;
319 let mut cursor_bound = false;
320
321 if let Some(cursor) = &options.after
322 && cursor.checkpoint >= start
323 {
324 start = cursor.checkpoint;
325 cursor_bound = true;
326 if matches!(options.ordering, Ordering::Descending) {
327 low_reason = QueryEndReason::CursorBound;
328 }
329 }
330
331 if let Some(cursor) = &options.before
332 && let Some(upper) = cursor.before_checkpoint_end()
333 && upper <= end
334 {
335 end = upper;
336 cursor_bound = true;
337 if matches!(options.ordering, Ordering::Ascending) {
338 high_reason = QueryEndReason::CursorBound;
339 }
340 }
341
342 if start >= self.indexed_tip {
343 return ResolvedCheckpointRange::empty_at(self.indexed_tip, QueryEndReason::LedgerTip);
344 }
345
346 if start >= end {
347 let reason = if cursor_bound {
348 QueryEndReason::CursorBound
349 } else {
350 match options.ordering {
351 Ordering::Ascending => high_reason,
352 Ordering::Descending => low_reason,
353 }
354 };
355 let checkpoint = match options.ordering {
356 Ordering::Ascending => end,
357 Ordering::Descending => start,
358 };
359 return ResolvedCheckpointRange::empty_at(checkpoint, reason);
360 }
361
362 let end_reason = match options.ordering {
363 Ordering::Ascending => high_reason,
364 Ordering::Descending => low_reason,
365 };
366 ResolvedCheckpointRange {
367 range: start..end,
368 end_reason,
369 }
370 }
371}
372
/// Decoded form of an opaque paging cursor.
///
/// Tokens are serialized with BCS (see `encode_cursor` / `decode_cursor`), so
/// the field order here is part of the encoded format.
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
struct CursorToken {
    /// Query kind the cursor was issued for.
    query_type: QueryType,
    /// Whether the cursor points at an item or at a boundary.
    kind: CursorKind,
    /// Checkpoint component of the cursor.
    checkpoint: u64,
    /// Position component of the cursor.
    position: u64,
    /// Digest of the filter the cursor was issued under.
    scope_digest: [u8; 32],
}
381
382impl CursorToken {
383 fn validate(&self, query_type: QueryType, scope_digest: [u8; 32]) -> bool {
384 self.query_type == query_type && self.scope_digest == scope_digest
385 }
386
387 fn after_position_start(&self) -> Option<u64> {
388 match self.kind {
389 CursorKind::Item => self.position.checked_add(1),
390 CursorKind::Boundary => Some(self.position),
391 }
392 }
393
394 fn before_checkpoint_end(&self) -> Option<u64> {
395 match self.kind {
396 CursorKind::Item => self.checkpoint.checked_add(1),
397 CursorKind::Boundary => Some(self.checkpoint),
398 }
399 }
400}
401
402fn parse_cursor(
403 field: &'static str,
404 cursor: Option<&Bytes>,
405 query_type: QueryType,
406 scope_digest: [u8; 32],
407) -> Result<Option<CursorToken>, RpcError> {
408 cursor
409 .map(|cursor| decode_cursor(field, cursor))
410 .transpose()?
411 .map(|token| {
412 if token.validate(query_type, scope_digest) {
413 Ok(token)
414 } else {
415 Err(invalid_cursor(field, "invalid cursor"))
416 }
417 })
418 .transpose()
419}
420
421fn decode_cursor(field: &'static str, cursor: &[u8]) -> Result<CursorToken, RpcError> {
422 bcs::from_bytes(cursor).map_err(|_| invalid_cursor(field, "invalid cursor"))
423}
424
425fn encode_cursor(cursor: CursorToken) -> Bytes {
426 bcs::to_bytes(&cursor).unwrap().into()
427}
428
429fn scope_digest<F: Message>(filter: Option<&F>) -> [u8; 32] {
437 let mut hasher = DefaultHash::default();
438 hash_optional_message(&mut hasher, filter);
439 hasher.finalize().digest
440}
441
442fn hash_optional_message<M: Message>(hasher: &mut DefaultHash, message: Option<&M>) {
443 match message {
444 None => hasher.update([0]),
445 Some(message) => {
446 hasher.update([1]);
447 let bytes = message.encode_to_vec();
448 let len = u32::try_from(bytes.len()).expect("scan scope part should fit in u32");
449 hasher.update(len.to_be_bytes());
450 hasher.update(bytes);
451 }
452 }
453}
454
455fn invalid_cursor(field: &'static str, description: impl Into<String>) -> RpcError {
456 FieldViolation::new(field)
457 .with_description(description)
458 .with_reason(ErrorReason::FieldInvalid)
459 .into()
460}
461
#[cfg(test)]
mod tests {
    use super::*;

    /// Scope digest for an absent filter, matching what
    /// `query_options_from_proto` passes to `from_proto`.
    fn scope_digest_for_filter() -> [u8; 32] {
        scope_digest(Option::<&ProtoQueryOptions>::None)
    }

    /// Builds options for a transactions query with no filter, a default
    /// limit of 100 and a maximum limit of 1000.
    fn query_options_from_proto(
        request: Option<&ProtoQueryOptions>,
    ) -> Result<QueryOptions, RpcError> {
        QueryOptions::from_proto(
            request,
            100,
            1_000,
            QueryType::Transactions,
            Option::<&ProtoQueryOptions>::None,
        )
    }

    /// Builds a cursor token carrying the no-filter scope digest.
    fn cursor_token(
        kind: CursorKind,
        checkpoint: u64,
        position: u64,
        query_type: QueryType,
    ) -> CursorToken {
        CursorToken {
            query_type,
            kind,
            checkpoint,
            position,
            scope_digest: scope_digest_for_filter(),
        }
    }

    /// Item cursor with the no-filter scope digest.
    fn item_cursor(checkpoint: u64, position: u64, query_type: QueryType) -> CursorToken {
        cursor_token(kind::ITEM, checkpoint, position, query_type)
    }

    /// Boundary cursor with the no-filter scope digest.
    fn boundary_cursor(checkpoint: u64, position: u64, query_type: QueryType) -> CursorToken {
        cursor_token(kind::BOUNDARY, checkpoint, position, query_type)
    }

    /// Resolved range whose terminal checkpoint and position are both 20 and
    /// whose end reason is `CheckpointBound`.
    fn resolved_range(range: Range<u64>) -> ResolvedRange {
        ResolvedRange {
            range,
            end_checkpoint: 20,
            end_position: 20,
            end_reason: QueryEndReason::CheckpointBound,
        }
    }

    /// Constant aliases for the two cursor kinds used by the helpers above.
    mod kind {
        use super::CursorKind;

        pub(super) const BOUNDARY: CursorKind = CursorKind::Boundary;
        pub(super) const ITEM: CursorKind = CursorKind::Item;
    }

    // A request with limit, both cursors and descending ordering round-trips:
    // the item `after` cursor at 20 starts the range at 21 and the item
    // `before` cursor at 30 ends it (exclusively) at 30.
    #[test]
    fn parses_cursors_and_ordering() {
        let after = encode_cursor(item_cursor(2, 20, QueryType::Transactions));
        let before = encode_cursor(item_cursor(3, 30, QueryType::Transactions));
        let mut request = ProtoQueryOptions::default();
        request.limit_items = Some(500);
        request.after = Some(after);
        request.before = Some(before);
        request.ordering = ProtoOrdering::Descending as i32;

        let options = query_options_from_proto(Some(&request)).unwrap();

        assert_eq!(options.limit_items, 500);
        assert_eq!(options.ordering, Ordering::Descending);
        assert_eq!(options.scan_direction(), ScanDirection::Descending);
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(0..100)).range,
            21..30
        );
    }

    // `has_after_cursor` is false with no cursors and with only a `before`
    // cursor, and true once an `after` cursor is present.
    #[test]
    fn has_after_cursor_reflects_only_the_after_field() {
        let options = query_options_from_proto(Some(&ProtoQueryOptions::default())).unwrap();
        assert!(!options.has_after_cursor());

        let mut request = ProtoQueryOptions::default();
        request.before = Some(encode_cursor(item_cursor(3, 30, QueryType::Transactions)));
        let options = query_options_from_proto(Some(&request)).unwrap();
        assert!(!options.has_after_cursor());

        let mut request = ProtoQueryOptions::default();
        request.after = Some(encode_cursor(item_cursor(2, 20, QueryType::Transactions)));
        let options = query_options_from_proto(Some(&request)).unwrap();
        assert!(options.has_after_cursor());
    }

    // A limit above the maximum is clamped to it, and default proto options
    // resolve to ascending ordering.
    #[test]
    fn clamps_limit_items_and_defaults_to_ascending() {
        let mut request = ProtoQueryOptions::default();
        request.limit_items = Some(5_000);

        let options = query_options_from_proto(Some(&request)).unwrap();

        assert_eq!(options.limit_items, 1_000);
        assert_eq!(options.ordering, Ordering::Ascending);
        assert_eq!(options.scan_direction(), ScanDirection::Ascending);
    }

    // Undecodable cursor bytes in either field and an unknown ordering value
    // are all rejected.
    #[test]
    fn rejects_malformed_cursors_and_unknown_ordering() {
        let mut request = ProtoQueryOptions::default();
        request.after = Some(Bytes::from_static(b"short"));
        assert!(query_options_from_proto(Some(&request)).is_err());

        let mut request = ProtoQueryOptions::default();
        request.before = Some(Bytes::from_static(b"short"));
        assert!(query_options_from_proto(Some(&request)).is_err());

        let mut request = ProtoQueryOptions::default();
        request.ordering = 99;
        assert!(query_options_from_proto(Some(&request)).is_err());
    }

    // Well-formed cursors are still rejected when their query type or scope
    // digest differs from the current request's.
    #[test]
    fn rejects_cursor_for_different_query_type_or_scan_scope() {
        let token = encode_cursor(item_cursor(1, 9, QueryType::Checkpoints));
        let mut request = ProtoQueryOptions::default();
        request.after = Some(token);
        assert!(query_options_from_proto(Some(&request)).is_err());

        let token = encode_cursor(CursorToken {
            query_type: QueryType::Transactions,
            kind: CursorKind::Item,
            checkpoint: 1,
            position: 9,
            scope_digest: [9; 32],
        });
        let mut request = ProtoQueryOptions::default();
        request.before = Some(token);
        assert!(query_options_from_proto(Some(&request)).is_err());
    }

    // A cursor whose checkpoint lies below the requested range is accepted
    // and leaves the resolved checkpoint range unchanged.
    #[test]
    fn accepts_cursors_for_different_checkpoint_range_and_ordering() {
        let token = encode_cursor(item_cursor(9, 9, QueryType::Transactions));
        let mut request = ProtoQueryOptions::default();
        request.after = Some(token);
        request.ordering = ProtoOrdering::Descending as i32;

        let options = query_options_from_proto(Some(&request)).unwrap();
        let range = CheckpointRange::from_request(Some(1_000), Some(1_100), 2_000).unwrap();

        assert_eq!(range.resolve(&options).range, 1_000..1_100);
    }

    // Item cursors: `after` starts one past the cursor, an `after` at
    // `u64::MAX` yields an empty range at the cursor, descending scans report
    // the `after` cursor as the terminal position, and cursors that meet
    // produce an empty `CursorBound` range.
    #[test]
    fn applies_canonical_cursor_bounds() {
        let options = QueryOptions {
            query_type: QueryType::Transactions,
            limit_items: 2,
            ordering: Ordering::Ascending,
            after: Some(item_cursor(1, 11, QueryType::Transactions)),
            before: None,
            scope_digest: scope_digest_for_filter(),
        };
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..20)).range,
            12..20
        );

        let options = QueryOptions {
            after: Some(item_cursor(1, u64::MAX, QueryType::Transactions)),
            ..options
        };
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..20)),
            ResolvedRange::empty_at(1, u64::MAX, QueryEndReason::CursorBound)
        );

        let options = QueryOptions {
            ordering: Ordering::Descending,
            after: Some(item_cursor(1, 11, QueryType::Transactions)),
            before: Some(item_cursor(1, 19, QueryType::Transactions)),
            ..options
        };
        let bounded = options.apply_cursor_bounds(resolved_range(10..20));
        assert_eq!(bounded.range, 12..19);
        assert_eq!(bounded.end_reason, QueryEndReason::CursorBound);
        assert_eq!(bounded.end_position, 12);

        let options = QueryOptions {
            before: Some(item_cursor(1, 12, QueryType::Transactions)),
            ..options
        };
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..20)),
            ResolvedRange::empty_at(1, 12, QueryEndReason::CursorBound)
        );
    }

    // Boundary cursors bound the range at their exact position, with no
    // one-past offset on the `after` side.
    #[test]
    fn applies_boundary_cursor_bounds_without_item_offset() {
        let options = QueryOptions {
            query_type: QueryType::Transactions,
            limit_items: 2,
            ordering: Ordering::Ascending,
            after: Some(boundary_cursor(2, 20, QueryType::Transactions)),
            before: None,
            scope_digest: scope_digest_for_filter(),
        };
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..30)).range,
            20..30
        );

        let options = QueryOptions {
            ordering: Ordering::Descending,
            after: None,
            before: Some(boundary_cursor(2, 20, QueryType::Transactions)),
            ..options
        };
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..30)).range,
            10..20
        );
    }

    // An open-ended request ends at the ledger tip, an end below the start is
    // rejected, and a start beyond the tip resolves to an empty range at the tip.
    #[test]
    fn resolves_checkpoint_range_with_terminal_reason() {
        assert_eq!(
            CheckpointRange::from_request(None, None, 20)
                .unwrap()
                .resolve(&query_options_from_proto(None).unwrap())
                .end_reason,
            QueryEndReason::LedgerTip
        );
        assert!(CheckpointRange::from_request(Some(10), Some(9), 20).is_err());

        let range = CheckpointRange::from_request(Some(10), None, 20).unwrap();
        let resolved = range.resolve(&query_options_from_proto(None).unwrap());
        assert_eq!(resolved.range, 10..20);
        assert_eq!(resolved.end_reason, QueryEndReason::LedgerTip);

        let range = CheckpointRange::from_request(Some(30), None, 20).unwrap();
        assert_eq!(
            range.resolve(&query_options_from_proto(None).unwrap()),
            ResolvedCheckpointRange::empty_at(20, QueryEndReason::LedgerTip)
        );
    }

    // A very wide explicit range is resolved in full; an explicit end equal
    // to the tip is reported as `CheckpointBound`.
    #[test]
    fn resolves_checkpoint_range_no_longer_clamped_by_width() {
        let options = query_options_from_proto(None).unwrap();
        let range = CheckpointRange::from_request(Some(10), Some(10_000_000), 10_000_000).unwrap();
        let resolved = range.resolve(&options);
        assert_eq!(resolved.range, 10..10_000_000);
        assert_eq!(resolved.end_reason, QueryEndReason::CheckpointBound);
    }

    // The same item cursor is valid in either field: as `after` it excludes
    // the item from the start, as `before` it excludes it from the end.
    #[test]
    fn item_cursor_can_be_used_as_after_or_before() {
        let options = QueryOptions {
            query_type: QueryType::Transactions,
            limit_items: 2,
            ordering: Ordering::Ascending,
            after: None,
            before: None,
            scope_digest: scope_digest_for_filter(),
        };
        let token = options.cursor_for_item(1, 11);

        let mut request = ProtoQueryOptions::default();
        request.after = Some(token.clone());
        let options = query_options_from_proto(Some(&request)).unwrap();
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..20)).range,
            12..20
        );

        request.after = None;
        request.before = Some(token);
        let options = query_options_from_proto(Some(&request)).unwrap();
        assert_eq!(
            options.apply_cursor_bounds(resolved_range(10..20)).range,
            10..11
        );
    }
}