sui_json_rpc/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use backoff::ExponentialBackoff;
11use backoff::future::retry;
12use fastcrypto::encoding::Base64;
13use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
14use futures::future::join_all;
15use im::hashmap::HashMap as ImHashMap;
16use indexmap::map::IndexMap;
17use itertools::Itertools;
18use jsonrpsee::RpcModule;
19use jsonrpsee::core::RpcResult;
20use move_bytecode_utils::module_cache::GetModule;
21use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
22use move_core_types::language_storage::StructTag;
23use once_cell::sync::Lazy;
24use shared_crypto::intent::{IntentMessage, PersonalMessage};
25use sui_display::v1::Format;
26use sui_json_rpc_types::ZkLoginIntentScope;
27use sui_types::base_types::SuiAddress;
28use sui_types::signature::{GenericSignature, VerifyParams};
29use sui_types::signature_verification::VerifiedDigestCache;
30use sui_types::storage::ObjectKey;
31use tap::TapFallible;
32use tracing::{debug, error, info, instrument, trace, warn};
33
34use mysten_metrics::add_server_timing;
35use mysten_metrics::spawn_monitored_task;
36use sui_core::authority::AuthorityState;
37use sui_json_rpc_api::{
38    JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
39    ReadApiServer, validate_limit,
40};
41use sui_json_rpc_types::{
42    BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
43    ObjectChange, ProtocolConfigResponse, SuiEvent, SuiGetPastObjectRequest, SuiObjectDataOptions,
44    SuiObjectResponse, SuiPastObjectResponse, SuiTransactionBlock, SuiTransactionBlockEvents,
45    SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
46};
47use sui_open_rpc::Module;
48use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
49use sui_storage::key_value_store::TransactionKeyValueStore;
50use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
51use sui_types::crypto::AggregateAuthoritySignature;
52use sui_types::display::DisplayVersionUpdatedEvent;
53use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
54use sui_types::error::{SuiError, SuiObjectResponseError};
55use sui_types::messages_checkpoint::{
56    CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
57};
58use sui_types::object::{Object, ObjectRead, PastObjectRead};
59use sui_types::sui_serde::BigInt;
60use sui_types::transaction::TransactionDataAPI;
61use sui_types::transaction::{Transaction, TransactionData};
62
63use crate::authority_state::{StateRead, StateReadError, StateReadResult};
64use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
65use crate::{ObjectProvider, with_tracing};
66use crate::{
67    ObjectProviderCache, SuiRpcModule, get_balance_changes_from_effect, get_object_changes,
68};
69use fastcrypto::encoding::Encoding;
70use fastcrypto::traits::ToFromBytes;
71use shared_crypto::intent::Intent;
72use sui_json_rpc_types::ZkLoginVerifyResult;
73use sui_types::authenticator_state::{ActiveJwk, get_authenticator_state};
74
75/// A field access in a  Display string cannot exceed this level of nesting.
76const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
77
78/// Default budget for Display output size.
79const DEFAULT_MAX_DISPLAY_OUTPUT_SIZE: usize = 1024 * 1024;
80
81/// Overall display output cannot exceed this size.
82static MAX_DISPLAY_OUTPUT_SIZE: Lazy<usize> = Lazy::new(|| {
83    let max_opt = std::env::var("MAX_DISPLAY_OUTPUT_SIZE")
84        .ok()
85        .and_then(|s| s.parse().ok());
86
87    if let Some(max) = max_opt {
88        info!("Using custom value for 'MAX_DISPLAY_OUTPUT_SIZE': {max}");
89        max
90    } else {
91        DEFAULT_MAX_DISPLAY_OUTPUT_SIZE
92    }
93});
94
95// An implementation of the read portion of the JSON-RPC interface intended for use in
96// Fullnodes.
97#[derive(Clone)]
98pub struct ReadApi {
99    pub state: Arc<dyn StateRead>,
100    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
101    pub metrics: Arc<JsonRpcMetrics>,
102}
103
104// Internal data structure to make it easy to work with data returned from
105// authority store and also enable code sharing between get_transaction_with_options,
106// multi_get_transaction_with_options, etc.
107#[derive(Default)]
108struct IntermediateTransactionResponse {
109    digest: TransactionDigest,
110    transaction: Option<Transaction>,
111    effects: Option<TransactionEffects>,
112    events: Option<SuiTransactionBlockEvents>,
113    checkpoint_seq: Option<CheckpointSequenceNumber>,
114    balance_changes: Option<Vec<BalanceChange>>,
115    object_changes: Option<Vec<ObjectChange>>,
116    timestamp: Option<CheckpointTimestamp>,
117    errors: Vec<String>,
118}
119
120impl IntermediateTransactionResponse {
121    pub fn new(digest: TransactionDigest) -> Self {
122        Self {
123            digest,
124            ..Default::default()
125        }
126    }
127
128    pub fn transaction(&self) -> &Option<Transaction> {
129        &self.transaction
130    }
131}
132
133impl ReadApi {
134    pub fn new(
135        state: Arc<AuthorityState>,
136        transaction_kv_store: Arc<TransactionKeyValueStore>,
137        metrics: Arc<JsonRpcMetrics>,
138    ) -> Self {
139        Self {
140            state,
141            transaction_kv_store,
142            metrics,
143        }
144    }
145
146    async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
147        Ok(match id {
148            CheckpointId::SequenceNumber(seq) => {
149                let verified_summary = self
150                    .transaction_kv_store
151                    .get_checkpoint_summary(seq)
152                    .await?;
153                let content = self
154                    .transaction_kv_store
155                    .get_checkpoint_contents(verified_summary.sequence_number)
156                    .await?;
157                let signature = verified_summary.auth_sig().signature.clone();
158                (verified_summary.into_data(), content, signature).into()
159            }
160            CheckpointId::Digest(digest) => {
161                let verified_summary = self
162                    .transaction_kv_store
163                    .get_checkpoint_summary_by_digest(digest)
164                    .await?;
165                let content = self
166                    .transaction_kv_store
167                    .get_checkpoint_contents(verified_summary.sequence_number)
168                    .await?;
169                let signature = verified_summary.auth_sig().signature.clone();
170                (verified_summary.into_data(), content, signature).into()
171            }
172        })
173    }
174
175    pub async fn get_checkpoints_internal(
176        state: Arc<dyn StateRead>,
177        transaction_kv_store: Arc<TransactionKeyValueStore>,
178        // If `Some`, the query will start from the next item after the specified cursor
179        cursor: Option<CheckpointSequenceNumber>,
180        limit: u64,
181        descending_order: bool,
182    ) -> StateReadResult<Vec<Checkpoint>> {
183        let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
184        let checkpoint_numbers =
185            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
186
187        let verified_checkpoints = transaction_kv_store
188            .multi_get_checkpoints_summaries(&checkpoint_numbers)
189            .await?;
190
191        let checkpoint_summaries_and_signatures: Vec<(
192            CheckpointSummary,
193            AggregateAuthoritySignature,
194        )> = verified_checkpoints
195            .into_iter()
196            .flatten()
197            .map(|check| {
198                (
199                    check.clone().into_summary_and_sequence().1,
200                    check.get_validator_signature(),
201                )
202            })
203            .collect();
204
205        let checkpoint_contents = transaction_kv_store
206            .multi_get_checkpoints_contents(&checkpoint_numbers)
207            .await?;
208        let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
209
210        let mut checkpoints: Vec<Checkpoint> = vec![];
211
212        for (summary_and_sig, content) in checkpoint_summaries_and_signatures
213            .into_iter()
214            .zip(contents.into_iter())
215        {
216            checkpoints.push(Checkpoint::from((
217                summary_and_sig.0,
218                content,
219                summary_and_sig.1,
220            )));
221        }
222
223        Ok(checkpoints)
224    }
225
226    #[instrument(skip_all)]
227    async fn multi_get_transaction_blocks_internal(
228        &self,
229        digests: Vec<TransactionDigest>,
230        opts: Option<SuiTransactionBlockResponseOptions>,
231    ) -> Result<Vec<SuiTransactionBlockResponse>, Error> {
232        trace!("start");
233
234        let num_digests = digests.len();
235        if num_digests > *QUERY_MAX_RESULT_LIMIT {
236            Err(SuiRpcInputError::SizeLimitExceeded(
237                QUERY_MAX_RESULT_LIMIT.to_string(),
238            ))?
239        }
240        self.metrics
241            .get_tx_blocks_limit
242            .observe(digests.len() as f64);
243
244        let opts = opts.unwrap_or_default();
245
246        // use LinkedHashMap to dedup and can iterate in insertion order.
247        let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
248            IndexMap::from_iter(
249                digests
250                    .iter()
251                    .map(|k| (k, IntermediateTransactionResponse::new(*k))),
252            );
253        if temp_response.len() < num_digests {
254            Err(SuiRpcInputError::ContainsDuplicates)?
255        }
256
257        if opts.require_input() {
258            trace!("getting input");
259            let digests_clone = digests.clone();
260            let transactions =
261                self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
262                    |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
263                )?;
264
265            for ((_digest, cache_entry), txn) in
266                temp_response.iter_mut().zip(transactions.into_iter())
267            {
268                cache_entry.transaction = txn;
269            }
270        }
271
272        // Fetch effects when `show_events` is true because events relies on effects
273        if opts.require_effects() {
274            trace!("getting effects");
275            let digests_clone = digests.clone();
276            let effects_list = self.transaction_kv_store
277                .multi_get_fx_by_tx_digest(&digests_clone)
278                .await
279                .tap_err(
280                    |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
281                )?;
282            for ((_digest, cache_entry), e) in
283                temp_response.iter_mut().zip(effects_list.into_iter())
284            {
285                cache_entry.effects = e;
286            }
287        }
288
289        trace!("getting checkpoint sequence numbers");
290        let checkpoint_seq_list = self
291            .transaction_kv_store
292            .multi_get_transaction_checkpoint(&digests)
293            .await
294            .tap_err(
295                |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
296        for ((_digest, cache_entry), seq) in temp_response
297            .iter_mut()
298            .zip(checkpoint_seq_list.into_iter())
299        {
300            cache_entry.checkpoint_seq = seq;
301        }
302
303        let unique_checkpoint_numbers = temp_response
304            .values()
305            .filter_map(|cache_entry| cache_entry.checkpoint_seq)
306            // It's likely that many transactions have the same checkpoint, so we don't
307            // need to over-fetch
308            .unique()
309            .collect::<Vec<CheckpointSequenceNumber>>();
310
311        // fetch timestamp from the DB
312        trace!("getting checkpoint summaries");
313        let timestamps = self
314            .transaction_kv_store
315            .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
316            .await
317            .map_err(|e| {
318                Error::UnexpectedError(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
319            })?
320            .into_iter()
321            .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
322
323        // construct a hashmap of checkpoint -> timestamp for fast lookup
324        let checkpoint_to_timestamp = unique_checkpoint_numbers
325            .into_iter()
326            .zip(timestamps)
327            .collect::<HashMap<_, _>>();
328
329        // fill cache with the timestamp
330        for (_, cache_entry) in temp_response.iter_mut() {
331            if cache_entry.checkpoint_seq.is_some() {
332                // safe to unwrap because is_some is checked
333                cache_entry.timestamp = *checkpoint_to_timestamp
334                    .get(cache_entry.checkpoint_seq.as_ref().unwrap())
335                    // Safe to unwrap because checkpoint_seq is guaranteed to exist in checkpoint_to_timestamp
336                    .unwrap();
337            }
338        }
339
340        if opts.show_events {
341            trace!("getting events");
342            let mut non_empty_digests = vec![];
343            for cache_entry in temp_response.values() {
344                if let Some(effects) = &cache_entry.effects
345                    && effects.events_digest().is_some()
346                {
347                    non_empty_digests.push(cache_entry.digest);
348                }
349            }
350            // fetch events from the DB with retry, retry each 0.5s for 3s
351            let backoff = ExponentialBackoff {
352                max_elapsed_time: Some(Duration::from_secs(3)),
353                multiplier: 1.0,
354                ..ExponentialBackoff::default()
355            };
356            let mut events = retry(backoff, || async {
357                match self
358                    .transaction_kv_store
359                    .multi_get_events_by_tx_digests(&non_empty_digests)
360                    .await
361                {
362                    // Only return Ok when all the queried transaction events are found, otherwise retry
363                    // until timeout, then return Err.
364                    Ok(events) if !events.contains(&None) => Ok(events),
365                    Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
366                        "Events not found, transaction execution may be incomplete.".into(),
367                    ))),
368                    Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
369                        "Failed to call multi_get_events: {e:?}"
370                    )))),
371                }
372            })
373            .await
374            .map_err(|e| {
375                Error::UnexpectedError(format!(
376                    "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
377                ))
378            })?
379            .into_iter();
380
381            // fill cache with the events
382            for (_, cache_entry) in temp_response.iter_mut() {
383                let transaction_digest = cache_entry.digest;
384                if let Some(events_digest) =
385                    cache_entry.effects.as_ref().and_then(|e| e.events_digest())
386                {
387                    match events.next() {
388                        Some(Some(ev)) => {
389                            cache_entry.events =
390                                Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
391                        }
392                        None | Some(None) => {
393                            error!(
394                                "Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
395                            );
396                            cache_entry.errors.push(format!(
397                                "Failed to fetch events with event digest {events_digest:?}",
398                            ))
399                        }
400                    }
401                } else {
402                    // events field will be Some if and only if `show_events` is true and
403                    // there is no error in converting fetching events
404                    cache_entry.events = Some(SuiTransactionBlockEvents::default());
405                }
406            }
407        }
408
409        let mut object_cache =
410            ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
411
412        // Prefetch the objects if we need to show balance or object changes
413        if opts.show_balance_changes || opts.show_object_changes {
414            let mut keys = vec![];
415            for resp in temp_response.values() {
416                let effects = resp.effects.as_ref().ok_or_else(|| {
417                    SuiRpcInputError::GenericNotFound(
418                        "unable to derive balance/object changes because effect is empty"
419                            .to_string(),
420                    )
421                })?;
422
423                for change in effects.object_changes() {
424                    if let Some(input_version) = change.input_version {
425                        keys.push(ObjectKey(change.id, input_version));
426                    }
427                    if let Some(output_version) = change.output_version {
428                        keys.push(ObjectKey(change.id, output_version));
429                    }
430                }
431            }
432
433            let objects = self
434                .transaction_kv_store
435                .multi_get_objects(&keys)
436                .await?
437                .into_iter()
438                .flatten()
439                .collect::<Vec<_>>();
440
441            object_cache.insert_objects_into_cache(objects);
442        }
443
444        if opts.show_balance_changes {
445            trace!("getting balance changes");
446
447            let mut results = vec![];
448            for resp in temp_response.values() {
449                let input_objects = if let Some(tx) = resp.transaction() {
450                    tx.data()
451                        .inner()
452                        .intent_message
453                        .value
454                        .input_objects()
455                        .unwrap_or_default()
456                } else {
457                    // don't have the input tx, so not much we can do. perhaps this is an Err?
458                    Vec::new()
459                };
460                results.push(get_balance_changes_from_effect(
461                    &object_cache,
462                    resp.effects.as_ref().ok_or_else(|| {
463                        SuiRpcInputError::GenericNotFound(
464                            "unable to derive balance changes because effect is empty".to_string(),
465                        )
466                    })?,
467                    input_objects,
468                    None,
469                ));
470            }
471            let results = join_all(results).await;
472            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
473                match result {
474                    Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
475                    Err(e) => entry
476                        .1
477                        .errors
478                        .push(format!("Failed to fetch balance changes {e:?}")),
479                }
480            }
481        }
482
483        if opts.show_object_changes {
484            trace!("getting object changes");
485
486            let mut results = vec![];
487            for resp in temp_response.values() {
488                let effects = resp.effects.as_ref().ok_or_else(|| {
489                    SuiRpcInputError::GenericNotFound(
490                        "unable to derive object changes because effect is empty".to_string(),
491                    )
492                })?;
493
494                results.push(get_object_changes(
495                    &object_cache,
496                    effects,
497                    resp.transaction
498                        .as_ref()
499                        .ok_or_else(|| {
500                            SuiRpcInputError::GenericNotFound(
501                                "unable to derive object changes because transaction is empty"
502                                    .to_string(),
503                            )
504                        })?
505                        .data()
506                        .intent_message()
507                        .value
508                        .sender(),
509                    effects.modified_at_versions(),
510                    effects.all_changed_objects(),
511                    effects.all_removed_objects(),
512                ));
513            }
514            let results = join_all(results).await;
515            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
516                match result {
517                    Ok(object_changes) => entry.1.object_changes = Some(object_changes),
518                    Err(e) => entry
519                        .1
520                        .errors
521                        .push(format!("Failed to fetch object changes {e:?}")),
522                }
523            }
524        }
525
526        let epoch_store = self.state.load_epoch_store_one_call_per_task();
527        let converted_tx_block_resps = temp_response
528            .into_iter()
529            .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
530            .collect::<Result<Vec<_>, _>>()?;
531
532        self.metrics
533            .get_tx_blocks_result_size
534            .observe(converted_tx_block_resps.len() as f64);
535        self.metrics
536            .get_tx_blocks_result_size_total
537            .inc_by(converted_tx_block_resps.len() as u64);
538
539        trace!("done");
540
541        Ok(converted_tx_block_resps)
542    }
543}
544
545#[async_trait]
546impl ReadApiServer for ReadApi {
547    #[instrument(skip(self))]
548    async fn get_object(
549        &self,
550        object_id: ObjectID,
551        options: Option<SuiObjectDataOptions>,
552    ) -> RpcResult<SuiObjectResponse> {
553        with_tracing!(async move {
554            let state = self.state.clone();
555            let object_read = spawn_monitored_task!(async move {
556                state.get_object_read(&object_id).map_err(|e| {
557                    warn!(?object_id, "Failed to get object: {:?}", e);
558                    Error::from(e)
559                })
560            })
561            .await
562            .map_err(Error::from)??;
563            let options = options.unwrap_or_default();
564
565            match object_read {
566                ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
567                    SuiObjectResponseError::NotExists { object_id: id },
568                )),
569                ObjectRead::Exists(object_ref, o, layout) => {
570                    let mut display_fields = None;
571                    if options.show_display {
572                        match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
573                            .await
574                        {
575                            Ok(rendered_fields) => display_fields = Some(rendered_fields),
576                            Err(e) => {
577                                return Ok(SuiObjectResponse::new(
578                                    Some((object_ref, o, layout, options, None).try_into()?),
579                                    Some(SuiObjectResponseError::DisplayError {
580                                        error: e.to_string(),
581                                    }),
582                                ));
583                            }
584                        }
585                    }
586                    Ok(SuiObjectResponse::new_with_data(
587                        (object_ref, o, layout, options, display_fields).try_into()?,
588                    ))
589                }
590                ObjectRead::Deleted((object_id, version, digest)) => Ok(
591                    SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
592                        object_id,
593                        version,
594                        digest,
595                    }),
596                ),
597            }
598        })
599    }
600
601    #[instrument(skip(self))]
602    async fn multi_get_objects(
603        &self,
604        object_ids: Vec<ObjectID>,
605        options: Option<SuiObjectDataOptions>,
606    ) -> RpcResult<Vec<SuiObjectResponse>> {
607        with_tracing!(async move {
608            if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
609                self.metrics
610                    .get_objects_limit
611                    .observe(object_ids.len() as f64);
612                let mut futures = vec![];
613                for object_id in object_ids {
614                    futures.push(self.get_object(object_id, options.clone()));
615                }
616                let results = join_all(futures).await;
617
618                let objects_result: Result<Vec<SuiObjectResponse>, String> = results
619                    .into_iter()
620                    .map(|result| match result {
621                        Ok(response) => Ok(response),
622                        Err(error) => {
623                            error!("Failed to fetch object with error: {error:?}");
624                            Err(format!("Error: {}", error))
625                        }
626                    })
627                    .collect();
628
629                let objects = objects_result.map_err(|err| {
630                    Error::UnexpectedError(format!("Failed to fetch objects with error: {}", err))
631                })?;
632
633                self.metrics
634                    .get_objects_result_size
635                    .observe(objects.len() as f64);
636                self.metrics
637                    .get_objects_result_size_total
638                    .inc_by(objects.len() as u64);
639                Ok(objects)
640            } else {
641                Err(SuiRpcInputError::SizeLimitExceeded(
642                    QUERY_MAX_RESULT_LIMIT.to_string(),
643                ))?
644            }
645        })
646    }
647
648    #[instrument(skip(self))]
649    async fn try_get_past_object(
650        &self,
651        object_id: ObjectID,
652        version: SequenceNumber,
653        options: Option<SuiObjectDataOptions>,
654    ) -> RpcResult<SuiPastObjectResponse> {
655        with_tracing!(async move {
656            let state = self.state.clone();
657            let past_read = spawn_monitored_task!(async move {
658            state.get_past_object_read(&object_id, version)
659            .map_err(|e| {
660                error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
661                Error::from(e)
662            })}).await.map_err(Error::from)??;
663            let options = options.unwrap_or_default();
664            match past_read {
665                PastObjectRead::ObjectNotExists(id) => {
666                    Ok(SuiPastObjectResponse::ObjectNotExists(id))
667                }
668                PastObjectRead::VersionFound(object_ref, o, layout) => {
669                    let display_fields = if options.show_display {
670                        // TODO (jian): api breaking change to also modify past objects.
671                        Some(
672                            get_display_fields(self, &self.transaction_kv_store, &o, &layout)
673                                .await
674                                .map_err(|e| {
675                                    Error::UnexpectedError(format!(
676                                        "Unable to render object at version {version}: {e}"
677                                    ))
678                                })?,
679                        )
680                    } else {
681                        None
682                    };
683                    Ok(SuiPastObjectResponse::VersionFound(
684                        (object_ref, o, layout, options, display_fields).try_into()?,
685                    ))
686                }
687                PastObjectRead::ObjectDeleted(oref) => {
688                    Ok(SuiPastObjectResponse::ObjectDeleted(oref.into()))
689                }
690                PastObjectRead::VersionNotFound(id, seq_num) => {
691                    Ok(SuiPastObjectResponse::VersionNotFound(id, seq_num))
692                }
693                PastObjectRead::VersionTooHigh {
694                    object_id,
695                    asked_version,
696                    latest_version,
697                } => Ok(SuiPastObjectResponse::VersionTooHigh {
698                    object_id,
699                    asked_version,
700                    latest_version,
701                }),
702            }
703        })
704    }
705
706    #[instrument(skip(self))]
707    async fn try_get_object_before_version(
708        &self,
709        object_id: ObjectID,
710        version: SequenceNumber,
711    ) -> RpcResult<SuiPastObjectResponse> {
712        let version = self
713            .state
714            .find_object_lt_or_eq_version(&object_id, &version)
715            .await
716            .map_err(Error::from)?
717            .map(|obj| obj.version())
718            .unwrap_or_default();
719        self.try_get_past_object(
720            object_id,
721            version,
722            Some(SuiObjectDataOptions::bcs_lossless()),
723        )
724        .await
725    }
726
727    #[instrument(skip(self))]
728    async fn try_multi_get_past_objects(
729        &self,
730        past_objects: Vec<SuiGetPastObjectRequest>,
731        options: Option<SuiObjectDataOptions>,
732    ) -> RpcResult<Vec<SuiPastObjectResponse>> {
733        with_tracing!(async move {
734            if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
735                let mut futures = vec![];
736                for past_object in past_objects {
737                    futures.push(self.try_get_past_object(
738                        past_object.object_id,
739                        past_object.version,
740                        options.clone(),
741                    ));
742                }
743                let results = join_all(futures).await;
744
745                let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
746                let success = oks.into_iter().filter_map(Result::ok).collect();
747                let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
748                if !errors.is_empty() {
749                    let error_string = errors
750                        .iter()
751                        .map(|e| e.to_string())
752                        .collect::<Vec<String>>()
753                        .join("; ");
754                    Err(anyhow!("{error_string}").into()) // Collects errors not related to SuiPastObjectResponse variants
755                } else {
756                    Ok(success)
757                }
758            } else {
759                Err(SuiRpcInputError::SizeLimitExceeded(
760                    QUERY_MAX_RESULT_LIMIT.to_string(),
761                ))?
762            }
763        })
764    }
765
766    #[instrument(skip(self))]
767    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
768        with_tracing!(async move {
769            Ok(self
770                .state
771                .get_total_transaction_blocks()
772                .map_err(Error::from)?
773                .into()) // converts into BigInt<u64>
774        })
775    }
776
777    #[instrument(skip(self))]
778    async fn get_transaction_block(
779        &self,
780        digest: TransactionDigest,
781        opts: Option<SuiTransactionBlockResponseOptions>,
782    ) -> RpcResult<SuiTransactionBlockResponse> {
783        with_tracing!(async move {
784            let opts = opts.unwrap_or_default();
785            let mut temp_response = IntermediateTransactionResponse::new(digest);
786
787            // Fetch transaction to determine existence
788            let transaction_kv_store = self.transaction_kv_store.clone();
789            let transaction = spawn_monitored_task!(async move {
790                let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
791                    debug!(tx_digest=?digest, "Failed to get transaction: {}", err);
792                    Error::from(err)
793                });
794                add_server_timing("tx_kv_lookup");
795                ret
796            })
797            .await
798            .map_err(Error::from)??;
799            let input_objects = transaction
800                .data()
801                .inner()
802                .intent_message
803                .value
804                .input_objects()
805                .unwrap_or_default();
806
807            // the input is needed for object_changes to retrieve the sender address.
808            if opts.require_input() {
809                temp_response.transaction = Some(transaction);
810            }
811
812            // Fetch effects when `show_events` is true because events relies on effects
813            if opts.require_effects() {
814                let transaction_kv_store = self.transaction_kv_store.clone();
815                temp_response.effects = Some(
816                    spawn_monitored_task!(async move {
817                        transaction_kv_store
818                            .get_fx_by_tx_digest(digest)
819                            .await
820                            .map_err(|err| {
821                                debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
822                                Error::from(err)
823                            })
824                    })
825                    .await
826                    .map_err(Error::from)??,
827                );
828            }
829
830            temp_response.checkpoint_seq = self
831                .transaction_kv_store
832                .deprecated_get_transaction_checkpoint(digest)
833                .await
834                .map_err(|e| {
835                    error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
836                    Error::from(e)
837                })?;
838
839            if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
840                let kv_store = self.transaction_kv_store.clone();
841                let checkpoint_seq = *checkpoint_seq;
842                let checkpoint = spawn_monitored_task!(async move {
843                    kv_store
844                    // safe to unwrap because we have checked `is_some` above
845                    .get_checkpoint_summary(checkpoint_seq)
846                    .await
847                    .map_err(|e| {
848                        error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
849                        Error::from(e)
850                    })
851                }).await.map_err(Error::from)??;
852                // TODO(chris): we don't need to fetch the whole checkpoint summary
853                temp_response.timestamp = Some(checkpoint.timestamp_ms);
854            }
855
856            if opts.show_events && temp_response.effects.is_some() {
857                let transaction_kv_store = self.transaction_kv_store.clone();
858                let events = spawn_monitored_task!(async move {
859                    transaction_kv_store
860                        .multi_get_events_by_tx_digests(&[digest])
861                        .await
862                        .map_err(|e| {
863                            error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
864                            Error::from(e)
865                        })
866                    })
867                    .await
868                    .map_err(Error::from)??
869                    .pop()
870                    .flatten();
871                match events {
872                    None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
873                    Some(events) => match to_sui_transaction_events(self, digest, events) {
874                        Ok(e) => temp_response.events = Some(e),
875                        Err(e) => temp_response.errors.push(e.to_string()),
876                    },
877                }
878            }
879
880            let object_cache =
881                ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
882            if opts.show_balance_changes
883                && let Some(effects) = &temp_response.effects
884            {
885                let balance_changes =
886                    get_balance_changes_from_effect(&object_cache, effects, input_objects, None)
887                        .await;
888
889                if let Ok(balance_changes) = balance_changes {
890                    temp_response.balance_changes = Some(balance_changes);
891                } else {
892                    temp_response.errors.push(format!(
893                        "Cannot retrieve balance changes: {}",
894                        balance_changes.unwrap_err()
895                    ));
896                }
897            }
898
899            if opts.show_object_changes
900                && let (Some(effects), Some(input)) =
901                    (&temp_response.effects, &temp_response.transaction)
902            {
903                let sender = input.data().intent_message().value.sender();
904                let object_changes = get_object_changes(
905                    &object_cache,
906                    effects,
907                    sender,
908                    effects.modified_at_versions(),
909                    effects.all_changed_objects(),
910                    effects.all_removed_objects(),
911                )
912                .await;
913
914                if let Ok(object_changes) = object_changes {
915                    temp_response.object_changes = Some(object_changes);
916                } else {
917                    temp_response.errors.push(format!(
918                        "Cannot retrieve object changes: {}",
919                        object_changes.unwrap_err()
920                    ));
921                }
922            }
923            let epoch_store = self.state.load_epoch_store_one_call_per_task();
924            convert_to_response(temp_response, &opts, epoch_store.module_cache())
925        })
926    }
927
928    #[instrument(skip(self))]
929    async fn multi_get_transaction_blocks(
930        &self,
931        digests: Vec<TransactionDigest>,
932        opts: Option<SuiTransactionBlockResponseOptions>,
933    ) -> RpcResult<Vec<SuiTransactionBlockResponse>> {
934        with_tracing!(async move {
935            let cloned_self = self.clone();
936            spawn_monitored_task!(async move {
937                cloned_self
938                    .multi_get_transaction_blocks_internal(digests, opts)
939                    .await
940            })
941            .await
942            .map_err(Error::from)?
943        })
944    }
945
946    #[instrument(skip(self))]
947    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<SuiEvent>> {
948        with_tracing!(async move {
949            let state = self.state.clone();
950            let transaction_kv_store = self.transaction_kv_store.clone();
951            spawn_monitored_task!(async move{
952            let store = state.load_epoch_store_one_call_per_task();
953            let events = transaction_kv_store
954                .multi_get_events_by_tx_digests(&[transaction_digest])
955                .await
956                .map_err(
957                    |e| {
958                        error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
959                        Error::StateReadError(e.into())
960                    })?
961                .pop()
962                .flatten();
963            Ok(match events {
964                Some(events) => events
965                    .data
966                    .into_iter()
967                    .enumerate()
968                    .map(|(seq, e)| {
969                        let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
970                        SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
971                    })
972                    .collect::<Result<Vec<_>, _>>()
973                    .map_err(Error::SuiError)?,
974                None => vec![],
975            })
976        }).await.map_err(Error::from)?
977        })
978    }
979
980    #[instrument(skip(self))]
981    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
982        with_tracing!(async move {
983            Ok(self
984                .state
985                .get_latest_checkpoint_sequence_number()
986                .map_err(|e| {
987                    SuiRpcInputError::GenericNotFound(format!(
988                        "Latest checkpoint sequence number was not found with error :{e}"
989                    ))
990                })?
991                .into())
992        })
993    }
994
995    #[instrument(skip(self))]
996    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
997        with_tracing!(self.get_checkpoint_internal(id))
998    }
999
1000    #[instrument(skip(self))]
1001    async fn get_checkpoints(
1002        &self,
1003        // If `Some`, the query will start from the next item after the specified cursor
1004        cursor: Option<BigInt<u64>>,
1005        limit: Option<usize>,
1006        descending_order: bool,
1007    ) -> RpcResult<CheckpointPage> {
1008        with_tracing!(async move {
1009            let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
1010                .map_err(SuiRpcInputError::from)?;
1011
1012            let state = self.state.clone();
1013            let kv_store = self.transaction_kv_store.clone();
1014
1015            self.metrics.get_checkpoints_limit.observe(limit as f64);
1016
1017            let mut data = spawn_monitored_task!(Self::get_checkpoints_internal(
1018                state,
1019                kv_store,
1020                cursor.map(|s| *s),
1021                limit as u64 + 1,
1022                descending_order,
1023            ))
1024            .await
1025            .map_err(Error::from)?
1026            .map_err(Error::from)?;
1027
1028            let has_next_page = data.len() > limit;
1029            data.truncate(limit);
1030
1031            let next_cursor = if has_next_page {
1032                data.last().cloned().map(|d| d.sequence_number.into())
1033            } else {
1034                None
1035            };
1036
1037            self.metrics
1038                .get_checkpoints_result_size
1039                .observe(data.len() as f64);
1040            self.metrics
1041                .get_checkpoints_result_size_total
1042                .inc_by(data.len() as u64);
1043
1044            Ok(CheckpointPage {
1045                data,
1046                next_cursor,
1047                has_next_page,
1048            })
1049        })
1050    }
1051
1052    #[instrument(skip(self))]
1053    async fn get_protocol_config(
1054        &self,
1055        version: Option<BigInt<u64>>,
1056    ) -> RpcResult<ProtocolConfigResponse> {
1057        with_tracing!(async move {
1058            version
1059                .map(|v| {
1060                    ProtocolConfig::get_for_version_if_supported(
1061                        (*v).into(),
1062                        self.state.get_chain_identifier()?.chain(),
1063                    )
1064                    .ok_or(SuiRpcInputError::ProtocolVersionUnsupported(
1065                        ProtocolVersion::MIN.as_u64(),
1066                        ProtocolVersion::MAX.as_u64(),
1067                    ))
1068                    .map_err(Error::from)
1069                })
1070                .unwrap_or(Ok(self
1071                    .state
1072                    .load_epoch_store_one_call_per_task()
1073                    .protocol_config()
1074                    .clone()))
1075                .map(ProtocolConfigResponse::from)
1076        })
1077    }
1078
1079    #[instrument(skip(self))]
1080    async fn get_chain_identifier(&self) -> RpcResult<String> {
1081        with_tracing!(async move {
1082            let ci = self.state.get_chain_identifier()?;
1083            Ok(ci.to_string())
1084        })
1085    }
1086    #[instrument(skip(self))]
1087    async fn verify_zklogin_signature(
1088        &self,
1089        bytes: String,
1090        signature: String,
1091        intent_scope: ZkLoginIntentScope,
1092        author: SuiAddress,
1093    ) -> RpcResult<ZkLoginVerifyResult> {
1094        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1095        let curr_epoch = epoch_store.epoch();
1096        let zklogin_env_native = match self
1097            .state
1098            .get_chain_identifier()
1099            .expect("get chain identifier should not fail")
1100            .chain()
1101        {
1102            sui_protocol_config::Chain::Mainnet | sui_protocol_config::Chain::Testnet => {
1103                ZkLoginEnv::Prod
1104            }
1105            _ => ZkLoginEnv::Test,
1106        };
1107        let GenericSignature::ZkLoginAuthenticator(zklogin_sig) =
1108            GenericSignature::from_bytes(&Base64::decode(&signature).map_err(Error::from)?)
1109                .map_err(Error::from)?
1110        else {
1111            return Err(SuiRpcInputError::GenericNotFound(
1112                "Endpoint only supports zkLogin signature".to_string(),
1113            )
1114            .into());
1115        };
1116
1117        let new_jwks =
1118            match get_authenticator_state(self.state.get_object_store()).map_err(Error::from)? {
1119                Some(authenticator_state) => authenticator_state.active_jwks,
1120                None => {
1121                    return Err(SuiRpcInputError::GenericNotFound(
1122                        "Authenticator state not found".to_string(),
1123                    )
1124                    .into());
1125                }
1126            };
1127
1128        // construct verify params with active jwks and zklogin_env.
1129        let mut oidc_provider_jwks = ImHashMap::new();
1130        for active_jwk in new_jwks.iter() {
1131            let ActiveJwk { jwk_id, jwk, .. } = active_jwk;
1132            match oidc_provider_jwks.entry(jwk_id.clone()) {
1133                im::hashmap::Entry::Occupied(_) => {
1134                    warn!("JWK with kid {:?} already exists", jwk_id);
1135                }
1136                im::hashmap::Entry::Vacant(entry) => {
1137                    entry.insert(jwk.clone());
1138                }
1139            }
1140        }
1141        let verify_params = VerifyParams::new(
1142            oidc_provider_jwks,
1143            vec![],
1144            zklogin_env_native,
1145            true,
1146            true,
1147            true,
1148            Some(30),
1149            true,
1150        );
1151        match intent_scope {
1152            ZkLoginIntentScope::TransactionData => {
1153                let tx_data: TransactionData =
1154                    bcs::from_bytes(&Base64::decode(&bytes).map_err(Error::from)?)
1155                        .map_err(Error::from)?;
1156                let intent_msg = IntentMessage::new(Intent::sui_transaction(), tx_data.clone());
1157                let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1158                match sig.verify_authenticator(
1159                    &intent_msg,
1160                    author,
1161                    curr_epoch,
1162                    &verify_params,
1163                    Arc::new(VerifiedDigestCache::new_empty()),
1164                ) {
1165                    Ok(_) => Ok(ZkLoginVerifyResult {
1166                        success: true,
1167                        errors: vec![],
1168                    }),
1169                    Err(e) => Ok(ZkLoginVerifyResult {
1170                        success: false,
1171                        errors: vec![e.to_string()],
1172                    }),
1173                }
1174            }
1175            ZkLoginIntentScope::PersonalMessage => {
1176                let data = PersonalMessage {
1177                    message: Base64::decode(&bytes).map_err(Error::from)?,
1178                };
1179                let intent_msg = IntentMessage::new(Intent::personal_message(), data);
1180
1181                let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1182                match sig.verify_authenticator(
1183                    &intent_msg,
1184                    author,
1185                    curr_epoch,
1186                    &verify_params,
1187                    Arc::new(VerifiedDigestCache::new_empty()),
1188                ) {
1189                    Ok(_) => Ok(ZkLoginVerifyResult {
1190                        success: true,
1191                        errors: vec![],
1192                    }),
1193                    Err(e) => Ok(ZkLoginVerifyResult {
1194                        success: false,
1195                        errors: vec![e.to_string()],
1196                    }),
1197                }
1198            }
1199        }
1200    }
1201}
1202
1203impl SuiRpcModule for ReadApi {
1204    fn rpc(self) -> RpcModule<Self> {
1205        self.into_rpc()
1206    }
1207
1208    fn rpc_doc_module() -> Module {
1209        ReadApiOpenRpc::module_doc()
1210    }
1211}
1212
1213#[instrument(skip_all)]
1214fn to_sui_transaction_events(
1215    fullnode_api: &ReadApi,
1216    tx_digest: TransactionDigest,
1217    events: TransactionEvents,
1218) -> Result<SuiTransactionBlockEvents, Error> {
1219    let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1220    let backing_package_store = fullnode_api.state.get_backing_package_store();
1221    let mut layout_resolver = epoch_store
1222        .executor()
1223        .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1224    Ok(SuiTransactionBlockEvents::try_from(
1225        events,
1226        tx_digest,
1227        None,
1228        layout_resolver.as_mut(),
1229    )?)
1230}
1231
1232#[derive(Debug, thiserror::Error)]
1233pub enum ObjectDisplayError {
1234    #[error("Not a move struct")]
1235    NotMoveStruct,
1236
1237    #[error("Failed to extract layout")]
1238    Layout,
1239
1240    #[error("Failed to extract Move object")]
1241    MoveObject,
1242
1243    #[error(transparent)]
1244    Deserialization(#[from] SuiError),
1245
1246    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
1247    Bcs(#[from] bcs::Error),
1248
1249    #[error(transparent)]
1250    StateReadError(#[from] StateReadError),
1251}
1252
1253#[instrument(skip(fullnode_api, kv_store))]
1254async fn get_display_fields(
1255    fullnode_api: &ReadApi,
1256    kv_store: &Arc<TransactionKeyValueStore>,
1257    original_object: &Object,
1258    original_layout: &Option<MoveStructLayout>,
1259) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1260    let Some(layout) = original_layout else {
1261        return Ok(DisplayFieldsResponse {
1262            data: None,
1263            error: None,
1264        });
1265    };
1266
1267    let Some(move_object) = original_object.data.try_as_move() else {
1268        return Err(ObjectDisplayError::MoveObject);
1269    };
1270
1271    let Some(display_object) =
1272        get_display_object_by_type(kv_store, fullnode_api, &layout.type_).await?
1273    else {
1274        return Ok(DisplayFieldsResponse {
1275            data: None,
1276            error: None,
1277        });
1278    };
1279
1280    let format = match Format::parse(MAX_DISPLAY_NESTED_LEVEL, &display_object.fields) {
1281        Ok(format) => format,
1282        Err(e) => {
1283            return Ok(DisplayFieldsResponse {
1284                data: None,
1285                error: Some(SuiObjectResponseError::DisplayError {
1286                    error: e.to_string(),
1287                }),
1288            });
1289        }
1290    };
1291
1292    let layout = MoveTypeLayout::Struct(Box::new(layout.clone()));
1293    let display = match format.display(*MAX_DISPLAY_OUTPUT_SIZE, move_object.contents(), &layout) {
1294        Ok(fields) => fields,
1295        Err(e) => {
1296            return Ok(DisplayFieldsResponse {
1297                data: None,
1298                error: Some(SuiObjectResponseError::DisplayError {
1299                    error: e.to_string(),
1300                }),
1301            });
1302        }
1303    };
1304
1305    let mut fields = BTreeMap::new();
1306    let mut errors = vec![];
1307
1308    for (key, value) in display {
1309        match value {
1310            Ok(v) => {
1311                fields.insert(key, v);
1312            }
1313            Err(e) => {
1314                errors.push(e.to_string());
1315            }
1316        }
1317    }
1318
1319    Ok(DisplayFieldsResponse {
1320        data: (!fields.is_empty()).then_some(fields),
1321        error: (!errors.is_empty()).then(|| SuiObjectResponseError::DisplayError {
1322            error: errors.join("; "),
1323        }),
1324    })
1325}
1326
1327#[instrument(skip(kv_store, fullnode_api))]
1328async fn get_display_object_by_type(
1329    kv_store: &Arc<TransactionKeyValueStore>,
1330    fullnode_api: &ReadApi,
1331    object_type: &StructTag,
1332    // TODO: add query version support
1333) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1334    let mut events = fullnode_api
1335        .state
1336        .query_events(
1337            kv_store,
1338            EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1339            None,
1340            1,
1341            true,
1342        )
1343        .await?;
1344
1345    // If there's any recent version of Display, give it to the client.
1346    // TODO: add support for version query.
1347    if let Some(event) = events.pop() {
1348        let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1349        Ok(Some(display))
1350    } else {
1351        Ok(None)
1352    }
1353}
1354
1355#[instrument(skip_all)]
1356fn convert_to_response(
1357    cache: IntermediateTransactionResponse,
1358    opts: &SuiTransactionBlockResponseOptions,
1359    module_cache: &impl GetModule,
1360) -> RpcInterimResult<SuiTransactionBlockResponse> {
1361    let mut response = SuiTransactionBlockResponse::new(cache.digest);
1362    response.errors = cache.errors;
1363
1364    if let Some(transaction) = cache.transaction {
1365        if opts.show_raw_input {
1366            response.raw_transaction = bcs::to_bytes(transaction.data()).map_err(|e| {
1367                // TODO: is this a client or server error?
1368                anyhow!("Failed to serialize raw transaction with error: {e}")
1369            })?;
1370        }
1371
1372        if opts.show_input {
1373            response.transaction = Some(SuiTransactionBlock::try_from(
1374                transaction.into_data(),
1375                module_cache,
1376            )?);
1377        }
1378    }
1379
1380    if let Some(effects) = cache.effects {
1381        if opts.show_raw_effects {
1382            response.raw_effects = bcs::to_bytes(&effects).map_err(|e| {
1383                // TODO: is this a client or server error?
1384                anyhow!("Failed to serialize transaction block effects with error: {e}")
1385            })?;
1386        }
1387
1388        if opts.show_effects {
1389            response.effects = Some(effects.try_into().map_err(|e| {
1390                // TODO: is this a client or server error?
1391                anyhow!("Failed to convert transaction block effects with error: {e}")
1392            })?);
1393        }
1394    }
1395
1396    response.checkpoint = cache.checkpoint_seq;
1397    response.timestamp_ms = cache.timestamp;
1398
1399    if opts.show_events {
1400        response.events = cache.events;
1401    }
1402
1403    if opts.show_balance_changes {
1404        response.balance_changes = cache.balance_changes;
1405    }
1406
1407    if opts.show_object_changes {
1408        response.object_changes = cache.object_changes;
1409    }
1410
1411    Ok(response)
1412}
1413
1414fn calculate_checkpoint_numbers(
1415    // If `Some`, the query will start from the next item after the specified cursor
1416    cursor: Option<CheckpointSequenceNumber>,
1417    limit: u64,
1418    descending_order: bool,
1419    max_checkpoint: CheckpointSequenceNumber,
1420) -> Vec<CheckpointSequenceNumber> {
1421    let (start_index, end_index) = match cursor {
1422        Some(t) => {
1423            if descending_order {
1424                let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1425                let end = start.saturating_sub(limit - 1);
1426                (end, start)
1427            } else {
1428                let start =
1429                    std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1430                let end = std::cmp::min(
1431                    start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1432                    max_checkpoint,
1433                );
1434                (start, end)
1435            }
1436        }
1437        None => {
1438            if descending_order {
1439                (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1440            } else {
1441                (0, std::cmp::min(limit - 1, max_checkpoint))
1442            }
1443        }
1444    };
1445
1446    if descending_order {
1447        (start_index..=end_index).rev().collect()
1448    } else {
1449        (start_index..=end_index).collect()
1450    }
1451}
1452
1453#[cfg(test)]
1454mod tests {
1455    use super::*;
1456
1457    #[test]
1458    fn test_calculate_checkpoint_numbers() {
1459        let cursor = Some(10);
1460        let limit = 5;
1461        let descending_order = true;
1462        let max_checkpoint = 15;
1463
1464        let checkpoint_numbers =
1465            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1466
1467        assert_eq!(checkpoint_numbers, vec![9, 8, 7, 6, 5]);
1468    }
1469
1470    #[test]
1471    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
1472        let cursor = None;
1473        let limit = 5;
1474        let descending_order = true;
1475        let max_checkpoint = 15;
1476
1477        let checkpoint_numbers =
1478            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1479
1480        assert_eq!(checkpoint_numbers, vec![15, 14, 13, 12, 11]);
1481    }
1482
1483    #[test]
1484    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
1485        let cursor = None;
1486        let limit = 5;
1487        let descending_order = false;
1488        let max_checkpoint = 15;
1489
1490        let checkpoint_numbers =
1491            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1492
1493        assert_eq!(checkpoint_numbers, vec![0, 1, 2, 3, 4]);
1494    }
1495
1496    #[test]
1497    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
1498        let cursor = Some(10);
1499        let limit = 5;
1500        let descending_order = false;
1501        let max_checkpoint = 15;
1502
1503        let checkpoint_numbers =
1504            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1505
1506        assert_eq!(checkpoint_numbers, vec![11, 12, 13, 14, 15]);
1507    }
1508
1509    #[test]
1510    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
1511        let cursor = None;
1512        let limit = 20;
1513        let descending_order = false;
1514        let max_checkpoint = 15;
1515
1516        let checkpoint_numbers =
1517            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1518
1519        assert_eq!(checkpoint_numbers, (0..=15).collect::<Vec<_>>());
1520    }
1521
1522    #[test]
1523    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
1524        let cursor = None;
1525        let limit = 20;
1526        let descending_order = true;
1527        let max_checkpoint = 15;
1528
1529        let checkpoint_numbers =
1530            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1531
1532        assert_eq!(checkpoint_numbers, (0..=15).rev().collect::<Vec<_>>());
1533    }
1534}