1use std::collections::{BTreeMap, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use backoff::ExponentialBackoff;
11use backoff::future::retry;
12use fastcrypto::encoding::Base64;
13use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
14use futures::future::join_all;
15use im::hashmap::HashMap as ImHashMap;
16use indexmap::map::IndexMap;
17use itertools::Itertools;
18use jsonrpsee::RpcModule;
19use jsonrpsee::core::RpcResult;
20use move_bytecode_utils::module_cache::GetModule;
21use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
22use move_core_types::language_storage::StructTag;
23use once_cell::sync::Lazy;
24use shared_crypto::intent::{IntentMessage, PersonalMessage};
25use sui_display::v1::Format;
26use sui_json_rpc_types::ZkLoginIntentScope;
27use sui_types::base_types::SuiAddress;
28use sui_types::signature::{GenericSignature, VerifyParams};
29use sui_types::signature_verification::VerifiedDigestCache;
30use sui_types::storage::ObjectKey;
31use tap::TapFallible;
32use tracing::{debug, error, info, instrument, trace, warn};
33
34use mysten_metrics::add_server_timing;
35use mysten_metrics::spawn_monitored_task;
36use sui_core::authority::AuthorityState;
37use sui_json_rpc_api::{
38 JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
39 ReadApiServer, validate_limit,
40};
41use sui_json_rpc_types::{
42 BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
43 ObjectChange, ProtocolConfigResponse, SuiEvent, SuiGetPastObjectRequest, SuiObjectDataOptions,
44 SuiObjectResponse, SuiPastObjectResponse, SuiTransactionBlock, SuiTransactionBlockEvents,
45 SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
46};
47use sui_open_rpc::Module;
48use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
49use sui_storage::key_value_store::TransactionKeyValueStore;
50use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
51use sui_types::crypto::AggregateAuthoritySignature;
52use sui_types::display::DisplayVersionUpdatedEvent;
53use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
54use sui_types::error::{SuiError, SuiObjectResponseError};
55use sui_types::messages_checkpoint::{
56 CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
57};
58use sui_types::object::{Object, ObjectRead, PastObjectRead};
59use sui_types::sui_serde::BigInt;
60use sui_types::transaction::TransactionDataAPI;
61use sui_types::transaction::{Transaction, TransactionData};
62
63use crate::authority_state::{StateRead, StateReadError, StateReadResult};
64use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
65use crate::{ObjectProvider, with_tracing};
66use crate::{
67 ObjectProviderCache, SuiRpcModule, get_balance_changes_from_effect, get_object_changes,
68};
69use fastcrypto::encoding::Encoding;
70use fastcrypto::traits::ToFromBytes;
71use shared_crypto::intent::Intent;
72use sui_json_rpc_types::ZkLoginVerifyResult;
73use sui_types::authenticator_state::{ActiveJwk, get_authenticator_state};
74
75const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
77
78const DEFAULT_MAX_DISPLAY_OUTPUT_SIZE: usize = 1024 * 1024;
80
81static MAX_DISPLAY_OUTPUT_SIZE: Lazy<usize> = Lazy::new(|| {
83 let max_opt = std::env::var("MAX_DISPLAY_OUTPUT_SIZE")
84 .ok()
85 .and_then(|s| s.parse().ok());
86
87 if let Some(max) = max_opt {
88 info!("Using custom value for 'MAX_DISPLAY_OUTPUT_SIZE': {max}");
89 max
90 } else {
91 DEFAULT_MAX_DISPLAY_OUTPUT_SIZE
92 }
93});
94
/// Shared handles used by the read-side JSON-RPC methods implemented in this file.
///
/// All three fields are `Arc`s, so the derived `Clone` only bumps reference counts;
/// handlers clone the struct (or individual fields) to move them into spawned tasks.
#[derive(Clone)]
pub struct ReadApi {
    /// Node state accessor: object reads, latest checkpoint number, epoch store,
    /// chain identifier, and similar lookups.
    pub state: Arc<dyn StateRead>,
    /// Key-value store queried for transactions, effects, events, and checkpoints.
    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Metrics sink in which the handlers record request limits and result sizes.
    pub metrics: Arc<JsonRpcMetrics>,
}
103
104#[derive(Default)]
108struct IntermediateTransactionResponse {
109 digest: TransactionDigest,
110 transaction: Option<Transaction>,
111 effects: Option<TransactionEffects>,
112 events: Option<SuiTransactionBlockEvents>,
113 checkpoint_seq: Option<CheckpointSequenceNumber>,
114 balance_changes: Option<Vec<BalanceChange>>,
115 object_changes: Option<Vec<ObjectChange>>,
116 timestamp: Option<CheckpointTimestamp>,
117 errors: Vec<String>,
118}
119
120impl IntermediateTransactionResponse {
121 pub fn new(digest: TransactionDigest) -> Self {
122 Self {
123 digest,
124 ..Default::default()
125 }
126 }
127
128 pub fn transaction(&self) -> &Option<Transaction> {
129 &self.transaction
130 }
131}
132
133impl ReadApi {
134 pub fn new(
135 state: Arc<AuthorityState>,
136 transaction_kv_store: Arc<TransactionKeyValueStore>,
137 metrics: Arc<JsonRpcMetrics>,
138 ) -> Self {
139 Self {
140 state,
141 transaction_kv_store,
142 metrics,
143 }
144 }
145
146 async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
147 Ok(match id {
148 CheckpointId::SequenceNumber(seq) => {
149 let verified_summary = self
150 .transaction_kv_store
151 .get_checkpoint_summary(seq)
152 .await?;
153 let content = self
154 .transaction_kv_store
155 .get_checkpoint_contents(verified_summary.sequence_number)
156 .await?;
157 let signature = verified_summary.auth_sig().signature.clone();
158 (verified_summary.into_data(), content, signature).into()
159 }
160 CheckpointId::Digest(digest) => {
161 let verified_summary = self
162 .transaction_kv_store
163 .get_checkpoint_summary_by_digest(digest)
164 .await?;
165 let content = self
166 .transaction_kv_store
167 .get_checkpoint_contents(verified_summary.sequence_number)
168 .await?;
169 let signature = verified_summary.auth_sig().signature.clone();
170 (verified_summary.into_data(), content, signature).into()
171 }
172 })
173 }
174
175 pub async fn get_checkpoints_internal(
176 state: Arc<dyn StateRead>,
177 transaction_kv_store: Arc<TransactionKeyValueStore>,
178 cursor: Option<CheckpointSequenceNumber>,
180 limit: u64,
181 descending_order: bool,
182 ) -> StateReadResult<Vec<Checkpoint>> {
183 let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
184 let checkpoint_numbers =
185 calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
186
187 let verified_checkpoints = transaction_kv_store
188 .multi_get_checkpoints_summaries(&checkpoint_numbers)
189 .await?;
190
191 let checkpoint_summaries_and_signatures: Vec<(
192 CheckpointSummary,
193 AggregateAuthoritySignature,
194 )> = verified_checkpoints
195 .into_iter()
196 .flatten()
197 .map(|check| {
198 (
199 check.clone().into_summary_and_sequence().1,
200 check.get_validator_signature(),
201 )
202 })
203 .collect();
204
205 let checkpoint_contents = transaction_kv_store
206 .multi_get_checkpoints_contents(&checkpoint_numbers)
207 .await?;
208 let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
209
210 let mut checkpoints: Vec<Checkpoint> = vec![];
211
212 for (summary_and_sig, content) in checkpoint_summaries_and_signatures
213 .into_iter()
214 .zip(contents.into_iter())
215 {
216 checkpoints.push(Checkpoint::from((
217 summary_and_sig.0,
218 content,
219 summary_and_sig.1,
220 )));
221 }
222
223 Ok(checkpoints)
224 }
225
226 #[instrument(skip_all)]
227 async fn multi_get_transaction_blocks_internal(
228 &self,
229 digests: Vec<TransactionDigest>,
230 opts: Option<SuiTransactionBlockResponseOptions>,
231 ) -> Result<Vec<SuiTransactionBlockResponse>, Error> {
232 trace!("start");
233
234 let num_digests = digests.len();
235 if num_digests > *QUERY_MAX_RESULT_LIMIT {
236 Err(SuiRpcInputError::SizeLimitExceeded(
237 QUERY_MAX_RESULT_LIMIT.to_string(),
238 ))?
239 }
240 self.metrics
241 .get_tx_blocks_limit
242 .observe(digests.len() as f64);
243
244 let opts = opts.unwrap_or_default();
245
246 let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
248 IndexMap::from_iter(
249 digests
250 .iter()
251 .map(|k| (k, IntermediateTransactionResponse::new(*k))),
252 );
253 if temp_response.len() < num_digests {
254 Err(SuiRpcInputError::ContainsDuplicates)?
255 }
256
257 if opts.require_input() {
258 trace!("getting input");
259 let digests_clone = digests.clone();
260 let transactions =
261 self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
262 |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
263 )?;
264
265 for ((_digest, cache_entry), txn) in
266 temp_response.iter_mut().zip(transactions.into_iter())
267 {
268 cache_entry.transaction = txn;
269 }
270 }
271
272 if opts.require_effects() {
274 trace!("getting effects");
275 let digests_clone = digests.clone();
276 let effects_list = self.transaction_kv_store
277 .multi_get_fx_by_tx_digest(&digests_clone)
278 .await
279 .tap_err(
280 |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
281 )?;
282 for ((_digest, cache_entry), e) in
283 temp_response.iter_mut().zip(effects_list.into_iter())
284 {
285 cache_entry.effects = e;
286 }
287 }
288
289 trace!("getting checkpoint sequence numbers");
290 let checkpoint_seq_list = self
291 .transaction_kv_store
292 .multi_get_transaction_checkpoint(&digests)
293 .await
294 .tap_err(
295 |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
296 for ((_digest, cache_entry), seq) in temp_response
297 .iter_mut()
298 .zip(checkpoint_seq_list.into_iter())
299 {
300 cache_entry.checkpoint_seq = seq;
301 }
302
303 let unique_checkpoint_numbers = temp_response
304 .values()
305 .filter_map(|cache_entry| cache_entry.checkpoint_seq)
306 .unique()
309 .collect::<Vec<CheckpointSequenceNumber>>();
310
311 trace!("getting checkpoint summaries");
313 let timestamps = self
314 .transaction_kv_store
315 .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
316 .await
317 .map_err(|e| {
318 Error::UnexpectedError(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
319 })?
320 .into_iter()
321 .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
322
323 let checkpoint_to_timestamp = unique_checkpoint_numbers
325 .into_iter()
326 .zip(timestamps)
327 .collect::<HashMap<_, _>>();
328
329 for (_, cache_entry) in temp_response.iter_mut() {
331 if cache_entry.checkpoint_seq.is_some() {
332 cache_entry.timestamp = *checkpoint_to_timestamp
334 .get(cache_entry.checkpoint_seq.as_ref().unwrap())
335 .unwrap();
337 }
338 }
339
340 if opts.show_events {
341 trace!("getting events");
342 let mut non_empty_digests = vec![];
343 for cache_entry in temp_response.values() {
344 if let Some(effects) = &cache_entry.effects
345 && effects.events_digest().is_some()
346 {
347 non_empty_digests.push(cache_entry.digest);
348 }
349 }
350 let backoff = ExponentialBackoff {
352 max_elapsed_time: Some(Duration::from_secs(3)),
353 multiplier: 1.0,
354 ..ExponentialBackoff::default()
355 };
356 let mut events = retry(backoff, || async {
357 match self
358 .transaction_kv_store
359 .multi_get_events_by_tx_digests(&non_empty_digests)
360 .await
361 {
362 Ok(events) if !events.contains(&None) => Ok(events),
365 Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
366 "Events not found, transaction execution may be incomplete.".into(),
367 ))),
368 Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
369 "Failed to call multi_get_events: {e:?}"
370 )))),
371 }
372 })
373 .await
374 .map_err(|e| {
375 Error::UnexpectedError(format!(
376 "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
377 ))
378 })?
379 .into_iter();
380
381 for (_, cache_entry) in temp_response.iter_mut() {
383 let transaction_digest = cache_entry.digest;
384 if let Some(events_digest) =
385 cache_entry.effects.as_ref().and_then(|e| e.events_digest())
386 {
387 match events.next() {
388 Some(Some(ev)) => {
389 cache_entry.events =
390 Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
391 }
392 None | Some(None) => {
393 error!(
394 "Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
395 );
396 cache_entry.errors.push(format!(
397 "Failed to fetch events with event digest {events_digest:?}",
398 ))
399 }
400 }
401 } else {
402 cache_entry.events = Some(SuiTransactionBlockEvents::default());
405 }
406 }
407 }
408
409 let mut object_cache =
410 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
411
412 if opts.show_balance_changes || opts.show_object_changes {
414 let mut keys = vec![];
415 for resp in temp_response.values() {
416 let effects = resp.effects.as_ref().ok_or_else(|| {
417 SuiRpcInputError::GenericNotFound(
418 "unable to derive balance/object changes because effect is empty"
419 .to_string(),
420 )
421 })?;
422
423 for change in effects.object_changes() {
424 if let Some(input_version) = change.input_version {
425 keys.push(ObjectKey(change.id, input_version));
426 }
427 if let Some(output_version) = change.output_version {
428 keys.push(ObjectKey(change.id, output_version));
429 }
430 }
431 }
432
433 let objects = self
434 .transaction_kv_store
435 .multi_get_objects(&keys)
436 .await?
437 .into_iter()
438 .flatten()
439 .collect::<Vec<_>>();
440
441 object_cache.insert_objects_into_cache(objects);
442 }
443
444 if opts.show_balance_changes {
445 trace!("getting balance changes");
446
447 let mut results = vec![];
448 for resp in temp_response.values() {
449 let input_objects = if let Some(tx) = resp.transaction() {
450 tx.data()
451 .inner()
452 .intent_message
453 .value
454 .input_objects()
455 .unwrap_or_default()
456 } else {
457 Vec::new()
459 };
460 results.push(get_balance_changes_from_effect(
461 &object_cache,
462 resp.effects.as_ref().ok_or_else(|| {
463 SuiRpcInputError::GenericNotFound(
464 "unable to derive balance changes because effect is empty".to_string(),
465 )
466 })?,
467 input_objects,
468 None,
469 ));
470 }
471 let results = join_all(results).await;
472 for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
473 match result {
474 Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
475 Err(e) => entry
476 .1
477 .errors
478 .push(format!("Failed to fetch balance changes {e:?}")),
479 }
480 }
481 }
482
483 if opts.show_object_changes {
484 trace!("getting object changes");
485
486 let mut results = vec![];
487 for resp in temp_response.values() {
488 let effects = resp.effects.as_ref().ok_or_else(|| {
489 SuiRpcInputError::GenericNotFound(
490 "unable to derive object changes because effect is empty".to_string(),
491 )
492 })?;
493
494 results.push(get_object_changes(
495 &object_cache,
496 effects,
497 resp.transaction
498 .as_ref()
499 .ok_or_else(|| {
500 SuiRpcInputError::GenericNotFound(
501 "unable to derive object changes because transaction is empty"
502 .to_string(),
503 )
504 })?
505 .data()
506 .intent_message()
507 .value
508 .sender(),
509 effects.modified_at_versions(),
510 effects.all_changed_objects(),
511 effects.all_removed_objects(),
512 ));
513 }
514 let results = join_all(results).await;
515 for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
516 match result {
517 Ok(object_changes) => entry.1.object_changes = Some(object_changes),
518 Err(e) => entry
519 .1
520 .errors
521 .push(format!("Failed to fetch object changes {e:?}")),
522 }
523 }
524 }
525
526 let epoch_store = self.state.load_epoch_store_one_call_per_task();
527 let converted_tx_block_resps = temp_response
528 .into_iter()
529 .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
530 .collect::<Result<Vec<_>, _>>()?;
531
532 self.metrics
533 .get_tx_blocks_result_size
534 .observe(converted_tx_block_resps.len() as f64);
535 self.metrics
536 .get_tx_blocks_result_size_total
537 .inc_by(converted_tx_block_resps.len() as u64);
538
539 trace!("done");
540
541 Ok(converted_tx_block_resps)
542 }
543}
544
545#[async_trait]
546impl ReadApiServer for ReadApi {
547 #[instrument(skip(self))]
548 async fn get_object(
549 &self,
550 object_id: ObjectID,
551 options: Option<SuiObjectDataOptions>,
552 ) -> RpcResult<SuiObjectResponse> {
553 with_tracing!(async move {
554 let state = self.state.clone();
555 let object_read = spawn_monitored_task!(async move {
556 state.get_object_read(&object_id).map_err(|e| {
557 warn!(?object_id, "Failed to get object: {:?}", e);
558 Error::from(e)
559 })
560 })
561 .await
562 .map_err(Error::from)??;
563 let options = options.unwrap_or_default();
564
565 match object_read {
566 ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
567 SuiObjectResponseError::NotExists { object_id: id },
568 )),
569 ObjectRead::Exists(object_ref, o, layout) => {
570 let mut display_fields = None;
571 if options.show_display {
572 match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
573 .await
574 {
575 Ok(rendered_fields) => display_fields = Some(rendered_fields),
576 Err(e) => {
577 return Ok(SuiObjectResponse::new(
578 Some((object_ref, o, layout, options, None).try_into()?),
579 Some(SuiObjectResponseError::DisplayError {
580 error: e.to_string(),
581 }),
582 ));
583 }
584 }
585 }
586 Ok(SuiObjectResponse::new_with_data(
587 (object_ref, o, layout, options, display_fields).try_into()?,
588 ))
589 }
590 ObjectRead::Deleted((object_id, version, digest)) => Ok(
591 SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
592 object_id,
593 version,
594 digest,
595 }),
596 ),
597 }
598 })
599 }
600
601 #[instrument(skip(self))]
602 async fn multi_get_objects(
603 &self,
604 object_ids: Vec<ObjectID>,
605 options: Option<SuiObjectDataOptions>,
606 ) -> RpcResult<Vec<SuiObjectResponse>> {
607 with_tracing!(async move {
608 if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
609 self.metrics
610 .get_objects_limit
611 .observe(object_ids.len() as f64);
612 let mut futures = vec![];
613 for object_id in object_ids {
614 futures.push(self.get_object(object_id, options.clone()));
615 }
616 let results = join_all(futures).await;
617
618 let objects_result: Result<Vec<SuiObjectResponse>, String> = results
619 .into_iter()
620 .map(|result| match result {
621 Ok(response) => Ok(response),
622 Err(error) => {
623 error!("Failed to fetch object with error: {error:?}");
624 Err(format!("Error: {}", error))
625 }
626 })
627 .collect();
628
629 let objects = objects_result.map_err(|err| {
630 Error::UnexpectedError(format!("Failed to fetch objects with error: {}", err))
631 })?;
632
633 self.metrics
634 .get_objects_result_size
635 .observe(objects.len() as f64);
636 self.metrics
637 .get_objects_result_size_total
638 .inc_by(objects.len() as u64);
639 Ok(objects)
640 } else {
641 Err(SuiRpcInputError::SizeLimitExceeded(
642 QUERY_MAX_RESULT_LIMIT.to_string(),
643 ))?
644 }
645 })
646 }
647
648 #[instrument(skip(self))]
649 async fn try_get_past_object(
650 &self,
651 object_id: ObjectID,
652 version: SequenceNumber,
653 options: Option<SuiObjectDataOptions>,
654 ) -> RpcResult<SuiPastObjectResponse> {
655 with_tracing!(async move {
656 let state = self.state.clone();
657 let past_read = spawn_monitored_task!(async move {
658 state.get_past_object_read(&object_id, version)
659 .map_err(|e| {
660 error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
661 Error::from(e)
662 })}).await.map_err(Error::from)??;
663 let options = options.unwrap_or_default();
664 match past_read {
665 PastObjectRead::ObjectNotExists(id) => {
666 Ok(SuiPastObjectResponse::ObjectNotExists(id))
667 }
668 PastObjectRead::VersionFound(object_ref, o, layout) => {
669 let display_fields = if options.show_display {
670 Some(
672 get_display_fields(self, &self.transaction_kv_store, &o, &layout)
673 .await
674 .map_err(|e| {
675 Error::UnexpectedError(format!(
676 "Unable to render object at version {version}: {e}"
677 ))
678 })?,
679 )
680 } else {
681 None
682 };
683 Ok(SuiPastObjectResponse::VersionFound(
684 (object_ref, o, layout, options, display_fields).try_into()?,
685 ))
686 }
687 PastObjectRead::ObjectDeleted(oref) => {
688 Ok(SuiPastObjectResponse::ObjectDeleted(oref.into()))
689 }
690 PastObjectRead::VersionNotFound(id, seq_num) => {
691 Ok(SuiPastObjectResponse::VersionNotFound(id, seq_num))
692 }
693 PastObjectRead::VersionTooHigh {
694 object_id,
695 asked_version,
696 latest_version,
697 } => Ok(SuiPastObjectResponse::VersionTooHigh {
698 object_id,
699 asked_version,
700 latest_version,
701 }),
702 }
703 })
704 }
705
706 #[instrument(skip(self))]
707 async fn try_get_object_before_version(
708 &self,
709 object_id: ObjectID,
710 version: SequenceNumber,
711 ) -> RpcResult<SuiPastObjectResponse> {
712 let version = self
713 .state
714 .find_object_lt_or_eq_version(&object_id, &version)
715 .await
716 .map_err(Error::from)?
717 .map(|obj| obj.version())
718 .unwrap_or_default();
719 self.try_get_past_object(
720 object_id,
721 version,
722 Some(SuiObjectDataOptions::bcs_lossless()),
723 )
724 .await
725 }
726
727 #[instrument(skip(self))]
728 async fn try_multi_get_past_objects(
729 &self,
730 past_objects: Vec<SuiGetPastObjectRequest>,
731 options: Option<SuiObjectDataOptions>,
732 ) -> RpcResult<Vec<SuiPastObjectResponse>> {
733 with_tracing!(async move {
734 if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
735 let mut futures = vec![];
736 for past_object in past_objects {
737 futures.push(self.try_get_past_object(
738 past_object.object_id,
739 past_object.version,
740 options.clone(),
741 ));
742 }
743 let results = join_all(futures).await;
744
745 let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
746 let success = oks.into_iter().filter_map(Result::ok).collect();
747 let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
748 if !errors.is_empty() {
749 let error_string = errors
750 .iter()
751 .map(|e| e.to_string())
752 .collect::<Vec<String>>()
753 .join("; ");
754 Err(anyhow!("{error_string}").into()) } else {
756 Ok(success)
757 }
758 } else {
759 Err(SuiRpcInputError::SizeLimitExceeded(
760 QUERY_MAX_RESULT_LIMIT.to_string(),
761 ))?
762 }
763 })
764 }
765
766 #[instrument(skip(self))]
767 async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
768 with_tracing!(async move {
769 Ok(self
770 .state
771 .get_total_transaction_blocks()
772 .map_err(Error::from)?
773 .into()) })
775 }
776
777 #[instrument(skip(self))]
778 async fn get_transaction_block(
779 &self,
780 digest: TransactionDigest,
781 opts: Option<SuiTransactionBlockResponseOptions>,
782 ) -> RpcResult<SuiTransactionBlockResponse> {
783 with_tracing!(async move {
784 let opts = opts.unwrap_or_default();
785 let mut temp_response = IntermediateTransactionResponse::new(digest);
786
787 let transaction_kv_store = self.transaction_kv_store.clone();
789 let transaction = spawn_monitored_task!(async move {
790 let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
791 debug!(tx_digest=?digest, "Failed to get transaction: {}", err);
792 Error::from(err)
793 });
794 add_server_timing("tx_kv_lookup");
795 ret
796 })
797 .await
798 .map_err(Error::from)??;
799 let input_objects = transaction
800 .data()
801 .inner()
802 .intent_message
803 .value
804 .input_objects()
805 .unwrap_or_default();
806
807 if opts.require_input() {
809 temp_response.transaction = Some(transaction);
810 }
811
812 if opts.require_effects() {
814 let transaction_kv_store = self.transaction_kv_store.clone();
815 temp_response.effects = Some(
816 spawn_monitored_task!(async move {
817 transaction_kv_store
818 .get_fx_by_tx_digest(digest)
819 .await
820 .map_err(|err| {
821 debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
822 Error::from(err)
823 })
824 })
825 .await
826 .map_err(Error::from)??,
827 );
828 }
829
830 temp_response.checkpoint_seq = self
831 .transaction_kv_store
832 .deprecated_get_transaction_checkpoint(digest)
833 .await
834 .map_err(|e| {
835 error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
836 Error::from(e)
837 })?;
838
839 if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
840 let kv_store = self.transaction_kv_store.clone();
841 let checkpoint_seq = *checkpoint_seq;
842 let checkpoint = spawn_monitored_task!(async move {
843 kv_store
844 .get_checkpoint_summary(checkpoint_seq)
846 .await
847 .map_err(|e| {
848 error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
849 Error::from(e)
850 })
851 }).await.map_err(Error::from)??;
852 temp_response.timestamp = Some(checkpoint.timestamp_ms);
854 }
855
856 if opts.show_events && temp_response.effects.is_some() {
857 let transaction_kv_store = self.transaction_kv_store.clone();
858 let events = spawn_monitored_task!(async move {
859 transaction_kv_store
860 .multi_get_events_by_tx_digests(&[digest])
861 .await
862 .map_err(|e| {
863 error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
864 Error::from(e)
865 })
866 })
867 .await
868 .map_err(Error::from)??
869 .pop()
870 .flatten();
871 match events {
872 None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
873 Some(events) => match to_sui_transaction_events(self, digest, events) {
874 Ok(e) => temp_response.events = Some(e),
875 Err(e) => temp_response.errors.push(e.to_string()),
876 },
877 }
878 }
879
880 let object_cache =
881 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
882 if opts.show_balance_changes
883 && let Some(effects) = &temp_response.effects
884 {
885 let balance_changes =
886 get_balance_changes_from_effect(&object_cache, effects, input_objects, None)
887 .await;
888
889 if let Ok(balance_changes) = balance_changes {
890 temp_response.balance_changes = Some(balance_changes);
891 } else {
892 temp_response.errors.push(format!(
893 "Cannot retrieve balance changes: {}",
894 balance_changes.unwrap_err()
895 ));
896 }
897 }
898
899 if opts.show_object_changes
900 && let (Some(effects), Some(input)) =
901 (&temp_response.effects, &temp_response.transaction)
902 {
903 let sender = input.data().intent_message().value.sender();
904 let object_changes = get_object_changes(
905 &object_cache,
906 effects,
907 sender,
908 effects.modified_at_versions(),
909 effects.all_changed_objects(),
910 effects.all_removed_objects(),
911 )
912 .await;
913
914 if let Ok(object_changes) = object_changes {
915 temp_response.object_changes = Some(object_changes);
916 } else {
917 temp_response.errors.push(format!(
918 "Cannot retrieve object changes: {}",
919 object_changes.unwrap_err()
920 ));
921 }
922 }
923 let epoch_store = self.state.load_epoch_store_one_call_per_task();
924 convert_to_response(temp_response, &opts, epoch_store.module_cache())
925 })
926 }
927
928 #[instrument(skip(self))]
929 async fn multi_get_transaction_blocks(
930 &self,
931 digests: Vec<TransactionDigest>,
932 opts: Option<SuiTransactionBlockResponseOptions>,
933 ) -> RpcResult<Vec<SuiTransactionBlockResponse>> {
934 with_tracing!(async move {
935 let cloned_self = self.clone();
936 spawn_monitored_task!(async move {
937 cloned_self
938 .multi_get_transaction_blocks_internal(digests, opts)
939 .await
940 })
941 .await
942 .map_err(Error::from)?
943 })
944 }
945
946 #[instrument(skip(self))]
947 async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<SuiEvent>> {
948 with_tracing!(async move {
949 let state = self.state.clone();
950 let transaction_kv_store = self.transaction_kv_store.clone();
951 spawn_monitored_task!(async move{
952 let store = state.load_epoch_store_one_call_per_task();
953 let events = transaction_kv_store
954 .multi_get_events_by_tx_digests(&[transaction_digest])
955 .await
956 .map_err(
957 |e| {
958 error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
959 Error::StateReadError(e.into())
960 })?
961 .pop()
962 .flatten();
963 Ok(match events {
964 Some(events) => events
965 .data
966 .into_iter()
967 .enumerate()
968 .map(|(seq, e)| {
969 let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
970 SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
971 })
972 .collect::<Result<Vec<_>, _>>()
973 .map_err(Error::SuiError)?,
974 None => vec![],
975 })
976 }).await.map_err(Error::from)?
977 })
978 }
979
980 #[instrument(skip(self))]
981 async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
982 with_tracing!(async move {
983 Ok(self
984 .state
985 .get_latest_checkpoint_sequence_number()
986 .map_err(|e| {
987 SuiRpcInputError::GenericNotFound(format!(
988 "Latest checkpoint sequence number was not found with error :{e}"
989 ))
990 })?
991 .into())
992 })
993 }
994
995 #[instrument(skip(self))]
996 async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
997 with_tracing!(self.get_checkpoint_internal(id))
998 }
999
1000 #[instrument(skip(self))]
1001 async fn get_checkpoints(
1002 &self,
1003 cursor: Option<BigInt<u64>>,
1005 limit: Option<usize>,
1006 descending_order: bool,
1007 ) -> RpcResult<CheckpointPage> {
1008 with_tracing!(async move {
1009 let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
1010 .map_err(SuiRpcInputError::from)?;
1011
1012 let state = self.state.clone();
1013 let kv_store = self.transaction_kv_store.clone();
1014
1015 self.metrics.get_checkpoints_limit.observe(limit as f64);
1016
1017 let mut data = spawn_monitored_task!(Self::get_checkpoints_internal(
1018 state,
1019 kv_store,
1020 cursor.map(|s| *s),
1021 limit as u64 + 1,
1022 descending_order,
1023 ))
1024 .await
1025 .map_err(Error::from)?
1026 .map_err(Error::from)?;
1027
1028 let has_next_page = data.len() > limit;
1029 data.truncate(limit);
1030
1031 let next_cursor = if has_next_page {
1032 data.last().cloned().map(|d| d.sequence_number.into())
1033 } else {
1034 None
1035 };
1036
1037 self.metrics
1038 .get_checkpoints_result_size
1039 .observe(data.len() as f64);
1040 self.metrics
1041 .get_checkpoints_result_size_total
1042 .inc_by(data.len() as u64);
1043
1044 Ok(CheckpointPage {
1045 data,
1046 next_cursor,
1047 has_next_page,
1048 })
1049 })
1050 }
1051
1052 #[instrument(skip(self))]
1053 async fn get_protocol_config(
1054 &self,
1055 version: Option<BigInt<u64>>,
1056 ) -> RpcResult<ProtocolConfigResponse> {
1057 with_tracing!(async move {
1058 version
1059 .map(|v| {
1060 ProtocolConfig::get_for_version_if_supported(
1061 (*v).into(),
1062 self.state.get_chain_identifier()?.chain(),
1063 )
1064 .ok_or(SuiRpcInputError::ProtocolVersionUnsupported(
1065 ProtocolVersion::MIN.as_u64(),
1066 ProtocolVersion::MAX.as_u64(),
1067 ))
1068 .map_err(Error::from)
1069 })
1070 .unwrap_or(Ok(self
1071 .state
1072 .load_epoch_store_one_call_per_task()
1073 .protocol_config()
1074 .clone()))
1075 .map(ProtocolConfigResponse::from)
1076 })
1077 }
1078
1079 #[instrument(skip(self))]
1080 async fn get_chain_identifier(&self) -> RpcResult<String> {
1081 with_tracing!(async move {
1082 let ci = self.state.get_chain_identifier()?;
1083 Ok(ci.to_string())
1084 })
1085 }
1086 #[instrument(skip(self))]
1087 async fn verify_zklogin_signature(
1088 &self,
1089 bytes: String,
1090 signature: String,
1091 intent_scope: ZkLoginIntentScope,
1092 author: SuiAddress,
1093 ) -> RpcResult<ZkLoginVerifyResult> {
1094 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1095 let curr_epoch = epoch_store.epoch();
1096 let zklogin_env_native = match self
1097 .state
1098 .get_chain_identifier()
1099 .expect("get chain identifier should not fail")
1100 .chain()
1101 {
1102 sui_protocol_config::Chain::Mainnet | sui_protocol_config::Chain::Testnet => {
1103 ZkLoginEnv::Prod
1104 }
1105 _ => ZkLoginEnv::Test,
1106 };
1107 let GenericSignature::ZkLoginAuthenticator(zklogin_sig) =
1108 GenericSignature::from_bytes(&Base64::decode(&signature).map_err(Error::from)?)
1109 .map_err(Error::from)?
1110 else {
1111 return Err(SuiRpcInputError::GenericNotFound(
1112 "Endpoint only supports zkLogin signature".to_string(),
1113 )
1114 .into());
1115 };
1116
1117 let new_jwks =
1118 match get_authenticator_state(self.state.get_object_store()).map_err(Error::from)? {
1119 Some(authenticator_state) => authenticator_state.active_jwks,
1120 None => {
1121 return Err(SuiRpcInputError::GenericNotFound(
1122 "Authenticator state not found".to_string(),
1123 )
1124 .into());
1125 }
1126 };
1127
1128 let mut oidc_provider_jwks = ImHashMap::new();
1130 for active_jwk in new_jwks.iter() {
1131 let ActiveJwk { jwk_id, jwk, .. } = active_jwk;
1132 match oidc_provider_jwks.entry(jwk_id.clone()) {
1133 im::hashmap::Entry::Occupied(_) => {
1134 warn!("JWK with kid {:?} already exists", jwk_id);
1135 }
1136 im::hashmap::Entry::Vacant(entry) => {
1137 entry.insert(jwk.clone());
1138 }
1139 }
1140 }
1141 let verify_params = VerifyParams::new(
1142 oidc_provider_jwks,
1143 vec![],
1144 zklogin_env_native,
1145 true,
1146 true,
1147 true,
1148 Some(30),
1149 true,
1150 );
1151 match intent_scope {
1152 ZkLoginIntentScope::TransactionData => {
1153 let tx_data: TransactionData =
1154 bcs::from_bytes(&Base64::decode(&bytes).map_err(Error::from)?)
1155 .map_err(Error::from)?;
1156 let intent_msg = IntentMessage::new(Intent::sui_transaction(), tx_data.clone());
1157 let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1158 match sig.verify_authenticator(
1159 &intent_msg,
1160 author,
1161 curr_epoch,
1162 &verify_params,
1163 Arc::new(VerifiedDigestCache::new_empty()),
1164 ) {
1165 Ok(_) => Ok(ZkLoginVerifyResult {
1166 success: true,
1167 errors: vec![],
1168 }),
1169 Err(e) => Ok(ZkLoginVerifyResult {
1170 success: false,
1171 errors: vec![e.to_string()],
1172 }),
1173 }
1174 }
1175 ZkLoginIntentScope::PersonalMessage => {
1176 let data = PersonalMessage {
1177 message: Base64::decode(&bytes).map_err(Error::from)?,
1178 };
1179 let intent_msg = IntentMessage::new(Intent::personal_message(), data);
1180
1181 let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1182 match sig.verify_authenticator(
1183 &intent_msg,
1184 author,
1185 curr_epoch,
1186 &verify_params,
1187 Arc::new(VerifiedDigestCache::new_empty()),
1188 ) {
1189 Ok(_) => Ok(ZkLoginVerifyResult {
1190 success: true,
1191 errors: vec![],
1192 }),
1193 Err(e) => Ok(ZkLoginVerifyResult {
1194 success: false,
1195 errors: vec![e.to_string()],
1196 }),
1197 }
1198 }
1199 }
1200 }
1201}
1202
1203impl SuiRpcModule for ReadApi {
1204 fn rpc(self) -> RpcModule<Self> {
1205 self.into_rpc()
1206 }
1207
1208 fn rpc_doc_module() -> Module {
1209 ReadApiOpenRpc::module_doc()
1210 }
1211}
1212
1213#[instrument(skip_all)]
1214fn to_sui_transaction_events(
1215 fullnode_api: &ReadApi,
1216 tx_digest: TransactionDigest,
1217 events: TransactionEvents,
1218) -> Result<SuiTransactionBlockEvents, Error> {
1219 let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1220 let backing_package_store = fullnode_api.state.get_backing_package_store();
1221 let mut layout_resolver = epoch_store
1222 .executor()
1223 .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1224 Ok(SuiTransactionBlockEvents::try_from(
1225 events,
1226 tx_digest,
1227 None,
1228 layout_resolver.as_mut(),
1229 )?)
1230}
1231
/// Errors that can occur while resolving or rendering an object's display fields.
#[derive(Debug, thiserror::Error)]
pub enum ObjectDisplayError {
    /// NOTE(review): presumably raised when the object's type is not a Move struct; it is
    /// not constructed in this part of the file, so confirm against its users.
    #[error("Not a move struct")]
    NotMoveStruct,

    /// NOTE(review): presumably raised when a struct layout cannot be obtained; it is not
    /// constructed in this part of the file, so confirm against its users.
    #[error("Failed to extract layout")]
    Layout,

    /// The object's data could not be viewed as a Move object
    /// (`try_as_move` returned `None`).
    #[error("Failed to extract Move object")]
    MoveObject,

    /// Wraps a `SuiError`; `transparent` forwards the inner error's message and source.
    /// Despite the variant name, any `SuiError` converts into it through `?`.
    #[error(transparent)]
    Deserialization(#[from] SuiError),

    /// A BCS decoding failure, e.g. when decoding a `DisplayVersionUpdatedEvent`.
    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
    Bcs(#[from] bcs::Error),

    /// Wraps a `StateReadError`; `transparent` forwards the inner error's message and
    /// source.
    #[error(transparent)]
    StateReadError(#[from] StateReadError),
}
1252
1253#[instrument(skip(fullnode_api, kv_store))]
1254async fn get_display_fields(
1255 fullnode_api: &ReadApi,
1256 kv_store: &Arc<TransactionKeyValueStore>,
1257 original_object: &Object,
1258 original_layout: &Option<MoveStructLayout>,
1259) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1260 let Some(layout) = original_layout else {
1261 return Ok(DisplayFieldsResponse {
1262 data: None,
1263 error: None,
1264 });
1265 };
1266
1267 let Some(move_object) = original_object.data.try_as_move() else {
1268 return Err(ObjectDisplayError::MoveObject);
1269 };
1270
1271 let Some(display_object) =
1272 get_display_object_by_type(kv_store, fullnode_api, &layout.type_).await?
1273 else {
1274 return Ok(DisplayFieldsResponse {
1275 data: None,
1276 error: None,
1277 });
1278 };
1279
1280 let format = match Format::parse(MAX_DISPLAY_NESTED_LEVEL, &display_object.fields) {
1281 Ok(format) => format,
1282 Err(e) => {
1283 return Ok(DisplayFieldsResponse {
1284 data: None,
1285 error: Some(SuiObjectResponseError::DisplayError {
1286 error: e.to_string(),
1287 }),
1288 });
1289 }
1290 };
1291
1292 let layout = MoveTypeLayout::Struct(Box::new(layout.clone()));
1293 let display = match format.display(*MAX_DISPLAY_OUTPUT_SIZE, move_object.contents(), &layout) {
1294 Ok(fields) => fields,
1295 Err(e) => {
1296 return Ok(DisplayFieldsResponse {
1297 data: None,
1298 error: Some(SuiObjectResponseError::DisplayError {
1299 error: e.to_string(),
1300 }),
1301 });
1302 }
1303 };
1304
1305 let mut fields = BTreeMap::new();
1306 let mut errors = vec![];
1307
1308 for (key, value) in display {
1309 match value {
1310 Ok(v) => {
1311 fields.insert(key, v);
1312 }
1313 Err(e) => {
1314 errors.push(e.to_string());
1315 }
1316 }
1317 }
1318
1319 Ok(DisplayFieldsResponse {
1320 data: (!fields.is_empty()).then_some(fields),
1321 error: (!errors.is_empty()).then(|| SuiObjectResponseError::DisplayError {
1322 error: errors.join("; "),
1323 }),
1324 })
1325}
1326
1327#[instrument(skip(kv_store, fullnode_api))]
1328async fn get_display_object_by_type(
1329 kv_store: &Arc<TransactionKeyValueStore>,
1330 fullnode_api: &ReadApi,
1331 object_type: &StructTag,
1332 ) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1334 let mut events = fullnode_api
1335 .state
1336 .query_events(
1337 kv_store,
1338 EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1339 None,
1340 1,
1341 true,
1342 )
1343 .await?;
1344
1345 if let Some(event) = events.pop() {
1348 let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1349 Ok(Some(display))
1350 } else {
1351 Ok(None)
1352 }
1353}
1354
1355#[instrument(skip_all)]
1356fn convert_to_response(
1357 cache: IntermediateTransactionResponse,
1358 opts: &SuiTransactionBlockResponseOptions,
1359 module_cache: &impl GetModule,
1360) -> RpcInterimResult<SuiTransactionBlockResponse> {
1361 let mut response = SuiTransactionBlockResponse::new(cache.digest);
1362 response.errors = cache.errors;
1363
1364 if let Some(transaction) = cache.transaction {
1365 if opts.show_raw_input {
1366 response.raw_transaction = bcs::to_bytes(transaction.data()).map_err(|e| {
1367 anyhow!("Failed to serialize raw transaction with error: {e}")
1369 })?;
1370 }
1371
1372 if opts.show_input {
1373 response.transaction = Some(SuiTransactionBlock::try_from(
1374 transaction.into_data(),
1375 module_cache,
1376 )?);
1377 }
1378 }
1379
1380 if let Some(effects) = cache.effects {
1381 if opts.show_raw_effects {
1382 response.raw_effects = bcs::to_bytes(&effects).map_err(|e| {
1383 anyhow!("Failed to serialize transaction block effects with error: {e}")
1385 })?;
1386 }
1387
1388 if opts.show_effects {
1389 response.effects = Some(effects.try_into().map_err(|e| {
1390 anyhow!("Failed to convert transaction block effects with error: {e}")
1392 })?);
1393 }
1394 }
1395
1396 response.checkpoint = cache.checkpoint_seq;
1397 response.timestamp_ms = cache.timestamp;
1398
1399 if opts.show_events {
1400 response.events = cache.events;
1401 }
1402
1403 if opts.show_balance_changes {
1404 response.balance_changes = cache.balance_changes;
1405 }
1406
1407 if opts.show_object_changes {
1408 response.object_changes = cache.object_changes;
1409 }
1410
1411 Ok(response)
1412}
1413
1414fn calculate_checkpoint_numbers(
1415 cursor: Option<CheckpointSequenceNumber>,
1417 limit: u64,
1418 descending_order: bool,
1419 max_checkpoint: CheckpointSequenceNumber,
1420) -> Vec<CheckpointSequenceNumber> {
1421 let (start_index, end_index) = match cursor {
1422 Some(t) => {
1423 if descending_order {
1424 let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1425 let end = start.saturating_sub(limit - 1);
1426 (end, start)
1427 } else {
1428 let start =
1429 std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1430 let end = std::cmp::min(
1431 start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1432 max_checkpoint,
1433 );
1434 (start, end)
1435 }
1436 }
1437 None => {
1438 if descending_order {
1439 (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1440 } else {
1441 (0, std::cmp::min(limit - 1, max_checkpoint))
1442 }
1443 }
1444 };
1445
1446 if descending_order {
1447 (start_index..=end_index).rev().collect()
1448 } else {
1449 (start_index..=end_index).collect()
1450 }
1451}
1452
#[cfg(test)]
mod tests {
    use super::*;

    /// Descending from a cursor: the cursor itself is excluded from the page.
    #[test]
    fn test_calculate_checkpoint_numbers() {
        let page = calculate_checkpoint_numbers(Some(10), 5, true, 15);
        assert_eq!(page, vec![9, 8, 7, 6, 5]);
    }

    /// Descending without a cursor starts at the latest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, true, 15);
        assert_eq!(page, vec![15, 14, 13, 12, 11]);
    }

    /// Ascending without a cursor starts at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, false, 15);
        assert_eq!(page, vec![0, 1, 2, 3, 4]);
    }

    /// Ascending from a cursor: the cursor itself is excluded from the page.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
        let page = calculate_checkpoint_numbers(Some(10), 5, false, 15);
        assert_eq!(page, vec![11, 12, 13, 14, 15]);
    }

    /// A limit larger than the available range is clamped to the latest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, false, 15);
        let expected: Vec<_> = (0..=15).collect();
        assert_eq!(page, expected);
    }

    /// A limit larger than the available range is clamped at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, true, 15);
        let expected: Vec<_> = (0..=15).rev().collect();
        assert_eq!(page, expected);
    }
}