1use std::collections::{BTreeMap, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use backoff::ExponentialBackoff;
11use backoff::future::retry;
12use fastcrypto::encoding::Base64;
13use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
14use futures::future::join_all;
15use im::hashmap::HashMap as ImHashMap;
16use indexmap::map::IndexMap;
17use itertools::Itertools;
18use jsonrpsee::RpcModule;
19use jsonrpsee::core::RpcResult;
20use move_bytecode_utils::module_cache::GetModule;
21use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
22use move_core_types::language_storage::StructTag;
23use once_cell::sync::Lazy;
24use shared_crypto::intent::{IntentMessage, PersonalMessage};
25use sui_display::v1::Format;
26use sui_json_rpc_types::ZkLoginIntentScope;
27use sui_types::base_types::SuiAddress;
28use sui_types::signature::{GenericSignature, VerifyParams};
29use sui_types::signature_verification::VerifiedDigestCache;
30use sui_types::storage::ObjectKey;
31use tap::TapFallible;
32use tracing::{debug, error, info, instrument, trace, warn};
33
34use mysten_metrics::add_server_timing;
35use sui_core::authority::AuthorityState;
36use sui_json_rpc_api::{
37 JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
38 ReadApiServer, validate_limit,
39};
40use sui_json_rpc_types::{
41 BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
42 ObjectChange, ProtocolConfigResponse, SuiEvent, SuiGetPastObjectRequest, SuiObjectDataOptions,
43 SuiObjectResponse, SuiPastObjectResponse, SuiTransactionBlock, SuiTransactionBlockEvents,
44 SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
45};
46use sui_open_rpc::Module;
47use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
48use sui_storage::key_value_store::TransactionKeyValueStore;
49use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
50use sui_types::crypto::AggregateAuthoritySignature;
51use sui_types::display::DisplayVersionUpdatedEvent;
52use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
53use sui_types::error::{SuiError, SuiObjectResponseError};
54use sui_types::messages_checkpoint::{
55 CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
56};
57use sui_types::object::{Object, ObjectRead, PastObjectRead};
58use sui_types::sui_serde::BigInt;
59use sui_types::transaction::TransactionDataAPI;
60use sui_types::transaction::{Transaction, TransactionData};
61
62use crate::authority_state::{StateRead, StateReadError, StateReadResult};
63use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
64use crate::{ObjectProvider, with_tracing};
65use crate::{
66 ObjectProviderCache, SuiRpcModule, get_balance_changes_from_effect, get_object_changes,
67};
68use fastcrypto::encoding::Encoding;
69use fastcrypto::traits::ToFromBytes;
70use shared_crypto::intent::Intent;
71use sui_json_rpc_types::ZkLoginVerifyResult;
72use sui_types::authenticator_state::{ActiveJwk, get_authenticator_state};
73
/// Limit of 10 on display nesting depth.
/// NOTE(review): the code that consumes this constant is outside the visible part of the
/// file — confirm exactly what "nested level" counts before relying on this description.
const MAX_DISPLAY_NESTED_LEVEL: usize = 10;

/// Value used for `MAX_DISPLAY_OUTPUT_SIZE` when the `MAX_DISPLAY_OUTPUT_SIZE` environment
/// variable is unset or cannot be parsed: 1024 * 1024 (presumably bytes — confirm at the use site).
const DEFAULT_MAX_DISPLAY_OUTPUT_SIZE: usize = 1024 * 1024;
79
80static MAX_DISPLAY_OUTPUT_SIZE: Lazy<usize> = Lazy::new(|| {
82 let max_opt = std::env::var("MAX_DISPLAY_OUTPUT_SIZE")
83 .ok()
84 .and_then(|s| s.parse().ok());
85
86 if let Some(max) = max_opt {
87 info!("Using custom value for 'MAX_DISPLAY_OUTPUT_SIZE': {max}");
88 max
89 } else {
90 DEFAULT_MAX_DISPLAY_OUTPUT_SIZE
91 }
92});
93
/// Handler for the read-only JSON-RPC endpoints (objects, transactions, events, checkpoints).
///
/// Every field is an `Arc`, so cloning the handler only bumps reference counts.
#[derive(Clone)]
pub struct ReadApi {
    /// Read access to node state: object reads, latest checkpoint sequence number,
    /// chain identifier and the current epoch store.
    pub state: Arc<dyn StateRead>,
    /// Key-value store queried for transactions, effects, events, checkpoints and objects.
    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Metrics recorded by the handlers (request sizes and result sizes).
    pub metrics: Arc<JsonRpcMetrics>,
}
102
/// Scratch record filled in step by step while a transaction response is being assembled;
/// it is handed to `convert_to_response` once all requested parts have been gathered.
#[derive(Default)]
struct IntermediateTransactionResponse {
    /// Digest of the transaction this record describes.
    digest: TransactionDigest,
    /// The transaction itself; populated only when the request options require input.
    transaction: Option<Transaction>,
    /// Execution effects; populated only when the request options require effects.
    effects: Option<TransactionEffects>,
    /// Events emitted by the transaction; populated only when events are requested.
    events: Option<SuiTransactionBlockEvents>,
    /// Sequence number of the checkpoint containing the transaction, if the store knows one.
    checkpoint_seq: Option<CheckpointSequenceNumber>,
    /// Balance changes derived from the effects; populated only when requested.
    balance_changes: Option<Vec<BalanceChange>>,
    /// Object changes derived from the effects; populated only when requested.
    object_changes: Option<Vec<ObjectChange>>,
    /// Timestamp (`timestamp_ms`) taken from the summary of the containing checkpoint.
    timestamp: Option<CheckpointTimestamp>,
    /// Messages for optional parts that could not be retrieved.
    errors: Vec<String>,
}
118
119impl IntermediateTransactionResponse {
120 pub fn new(digest: TransactionDigest) -> Self {
121 Self {
122 digest,
123 ..Default::default()
124 }
125 }
126
127 pub fn transaction(&self) -> &Option<Transaction> {
128 &self.transaction
129 }
130}
131
132impl ReadApi {
133 pub fn new(
134 state: Arc<AuthorityState>,
135 transaction_kv_store: Arc<TransactionKeyValueStore>,
136 metrics: Arc<JsonRpcMetrics>,
137 ) -> Self {
138 Self {
139 state,
140 transaction_kv_store,
141 metrics,
142 }
143 }
144
145 async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
146 Ok(match id {
147 CheckpointId::SequenceNumber(seq) => {
148 let verified_summary = self
149 .transaction_kv_store
150 .get_checkpoint_summary(seq)
151 .await?;
152 let content = self
153 .transaction_kv_store
154 .get_checkpoint_contents(verified_summary.sequence_number)
155 .await?;
156 let signature = verified_summary.auth_sig().signature.clone();
157 (verified_summary.into_data(), content, signature).into()
158 }
159 CheckpointId::Digest(digest) => {
160 let verified_summary = self
161 .transaction_kv_store
162 .get_checkpoint_summary_by_digest(digest)
163 .await?;
164 let content = self
165 .transaction_kv_store
166 .get_checkpoint_contents(verified_summary.sequence_number)
167 .await?;
168 let signature = verified_summary.auth_sig().signature.clone();
169 (verified_summary.into_data(), content, signature).into()
170 }
171 })
172 }
173
174 pub async fn get_checkpoints_internal(
175 state: Arc<dyn StateRead>,
176 transaction_kv_store: Arc<TransactionKeyValueStore>,
177 cursor: Option<CheckpointSequenceNumber>,
179 limit: u64,
180 descending_order: bool,
181 ) -> StateReadResult<Vec<Checkpoint>> {
182 let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
183 let checkpoint_numbers =
184 calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
185
186 let verified_checkpoints = transaction_kv_store
187 .multi_get_checkpoints_summaries(&checkpoint_numbers)
188 .await?;
189
190 let checkpoint_summaries_and_signatures: Vec<(
191 CheckpointSummary,
192 AggregateAuthoritySignature,
193 )> = verified_checkpoints
194 .into_iter()
195 .flatten()
196 .map(|check| {
197 (
198 check.clone().into_summary_and_sequence().1,
199 check.get_validator_signature(),
200 )
201 })
202 .collect();
203
204 let checkpoint_contents = transaction_kv_store
205 .multi_get_checkpoints_contents(&checkpoint_numbers)
206 .await?;
207 let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
208
209 let mut checkpoints: Vec<Checkpoint> = vec![];
210
211 for (summary_and_sig, content) in checkpoint_summaries_and_signatures
212 .into_iter()
213 .zip(contents.into_iter())
214 {
215 checkpoints.push(Checkpoint::from((
216 summary_and_sig.0,
217 content,
218 summary_and_sig.1,
219 )));
220 }
221
222 Ok(checkpoints)
223 }
224
    /// Builds one response per requested digest, fetching each requested part for all
    /// digests in bulk.
    ///
    /// Responses are produced in the order of `digests`. Which parts are fetched is driven
    /// by `opts` (defaulted when `None`): input, effects, events, balance changes and object
    /// changes. Checkpoint sequence numbers and checkpoint timestamps are always looked up.
    ///
    /// # Errors
    /// - `SuiRpcInputError::SizeLimitExceeded` when more than `QUERY_MAX_RESULT_LIMIT`
    ///   digests are supplied.
    /// - `SuiRpcInputError::ContainsDuplicates` when `digests` contains a repeated digest.
    /// - `SuiRpcInputError::GenericNotFound` when balance or object changes are requested
    ///   but effects (or, for object changes, the transaction) are missing for an entry.
    /// - `Error::UnexpectedError` when checkpoint summaries or events cannot be retrieved.
    /// - Any error propagated from the key-value store or from `convert_to_response`.
    ///
    /// Failures to derive balance/object changes for an individual entry, and events that
    /// are missing for an individual entry, are recorded in that entry's `errors` list
    /// instead of failing the whole call.
    #[instrument(skip_all)]
    async fn multi_get_transaction_blocks_internal(
        &self,
        digests: Vec<TransactionDigest>,
        opts: Option<SuiTransactionBlockResponseOptions>,
    ) -> Result<Vec<SuiTransactionBlockResponse>, Error> {
        trace!("start");

        let num_digests = digests.len();
        if num_digests > *QUERY_MAX_RESULT_LIMIT {
            Err(SuiRpcInputError::SizeLimitExceeded(
                QUERY_MAX_RESULT_LIMIT.to_string(),
            ))?
        }
        self.metrics
            .get_tx_blocks_limit
            .observe(digests.len() as f64);

        let opts = opts.unwrap_or_default();

        // Insertion-ordered map keyed by digest: iteration order equals the order of
        // `digests`, which is what lets the bulk results below be zipped in positionally.
        let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
            IndexMap::from_iter(
                digests
                    .iter()
                    .map(|k| (k, IntermediateTransactionResponse::new(*k))),
            );
        // A repeated digest collapses into a single map entry, so a shorter map means
        // the input contained duplicates.
        if temp_response.len() < num_digests {
            Err(SuiRpcInputError::ContainsDuplicates)?
        }

        if opts.require_input() {
            trace!("getting input");
            let digests_clone = digests.clone();
            let transactions =
                self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
                    |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
                )?;

            for ((_digest, cache_entry), txn) in
                temp_response.iter_mut().zip(transactions.into_iter())
            {
                cache_entry.transaction = txn;
            }
        }

        if opts.require_effects() {
            trace!("getting effects");
            let digests_clone = digests.clone();
            let effects_list = self.transaction_kv_store
                .multi_get_fx_by_tx_digest(&digests_clone)
                .await
                .tap_err(
                    |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
                )?;
            for ((_digest, cache_entry), e) in
                temp_response.iter_mut().zip(effects_list.into_iter())
            {
                cache_entry.effects = e;
            }
        }

        trace!("getting checkpoint sequence numbers");
        let checkpoint_seq_list = self
            .transaction_kv_store
            .multi_get_transaction_checkpoint(&digests)
            .await
            .tap_err(
                |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
        for ((_digest, cache_entry), seq) in temp_response
            .iter_mut()
            .zip(checkpoint_seq_list.into_iter())
        {
            cache_entry.checkpoint_seq = seq;
        }

        // Several transactions may share a checkpoint; fetch each summary only once.
        let unique_checkpoint_numbers = temp_response
            .values()
            .filter_map(|cache_entry| cache_entry.checkpoint_seq)
            .unique()
            .collect::<Vec<CheckpointSequenceNumber>>();

        trace!("getting checkpoint summaries");
        let timestamps = self
            .transaction_kv_store
            .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
            .await
            .map_err(|e| {
                Error::UnexpectedError(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
            })?
            .into_iter()
            .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));

        // Checkpoint number -> optional timestamp (None when the summary was not found).
        let checkpoint_to_timestamp = unique_checkpoint_numbers
            .into_iter()
            .zip(timestamps)
            .collect::<HashMap<_, _>>();

        for (_, cache_entry) in temp_response.iter_mut() {
            if cache_entry.checkpoint_seq.is_some() {
                // The second `unwrap` relies on every `Some` checkpoint number having been
                // inserted into `checkpoint_to_timestamp` just above.
                cache_entry.timestamp = *checkpoint_to_timestamp
                    .get(cache_entry.checkpoint_seq.as_ref().unwrap())
                    .unwrap();
            }
        }

        if opts.show_events {
            trace!("getting events");
            // Only transactions whose effects carry an events digest have events to fetch.
            let mut non_empty_digests = vec![];
            for cache_entry in temp_response.values() {
                if let Some(effects) = &cache_entry.effects
                    && effects.events_digest().is_some()
                {
                    non_empty_digests.push(cache_entry.digest);
                }
            }
            // Retry for at most 3 seconds; a multiplier of 1.0 keeps the retry interval
            // from growing between attempts.
            let backoff = ExponentialBackoff {
                max_elapsed_time: Some(Duration::from_secs(3)),
                multiplier: 1.0,
                ..ExponentialBackoff::default()
            };
            let mut events = retry(backoff, || async {
                match self
                    .transaction_kv_store
                    .multi_get_events_by_tx_digests(&non_empty_digests)
                    .await
                {
                    // Success only when every requested digest produced events.
                    Ok(events) if !events.contains(&None) => Ok(events),
                    // Some events are missing: treated as transient, so the lookup is retried.
                    Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
                        "Events not found, transaction execution may be incomplete.".into(),
                    ))),
                    // A store failure is permanent: stop retrying immediately.
                    Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
                        "Failed to call multi_get_events: {e:?}"
                    )))),
                }
            })
            .await
            .map_err(|e| {
                Error::UnexpectedError(format!(
                    "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
                ))
            })?
            .into_iter();

            // `events` has one item per digest in `non_empty_digests`, in the same order in
            // which entries with an events digest appear in `temp_response`, so it is
            // advanced only for those entries.
            for (_, cache_entry) in temp_response.iter_mut() {
                let transaction_digest = cache_entry.digest;
                if let Some(events_digest) =
                    cache_entry.effects.as_ref().and_then(|e| e.events_digest())
                {
                    match events.next() {
                        Some(Some(ev)) => {
                            cache_entry.events =
                                Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
                        }
                        None | Some(None) => {
                            error!(
                                "Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
                            );
                            cache_entry.errors.push(format!(
                                "Failed to fetch events with event digest {events_digest:?}",
                            ))
                        }
                    }
                } else {
                    // No events digest (or no effects): report an empty event list.
                    cache_entry.events = Some(SuiTransactionBlockEvents::default());
                }
            }
        }

        let mut object_cache =
            ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));

        if opts.show_balance_changes || opts.show_object_changes {
            // Pre-load every input and output object version referenced by the effects in
            // one bulk read and seed the cache with them.
            let mut keys = vec![];
            for resp in temp_response.values() {
                let effects = resp.effects.as_ref().ok_or_else(|| {
                    SuiRpcInputError::GenericNotFound(
                        "unable to derive balance/object changes because effect is empty"
                            .to_string(),
                    )
                })?;

                for change in effects.object_changes() {
                    if let Some(input_version) = change.input_version {
                        keys.push(ObjectKey(change.id, input_version));
                    }
                    if let Some(output_version) = change.output_version {
                        keys.push(ObjectKey(change.id, output_version));
                    }
                }
            }

            let objects = self
                .transaction_kv_store
                .multi_get_objects(&keys)
                .await?
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            object_cache.insert_objects_into_cache(objects);
        }

        if opts.show_balance_changes {
            trace!("getting balance changes");

            let mut results = vec![];
            for resp in temp_response.values() {
                // Without a transaction there are no known input objects.
                let input_objects = if let Some(tx) = resp.transaction() {
                    tx.data()
                        .inner()
                        .intent_message
                        .value
                        .input_objects()
                        .unwrap_or_default()
                } else {
                    Vec::new()
                };
                results.push(get_balance_changes_from_effect(
                    &object_cache,
                    resp.effects.as_ref().ok_or_else(|| {
                        SuiRpcInputError::GenericNotFound(
                            "unable to derive balance changes because effect is empty".to_string(),
                        )
                    })?,
                    input_objects,
                    None,
                ));
            }
            // One future per entry, awaited together; results come back in entry order.
            let results = join_all(results).await;
            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
                match result {
                    Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
                    Err(e) => entry
                        .1
                        .errors
                        .push(format!("Failed to fetch balance changes {e:?}")),
                }
            }
        }

        if opts.show_object_changes {
            trace!("getting object changes");

            let mut results = vec![];
            for resp in temp_response.values() {
                let effects = resp.effects.as_ref().ok_or_else(|| {
                    SuiRpcInputError::GenericNotFound(
                        "unable to derive object changes because effect is empty".to_string(),
                    )
                })?;

                results.push(get_object_changes(
                    &object_cache,
                    effects,
                    // The transaction sender is needed, so a missing transaction is an error.
                    resp.transaction
                        .as_ref()
                        .ok_or_else(|| {
                            SuiRpcInputError::GenericNotFound(
                                "unable to derive object changes because transaction is empty"
                                    .to_string(),
                            )
                        })?
                        .data()
                        .intent_message()
                        .value
                        .sender(),
                    effects.modified_at_versions(),
                    effects.all_changed_objects(),
                    effects.all_removed_objects(),
                ));
            }
            let results = join_all(results).await;
            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
                match result {
                    Ok(object_changes) => entry.1.object_changes = Some(object_changes),
                    Err(e) => entry
                        .1
                        .errors
                        .push(format!("Failed to fetch object changes {e:?}")),
                }
            }
        }

        let epoch_store = self.state.load_epoch_store_one_call_per_task();
        let converted_tx_block_resps = temp_response
            .into_iter()
            .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
            .collect::<Result<Vec<_>, _>>()?;

        self.metrics
            .get_tx_blocks_result_size
            .observe(converted_tx_block_resps.len() as f64);
        self.metrics
            .get_tx_blocks_result_size_total
            .inc_by(converted_tx_block_resps.len() as u64);

        trace!("done");

        Ok(converted_tx_block_resps)
    }
542}
543
544#[async_trait]
545impl ReadApiServer for ReadApi {
546 #[instrument(skip(self))]
547 async fn get_object(
548 &self,
549 object_id: ObjectID,
550 options: Option<SuiObjectDataOptions>,
551 ) -> RpcResult<SuiObjectResponse> {
552 with_tracing!(async move {
553 let state = self.state.clone();
554 let object_read = state.get_object_read(&object_id).map_err(|e| {
555 warn!(?object_id, "Failed to get object: {:?}", e);
556 Error::from(e)
557 })?;
558 let options = options.unwrap_or_default();
559
560 match object_read {
561 ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
562 SuiObjectResponseError::NotExists { object_id: id },
563 )),
564 ObjectRead::Exists(object_ref, o, layout) => {
565 let mut display_fields = None;
566 if options.show_display {
567 match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
568 .await
569 {
570 Ok(rendered_fields) => display_fields = Some(rendered_fields),
571 Err(e) => {
572 return Ok(SuiObjectResponse::new(
573 Some((object_ref, o, layout, options, None).try_into()?),
574 Some(SuiObjectResponseError::DisplayError {
575 error: e.to_string(),
576 }),
577 ));
578 }
579 }
580 }
581 Ok(SuiObjectResponse::new_with_data(
582 (object_ref, o, layout, options, display_fields).try_into()?,
583 ))
584 }
585 ObjectRead::Deleted((object_id, version, digest)) => Ok(
586 SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
587 object_id,
588 version,
589 digest,
590 }),
591 ),
592 }
593 })
594 }
595
596 #[instrument(skip(self))]
597 async fn multi_get_objects(
598 &self,
599 object_ids: Vec<ObjectID>,
600 options: Option<SuiObjectDataOptions>,
601 ) -> RpcResult<Vec<SuiObjectResponse>> {
602 with_tracing!(async move {
603 if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
604 self.metrics
605 .get_objects_limit
606 .observe(object_ids.len() as f64);
607 let mut futures = vec![];
608 for object_id in object_ids {
609 futures.push(self.get_object(object_id, options.clone()));
610 }
611 let results = join_all(futures).await;
612
613 let objects_result: Result<Vec<SuiObjectResponse>, String> = results
614 .into_iter()
615 .map(|result| match result {
616 Ok(response) => Ok(response),
617 Err(error) => {
618 error!("Failed to fetch object with error: {error:?}");
619 Err(format!("Error: {}", error))
620 }
621 })
622 .collect();
623
624 let objects = objects_result.map_err(|err| {
625 Error::UnexpectedError(format!("Failed to fetch objects with error: {}", err))
626 })?;
627
628 self.metrics
629 .get_objects_result_size
630 .observe(objects.len() as f64);
631 self.metrics
632 .get_objects_result_size_total
633 .inc_by(objects.len() as u64);
634 Ok(objects)
635 } else {
636 Err(SuiRpcInputError::SizeLimitExceeded(
637 QUERY_MAX_RESULT_LIMIT.to_string(),
638 ))?
639 }
640 })
641 }
642
643 #[instrument(skip(self))]
644 async fn try_get_past_object(
645 &self,
646 object_id: ObjectID,
647 version: SequenceNumber,
648 options: Option<SuiObjectDataOptions>,
649 ) -> RpcResult<SuiPastObjectResponse> {
650 with_tracing!(async move {
651 let state = self.state.clone();
652 let past_read = state
653 .get_past_object_read(&object_id, version)
654 .map_err(|e| {
655 error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
656 Error::from(e)
657 })?;
658 let options = options.unwrap_or_default();
659 match past_read {
660 PastObjectRead::ObjectNotExists(id) => {
661 Ok(SuiPastObjectResponse::ObjectNotExists(id))
662 }
663 PastObjectRead::VersionFound(object_ref, o, layout) => {
664 let display_fields = if options.show_display {
665 Some(
667 get_display_fields(self, &self.transaction_kv_store, &o, &layout)
668 .await
669 .map_err(|e| {
670 Error::UnexpectedError(format!(
671 "Unable to render object at version {version}: {e}"
672 ))
673 })?,
674 )
675 } else {
676 None
677 };
678 Ok(SuiPastObjectResponse::VersionFound(
679 (object_ref, o, layout, options, display_fields).try_into()?,
680 ))
681 }
682 PastObjectRead::ObjectDeleted(oref) => {
683 Ok(SuiPastObjectResponse::ObjectDeleted(oref.into()))
684 }
685 PastObjectRead::VersionNotFound(id, seq_num) => {
686 Ok(SuiPastObjectResponse::VersionNotFound(id, seq_num))
687 }
688 PastObjectRead::VersionTooHigh {
689 object_id,
690 asked_version,
691 latest_version,
692 } => Ok(SuiPastObjectResponse::VersionTooHigh {
693 object_id,
694 asked_version,
695 latest_version,
696 }),
697 }
698 })
699 }
700
701 #[instrument(skip(self))]
702 async fn try_get_object_before_version(
703 &self,
704 object_id: ObjectID,
705 version: SequenceNumber,
706 ) -> RpcResult<SuiPastObjectResponse> {
707 let version = self
708 .state
709 .find_object_lt_or_eq_version(&object_id, &version)
710 .await
711 .map_err(Error::from)?
712 .map(|obj| obj.version())
713 .unwrap_or_default();
714 self.try_get_past_object(
715 object_id,
716 version,
717 Some(SuiObjectDataOptions::bcs_lossless()),
718 )
719 .await
720 }
721
722 #[instrument(skip(self))]
723 async fn try_multi_get_past_objects(
724 &self,
725 past_objects: Vec<SuiGetPastObjectRequest>,
726 options: Option<SuiObjectDataOptions>,
727 ) -> RpcResult<Vec<SuiPastObjectResponse>> {
728 with_tracing!(async move {
729 if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
730 let mut futures = vec![];
731 for past_object in past_objects {
732 futures.push(self.try_get_past_object(
733 past_object.object_id,
734 past_object.version,
735 options.clone(),
736 ));
737 }
738 let results = join_all(futures).await;
739
740 let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
741 let success = oks.into_iter().filter_map(Result::ok).collect();
742 let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
743 if !errors.is_empty() {
744 let error_string = errors
745 .iter()
746 .map(|e| e.to_string())
747 .collect::<Vec<String>>()
748 .join("; ");
749 Err(anyhow!("{error_string}").into()) } else {
751 Ok(success)
752 }
753 } else {
754 Err(SuiRpcInputError::SizeLimitExceeded(
755 QUERY_MAX_RESULT_LIMIT.to_string(),
756 ))?
757 }
758 })
759 }
760
761 #[instrument(skip(self))]
762 async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
763 with_tracing!(async move {
764 Ok(self
765 .state
766 .get_total_transaction_blocks()
767 .map_err(Error::from)?
768 .into()) })
770 }
771
772 #[instrument(skip(self))]
773 async fn get_transaction_block(
774 &self,
775 digest: TransactionDigest,
776 opts: Option<SuiTransactionBlockResponseOptions>,
777 ) -> RpcResult<SuiTransactionBlockResponse> {
778 with_tracing!(async move {
779 let opts = opts.unwrap_or_default();
780 let mut temp_response = IntermediateTransactionResponse::new(digest);
781
782 let transaction_kv_store = self.transaction_kv_store.clone();
784 let transaction = async move {
785 let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
786 debug!(tx_digest=?digest, "Failed to get transaction: {}", err);
787 Error::from(err)
788 });
789 add_server_timing("tx_kv_lookup");
790 ret
791 }
792 .await?;
793 let input_objects = transaction
794 .data()
795 .inner()
796 .intent_message
797 .value
798 .input_objects()
799 .unwrap_or_default();
800
801 if opts.require_input() {
803 temp_response.transaction = Some(transaction);
804 }
805
806 if opts.require_effects() {
808 let transaction_kv_store = self.transaction_kv_store.clone();
809 temp_response.effects = Some(
810 transaction_kv_store
811 .get_fx_by_tx_digest(digest)
812 .await
813 .map_err(|err| {
814 debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
815 Error::from(err)
816 })?,
817 );
818 }
819
820 temp_response.checkpoint_seq = self
821 .transaction_kv_store
822 .deprecated_get_transaction_checkpoint(digest)
823 .await
824 .map_err(|e| {
825 error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
826 Error::from(e)
827 })?;
828
829 if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
830 let kv_store = self.transaction_kv_store.clone();
831 let checkpoint_seq = *checkpoint_seq;
832 let checkpoint = kv_store
833 .get_checkpoint_summary(checkpoint_seq)
835 .await
836 .map_err(|e| {
837 error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
838 Error::from(e)
839 })?;
840 temp_response.timestamp = Some(checkpoint.timestamp_ms);
842 }
843
844 if opts.show_events && temp_response.effects.is_some() {
845 let transaction_kv_store = self.transaction_kv_store.clone();
846 let events = transaction_kv_store
847 .multi_get_events_by_tx_digests(&[digest])
848 .await
849 .map_err(|e| {
850 error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
851 Error::from(e)
852 })?
853 .pop()
854 .flatten();
855 match events {
856 None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
857 Some(events) => match to_sui_transaction_events(self, digest, events) {
858 Ok(e) => temp_response.events = Some(e),
859 Err(e) => temp_response.errors.push(e.to_string()),
860 },
861 }
862 }
863
864 let object_cache =
865 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
866 if opts.show_balance_changes
867 && let Some(effects) = &temp_response.effects
868 {
869 let balance_changes =
870 get_balance_changes_from_effect(&object_cache, effects, input_objects, None)
871 .await;
872
873 if let Ok(balance_changes) = balance_changes {
874 temp_response.balance_changes = Some(balance_changes);
875 } else {
876 temp_response.errors.push(format!(
877 "Cannot retrieve balance changes: {}",
878 balance_changes.unwrap_err()
879 ));
880 }
881 }
882
883 if opts.show_object_changes
884 && let (Some(effects), Some(input)) =
885 (&temp_response.effects, &temp_response.transaction)
886 {
887 let sender = input.data().intent_message().value.sender();
888 let object_changes = get_object_changes(
889 &object_cache,
890 effects,
891 sender,
892 effects.modified_at_versions(),
893 effects.all_changed_objects(),
894 effects.all_removed_objects(),
895 )
896 .await;
897
898 if let Ok(object_changes) = object_changes {
899 temp_response.object_changes = Some(object_changes);
900 } else {
901 temp_response.errors.push(format!(
902 "Cannot retrieve object changes: {}",
903 object_changes.unwrap_err()
904 ));
905 }
906 }
907 let epoch_store = self.state.load_epoch_store_one_call_per_task();
908 convert_to_response(temp_response, &opts, epoch_store.module_cache())
909 })
910 }
911
912 #[instrument(skip(self))]
913 async fn multi_get_transaction_blocks(
914 &self,
915 digests: Vec<TransactionDigest>,
916 opts: Option<SuiTransactionBlockResponseOptions>,
917 ) -> RpcResult<Vec<SuiTransactionBlockResponse>> {
918 with_tracing!(async move {
919 self.multi_get_transaction_blocks_internal(digests, opts)
920 .await
921 })
922 }
923
924 #[instrument(skip(self))]
925 async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<SuiEvent>> {
926 with_tracing!(async move {
927 let state = self.state.clone();
928 let transaction_kv_store = self.transaction_kv_store.clone();
929 async move {
930 let store = state.load_epoch_store_one_call_per_task();
931 let events = transaction_kv_store
932 .multi_get_events_by_tx_digests(&[transaction_digest])
933 .await
934 .map_err(|e| {
935 error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
936 Error::StateReadError(e.into())
937 })?
938 .pop()
939 .flatten();
940 Ok(match events {
941 Some(events) => events
942 .data
943 .into_iter()
944 .enumerate()
945 .map(|(seq, e)| {
946 let layout = store
947 .executor()
948 .type_layout_resolver(Box::new(
949 &state.get_backing_package_store().as_ref(),
950 ))
951 .get_annotated_layout(&e.type_)?;
952 SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
953 })
954 .collect::<Result<Vec<_>, _>>()
955 .map_err(Error::SuiError)?,
956 None => vec![],
957 })
958 }
959 .await
960 })
961 }
962
963 #[instrument(skip(self))]
964 async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
965 with_tracing!(async move {
966 Ok(self
967 .state
968 .get_latest_checkpoint_sequence_number()
969 .map_err(|e| {
970 SuiRpcInputError::GenericNotFound(format!(
971 "Latest checkpoint sequence number was not found with error :{e}"
972 ))
973 })?
974 .into())
975 })
976 }
977
978 #[instrument(skip(self))]
979 async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
980 with_tracing!(self.get_checkpoint_internal(id))
981 }
982
983 #[instrument(skip(self))]
984 async fn get_checkpoints(
985 &self,
986 cursor: Option<BigInt<u64>>,
988 limit: Option<usize>,
989 descending_order: bool,
990 ) -> RpcResult<CheckpointPage> {
991 with_tracing!(async move {
992 let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
993 .map_err(SuiRpcInputError::from)?;
994
995 let state = self.state.clone();
996 let kv_store = self.transaction_kv_store.clone();
997
998 self.metrics.get_checkpoints_limit.observe(limit as f64);
999
1000 let mut data = Self::get_checkpoints_internal(
1001 state,
1002 kv_store,
1003 cursor.map(|s| *s),
1004 limit as u64 + 1,
1005 descending_order,
1006 )
1007 .await
1008 .map_err(Error::from)?;
1009
1010 let has_next_page = data.len() > limit;
1011 data.truncate(limit);
1012
1013 let next_cursor = if has_next_page {
1014 data.last().cloned().map(|d| d.sequence_number.into())
1015 } else {
1016 None
1017 };
1018
1019 self.metrics
1020 .get_checkpoints_result_size
1021 .observe(data.len() as f64);
1022 self.metrics
1023 .get_checkpoints_result_size_total
1024 .inc_by(data.len() as u64);
1025
1026 Ok(CheckpointPage {
1027 data,
1028 next_cursor,
1029 has_next_page,
1030 })
1031 })
1032 }
1033
1034 #[instrument(skip(self))]
1035 async fn get_protocol_config(
1036 &self,
1037 version: Option<BigInt<u64>>,
1038 ) -> RpcResult<ProtocolConfigResponse> {
1039 with_tracing!(async move {
1040 version
1041 .map(|v| {
1042 ProtocolConfig::get_for_version_if_supported(
1043 (*v).into(),
1044 self.state.get_chain_identifier()?.chain(),
1045 )
1046 .ok_or(SuiRpcInputError::ProtocolVersionUnsupported(
1047 ProtocolVersion::MIN.as_u64(),
1048 ProtocolVersion::MAX.as_u64(),
1049 ))
1050 .map_err(Error::from)
1051 })
1052 .unwrap_or(Ok(self
1053 .state
1054 .load_epoch_store_one_call_per_task()
1055 .protocol_config()
1056 .clone()))
1057 .map(ProtocolConfigResponse::from)
1058 })
1059 }
1060
1061 #[instrument(skip(self))]
1062 async fn get_chain_identifier(&self) -> RpcResult<String> {
1063 with_tracing!(async move {
1064 let ci = self.state.get_chain_identifier()?;
1065 Ok(ci.to_string())
1066 })
1067 }
1068 #[instrument(skip(self))]
1069 async fn verify_zklogin_signature(
1070 &self,
1071 bytes: String,
1072 signature: String,
1073 intent_scope: ZkLoginIntentScope,
1074 author: SuiAddress,
1075 ) -> RpcResult<ZkLoginVerifyResult> {
1076 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1077 let curr_epoch = epoch_store.epoch();
1078 let zklogin_env_native = match self
1079 .state
1080 .get_chain_identifier()
1081 .expect("get chain identifier should not fail")
1082 .chain()
1083 {
1084 sui_protocol_config::Chain::Mainnet | sui_protocol_config::Chain::Testnet => {
1085 ZkLoginEnv::Prod
1086 }
1087 _ => ZkLoginEnv::Test,
1088 };
1089 let GenericSignature::ZkLoginAuthenticator(zklogin_sig) =
1090 GenericSignature::from_bytes(&Base64::decode(&signature).map_err(Error::from)?)
1091 .map_err(Error::from)?
1092 else {
1093 return Err(SuiRpcInputError::GenericNotFound(
1094 "Endpoint only supports zkLogin signature".to_string(),
1095 )
1096 .into());
1097 };
1098
1099 let new_jwks =
1100 match get_authenticator_state(self.state.get_object_store()).map_err(Error::from)? {
1101 Some(authenticator_state) => authenticator_state.active_jwks,
1102 None => {
1103 return Err(SuiRpcInputError::GenericNotFound(
1104 "Authenticator state not found".to_string(),
1105 )
1106 .into());
1107 }
1108 };
1109
1110 let mut oidc_provider_jwks = ImHashMap::new();
1112 for active_jwk in new_jwks.iter() {
1113 let ActiveJwk { jwk_id, jwk, .. } = active_jwk;
1114 match oidc_provider_jwks.entry(jwk_id.clone()) {
1115 im::hashmap::Entry::Occupied(_) => {
1116 warn!("JWK with kid {:?} already exists", jwk_id);
1117 }
1118 im::hashmap::Entry::Vacant(entry) => {
1119 entry.insert(jwk.clone());
1120 }
1121 }
1122 }
1123 let verify_params = VerifyParams::new(
1124 oidc_provider_jwks,
1125 vec![],
1126 zklogin_env_native,
1127 true,
1128 true,
1129 true,
1130 Some(30),
1131 true,
1132 );
1133 match intent_scope {
1134 ZkLoginIntentScope::TransactionData => {
1135 let tx_data: TransactionData =
1136 bcs::from_bytes(&Base64::decode(&bytes).map_err(Error::from)?)
1137 .map_err(Error::from)?;
1138 let intent_msg = IntentMessage::new(Intent::sui_transaction(), tx_data.clone());
1139 let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1140 match sig.verify_authenticator(
1141 &intent_msg,
1142 author,
1143 curr_epoch,
1144 &verify_params,
1145 Arc::new(VerifiedDigestCache::new_empty()),
1146 ) {
1147 Ok(_) => Ok(ZkLoginVerifyResult {
1148 success: true,
1149 errors: vec![],
1150 }),
1151 Err(e) => Ok(ZkLoginVerifyResult {
1152 success: false,
1153 errors: vec![e.to_string()],
1154 }),
1155 }
1156 }
1157 ZkLoginIntentScope::PersonalMessage => {
1158 let data = PersonalMessage {
1159 message: Base64::decode(&bytes).map_err(Error::from)?,
1160 };
1161 let intent_msg = IntentMessage::new(Intent::personal_message(), data);
1162
1163 let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1164 match sig.verify_authenticator(
1165 &intent_msg,
1166 author,
1167 curr_epoch,
1168 &verify_params,
1169 Arc::new(VerifiedDigestCache::new_empty()),
1170 ) {
1171 Ok(_) => Ok(ZkLoginVerifyResult {
1172 success: true,
1173 errors: vec![],
1174 }),
1175 Err(e) => Ok(ZkLoginVerifyResult {
1176 success: false,
1177 errors: vec![e.to_string()],
1178 }),
1179 }
1180 }
1181 }
1182 }
1183}
1184
1185impl SuiRpcModule for ReadApi {
1186 fn rpc(self) -> RpcModule<Self> {
1187 self.into_rpc()
1188 }
1189
1190 fn rpc_doc_module() -> Module {
1191 ReadApiOpenRpc::module_doc()
1192 }
1193}
1194
1195#[instrument(skip_all)]
1196fn to_sui_transaction_events(
1197 fullnode_api: &ReadApi,
1198 tx_digest: TransactionDigest,
1199 events: TransactionEvents,
1200) -> Result<SuiTransactionBlockEvents, Error> {
1201 let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1202 let backing_package_store = fullnode_api.state.get_backing_package_store();
1203 let mut layout_resolver = epoch_store
1204 .executor()
1205 .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1206 Ok(SuiTransactionBlockEvents::try_from(
1207 events,
1208 tx_digest,
1209 None,
1210 layout_resolver.as_mut(),
1211 )?)
1212}
1213
1214#[derive(Debug, thiserror::Error)]
1215pub enum ObjectDisplayError {
1216 #[error("Not a move struct")]
1217 NotMoveStruct,
1218
1219 #[error("Failed to extract layout")]
1220 Layout,
1221
1222 #[error("Failed to extract Move object")]
1223 MoveObject,
1224
1225 #[error(transparent)]
1226 Deserialization(#[from] SuiError),
1227
1228 #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
1229 Bcs(#[from] bcs::Error),
1230
1231 #[error(transparent)]
1232 StateReadError(#[from] StateReadError),
1233}
1234
1235#[instrument(skip(fullnode_api, kv_store))]
1236async fn get_display_fields(
1237 fullnode_api: &ReadApi,
1238 kv_store: &Arc<TransactionKeyValueStore>,
1239 original_object: &Object,
1240 original_layout: &Option<MoveStructLayout>,
1241) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1242 let Some(layout) = original_layout else {
1243 return Ok(DisplayFieldsResponse {
1244 data: None,
1245 error: None,
1246 });
1247 };
1248
1249 let Some(move_object) = original_object.data.try_as_move() else {
1250 return Err(ObjectDisplayError::MoveObject);
1251 };
1252
1253 let Some(display_object) =
1254 get_display_object_by_type(kv_store, fullnode_api, &layout.type_).await?
1255 else {
1256 return Ok(DisplayFieldsResponse {
1257 data: None,
1258 error: None,
1259 });
1260 };
1261
1262 let format = match Format::parse(MAX_DISPLAY_NESTED_LEVEL, &display_object.fields) {
1263 Ok(format) => format,
1264 Err(e) => {
1265 return Ok(DisplayFieldsResponse {
1266 data: None,
1267 error: Some(SuiObjectResponseError::DisplayError {
1268 error: e.to_string(),
1269 }),
1270 });
1271 }
1272 };
1273
1274 let layout = MoveTypeLayout::Struct(Box::new(layout.clone()));
1275 let display = match format.display(*MAX_DISPLAY_OUTPUT_SIZE, move_object.contents(), &layout) {
1276 Ok(fields) => fields,
1277 Err(e) => {
1278 return Ok(DisplayFieldsResponse {
1279 data: None,
1280 error: Some(SuiObjectResponseError::DisplayError {
1281 error: e.to_string(),
1282 }),
1283 });
1284 }
1285 };
1286
1287 let mut fields = BTreeMap::new();
1288 let mut errors = vec![];
1289
1290 for (key, value) in display {
1291 match value {
1292 Ok(v) => {
1293 fields.insert(key, v);
1294 }
1295 Err(e) => {
1296 errors.push(e.to_string());
1297 }
1298 }
1299 }
1300
1301 Ok(DisplayFieldsResponse {
1302 data: (!fields.is_empty()).then_some(fields),
1303 error: (!errors.is_empty()).then(|| SuiObjectResponseError::DisplayError {
1304 error: errors.join("; "),
1305 }),
1306 })
1307}
1308
1309#[instrument(skip(kv_store, fullnode_api))]
1310async fn get_display_object_by_type(
1311 kv_store: &Arc<TransactionKeyValueStore>,
1312 fullnode_api: &ReadApi,
1313 object_type: &StructTag,
1314 ) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1316 let mut events = fullnode_api
1317 .state
1318 .query_events(
1319 kv_store,
1320 EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1321 None,
1322 1,
1323 true,
1324 )
1325 .await?;
1326
1327 if let Some(event) = events.pop() {
1330 let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1331 Ok(Some(display))
1332 } else {
1333 Ok(None)
1334 }
1335}
1336
1337#[instrument(skip_all)]
1338fn convert_to_response(
1339 cache: IntermediateTransactionResponse,
1340 opts: &SuiTransactionBlockResponseOptions,
1341 module_cache: &impl GetModule,
1342) -> RpcInterimResult<SuiTransactionBlockResponse> {
1343 let mut response = SuiTransactionBlockResponse::new(cache.digest);
1344 response.errors = cache.errors;
1345
1346 if let Some(transaction) = cache.transaction {
1347 if opts.show_raw_input {
1348 response.raw_transaction = bcs::to_bytes(transaction.data()).map_err(|e| {
1349 anyhow!("Failed to serialize raw transaction with error: {e}")
1351 })?;
1352 }
1353
1354 if opts.show_input {
1355 response.transaction = Some(SuiTransactionBlock::try_from(
1356 transaction.into_data(),
1357 module_cache,
1358 )?);
1359 }
1360 }
1361
1362 if let Some(effects) = cache.effects {
1363 if opts.show_raw_effects {
1364 response.raw_effects = bcs::to_bytes(&effects).map_err(|e| {
1365 anyhow!("Failed to serialize transaction block effects with error: {e}")
1367 })?;
1368 }
1369
1370 if opts.show_effects {
1371 response.effects = Some(effects.try_into().map_err(|e| {
1372 anyhow!("Failed to convert transaction block effects with error: {e}")
1374 })?);
1375 }
1376 }
1377
1378 response.checkpoint = cache.checkpoint_seq;
1379 response.timestamp_ms = cache.timestamp;
1380
1381 if opts.show_events {
1382 response.events = cache.events;
1383 }
1384
1385 if opts.show_balance_changes {
1386 response.balance_changes = cache.balance_changes;
1387 }
1388
1389 if opts.show_object_changes {
1390 response.object_changes = cache.object_changes;
1391 }
1392
1393 Ok(response)
1394}
1395
1396fn calculate_checkpoint_numbers(
1397 cursor: Option<CheckpointSequenceNumber>,
1399 limit: u64,
1400 descending_order: bool,
1401 max_checkpoint: CheckpointSequenceNumber,
1402) -> Vec<CheckpointSequenceNumber> {
1403 let (start_index, end_index) = match cursor {
1404 Some(t) => {
1405 if descending_order {
1406 let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1407 let end = start.saturating_sub(limit - 1);
1408 (end, start)
1409 } else {
1410 let start =
1411 std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1412 let end = std::cmp::min(
1413 start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1414 max_checkpoint,
1415 );
1416 (start, end)
1417 }
1418 }
1419 None => {
1420 if descending_order {
1421 (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1422 } else {
1423 (0, std::cmp::min(limit - 1, max_checkpoint))
1424 }
1425 }
1426 };
1427
1428 if descending_order {
1429 (start_index..=end_index).rev().collect()
1430 } else {
1431 (start_index..=end_index).collect()
1432 }
1433}
1434
#[cfg(test)]
mod tests {
    use super::*;

    // Every scenario below uses 15 as the highest available checkpoint.

    /// Descending from a cursor starts just below the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers() {
        let page = calculate_checkpoint_numbers(Some(10), 5, true, 15);
        assert_eq!(page, vec![9, 8, 7, 6, 5]);
    }

    /// Descending without a cursor starts at the highest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, true, 15);
        assert_eq!(page, vec![15, 14, 13, 12, 11]);
    }

    /// Ascending without a cursor starts at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, false, 15);
        assert_eq!(page, vec![0, 1, 2, 3, 4]);
    }

    /// Ascending from a cursor starts just above the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
        let page = calculate_checkpoint_numbers(Some(10), 5, false, 15);
        assert_eq!(page, vec![11, 12, 13, 14, 15]);
    }

    /// An ascending page larger than the available range is cut off at the highest
    /// checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, false, 15);
        assert_eq!(page, (0..=15).collect::<Vec<_>>());
    }

    /// A descending page larger than the available range is cut off at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, true, 15);
        assert_eq!(page, (0..=15).rev().collect::<Vec<_>>());
    }
}