sui_json_rpc/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use backoff::ExponentialBackoff;
11use backoff::future::retry;
12use fastcrypto::encoding::Base64;
13use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
14use futures::future::join_all;
15use im::hashmap::HashMap as ImHashMap;
16use indexmap::map::IndexMap;
17use itertools::Itertools;
18use jsonrpsee::RpcModule;
19use jsonrpsee::core::RpcResult;
20use move_bytecode_utils::module_cache::GetModule;
21use move_core_types::account_address::AccountAddress;
22use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
23use move_core_types::language_storage::StructTag;
24use once_cell::sync::Lazy;
25use serde_json::Value as Json;
26use shared_crypto::intent::{IntentMessage, PersonalMessage};
27use sui_display::v1::Format;
28use sui_json_rpc_types::ZkLoginIntentScope;
29use sui_types::base_types::SuiAddress;
30use sui_types::signature::{GenericSignature, VerifyParams};
31use sui_types::signature_verification::VerifiedDigestCache;
32use sui_types::storage::ObjectKey;
33use tap::TapFallible;
34use tracing::{debug, error, info, instrument, trace, warn};
35
36use mysten_metrics::add_server_timing;
37use sui_core::authority::AuthorityState;
38use sui_json_rpc_api::{
39    JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
40    ReadApiServer, validate_limit,
41};
42use sui_json_rpc_types::{
43    BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
44    ObjectChange, ProtocolConfigResponse, SuiEvent, SuiGetPastObjectRequest, SuiObjectDataOptions,
45    SuiObjectResponse, SuiPastObjectResponse, SuiTransactionBlock, SuiTransactionBlockEvents,
46    SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
47};
48use sui_open_rpc::Module;
49use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
50use sui_storage::key_value_store::TransactionKeyValueStore;
51use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
52use sui_types::crypto::AggregateAuthoritySignature;
53use sui_types::display::DisplayVersionUpdatedEvent;
54use sui_types::display_registry;
55use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
56use sui_types::error::{SuiError, SuiObjectResponseError};
57use sui_types::messages_checkpoint::{
58    CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
59};
60use sui_types::object::{Object, ObjectRead, PastObjectRead};
61use sui_types::sui_serde::BigInt;
62use sui_types::transaction::TransactionDataAPI;
63use sui_types::transaction::{Transaction, TransactionData};
64
65use crate::authority_state::{StateRead, StateReadError, StateReadResult};
66use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
67use crate::{ObjectProvider, with_tracing};
68use crate::{
69    ObjectProviderCache, SuiRpcModule, get_balance_changes_from_effect, get_object_changes,
70};
71use fastcrypto::encoding::Encoding;
72use fastcrypto::traits::ToFromBytes;
73use shared_crypto::intent::Intent;
74use sui_json_rpc_types::ZkLoginVerifyResult;
75use sui_types::authenticator_state::{ActiveJwk, get_authenticator_state};
76
77/// Default max depth used while converting rendered Display values to JSON.
78const DEFAULT_MAX_DISPLAY_MOVE_VALUE_DEPTH: usize = 32;
79
80/// Default budget for Display output size.
81const DEFAULT_MAX_DISPLAY_OUTPUT_SIZE: usize = 1024 * 1024;
82
83/// A field access in a Display string cannot exceed this level of nesting.
84static MAX_DISPLAY_FIELD_DEPTH: Lazy<usize> = Lazy::new(|| {
85    let max_opt = std::env::var("MAX_DISPLAY_FIELD_DEPTH")
86        .ok()
87        .and_then(|s| s.parse().ok());
88
89    if let Some(max) = max_opt {
90        info!("Using custom value for 'MAX_DISPLAY_FIELD_DEPTH': {max}");
91        max
92    } else {
93        sui_display::v2::Limits::default().max_depth
94    }
95});
96
97/// Parser node budget for Display v2.
98static MAX_DISPLAY_FORMAT_NODES: Lazy<usize> = Lazy::new(|| {
99    let max_opt = std::env::var("MAX_DISPLAY_FORMAT_NODES")
100        .ok()
101        .and_then(|s| s.parse().ok());
102
103    if let Some(max) = max_opt {
104        info!("Using custom value for 'MAX_DISPLAY_FORMAT_NODES': {max}");
105        max
106    } else {
107        sui_display::v2::Limits::default().max_nodes
108    }
109});
110
111/// Max object loads budget for Display v2.
112static MAX_DISPLAY_OBJECT_LOADS: Lazy<usize> = Lazy::new(|| {
113    let max_opt = std::env::var("MAX_DISPLAY_OBJECT_LOADS")
114        .ok()
115        .and_then(|s| s.parse().ok());
116
117    if let Some(max) = max_opt {
118        info!("Using custom value for 'MAX_DISPLAY_OBJECT_LOADS': {max}");
119        max
120    } else {
121        sui_display::v2::Limits::default().max_loads
122    }
123});
124
125/// Maximum depth used while converting rendered Display values to JSON.
126static MAX_DISPLAY_MOVE_VALUE_DEPTH: Lazy<usize> = Lazy::new(|| {
127    let max_opt = std::env::var("MAX_MOVE_VALUE_DEPTH")
128        .ok()
129        .and_then(|s| s.parse().ok());
130
131    if let Some(max) = max_opt {
132        info!("Using custom value for 'MAX_MOVE_VALUE_DEPTH': {max}");
133        max
134    } else {
135        DEFAULT_MAX_DISPLAY_MOVE_VALUE_DEPTH
136    }
137});
138
139/// Overall display output cannot exceed this size.
140static MAX_DISPLAY_OUTPUT_SIZE: Lazy<usize> = Lazy::new(|| {
141    let max_opt = std::env::var("MAX_DISPLAY_OUTPUT_SIZE")
142        .ok()
143        .and_then(|s| s.parse().ok());
144
145    if let Some(max) = max_opt {
146        info!("Using custom value for 'MAX_DISPLAY_OUTPUT_SIZE': {max}");
147        max
148    } else {
149        DEFAULT_MAX_DISPLAY_OUTPUT_SIZE
150    }
151});
152
153struct DisplayStore<'s> {
154    state: &'s dyn StateRead,
155}
156
157impl<'s> DisplayStore<'s> {
158    fn new(state: &'s dyn StateRead) -> Self {
159        Self { state }
160    }
161}
162
163#[async_trait]
164impl sui_display::v2::Store for DisplayStore<'_> {
165    async fn object(
166        &self,
167        id: AccountAddress,
168    ) -> anyhow::Result<Option<sui_display::v2::OwnedSlice>> {
169        let read = self.state.get_object_read(&id.into())?;
170        let ObjectRead::Exists(_, object, Some(layout)) = read else {
171            return Ok(None);
172        };
173
174        let Some(move_object) = object.data.try_as_move() else {
175            return Ok(None);
176        };
177
178        Ok(Some(sui_display::v2::OwnedSlice {
179            bytes: move_object.contents().to_vec(),
180            layout: MoveTypeLayout::Struct(Box::new(layout)),
181        }))
182    }
183}
184
/// An implementation of the read portion of the JSON-RPC interface intended for use in
/// Fullnodes.
#[derive(Clone)]
pub struct ReadApi {
    /// Read access to node state (object reads, latest checkpoint number, epoch store).
    pub state: Arc<dyn StateRead>,
    /// Store queried for transactions, effects, events, objects and checkpoints.
    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Metrics recorded by the read endpoints (request limits and result sizes).
    pub metrics: Arc<JsonRpcMetrics>,
}
193
// Internal data structure to make it easy to work with data returned from
// authority store and also enable code sharing between get_transaction_with_options,
// multi_get_transaction_with_options, etc.
#[derive(Default)]
struct IntermediateTransactionResponse {
    /// Digest of the transaction this entry describes.
    digest: TransactionDigest,
    /// Transaction input; only populated when the response options require input.
    transaction: Option<Transaction>,
    /// Transaction effects; only populated when the response options require effects.
    effects: Option<TransactionEffects>,
    /// Events of the transaction; only populated when `show_events` is requested and the
    /// events could be fetched and converted.
    events: Option<SuiTransactionBlockEvents>,
    /// Sequence number of the checkpoint containing the transaction, if known.
    checkpoint_seq: Option<CheckpointSequenceNumber>,
    /// Balance changes derived from the effects; only populated when requested.
    balance_changes: Option<Vec<BalanceChange>>,
    /// Object changes derived from the effects; only populated when requested.
    object_changes: Option<Vec<ObjectChange>>,
    /// `timestamp_ms` of the summary of the checkpoint containing the transaction, if known.
    timestamp: Option<CheckpointTimestamp>,
    /// Messages for per-transaction failures that did not abort the whole request.
    errors: Vec<String>,
}
209
210impl IntermediateTransactionResponse {
211    pub fn new(digest: TransactionDigest) -> Self {
212        Self {
213            digest,
214            ..Default::default()
215        }
216    }
217
218    pub fn transaction(&self) -> &Option<Transaction> {
219        &self.transaction
220    }
221}
222
223impl ReadApi {
224    pub fn new(
225        state: Arc<AuthorityState>,
226        transaction_kv_store: Arc<TransactionKeyValueStore>,
227        metrics: Arc<JsonRpcMetrics>,
228    ) -> Self {
229        Self {
230            state,
231            transaction_kv_store,
232            metrics,
233        }
234    }
235
236    async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
237        Ok(match id {
238            CheckpointId::SequenceNumber(seq) => {
239                let verified_summary = self
240                    .transaction_kv_store
241                    .get_checkpoint_summary(seq)
242                    .await?;
243                let content = self
244                    .transaction_kv_store
245                    .get_checkpoint_contents(verified_summary.sequence_number)
246                    .await?;
247                let signature = verified_summary.auth_sig().signature.clone();
248                (verified_summary.into_data(), content, signature).into()
249            }
250            CheckpointId::Digest(digest) => {
251                let verified_summary = self
252                    .transaction_kv_store
253                    .get_checkpoint_summary_by_digest(digest)
254                    .await?;
255                let content = self
256                    .transaction_kv_store
257                    .get_checkpoint_contents(verified_summary.sequence_number)
258                    .await?;
259                let signature = verified_summary.auth_sig().signature.clone();
260                (verified_summary.into_data(), content, signature).into()
261            }
262        })
263    }
264
265    pub async fn get_checkpoints_internal(
266        state: Arc<dyn StateRead>,
267        transaction_kv_store: Arc<TransactionKeyValueStore>,
268        // If `Some`, the query will start from the next item after the specified cursor
269        cursor: Option<CheckpointSequenceNumber>,
270        limit: u64,
271        descending_order: bool,
272    ) -> StateReadResult<Vec<Checkpoint>> {
273        let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
274        let checkpoint_numbers =
275            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
276
277        let verified_checkpoints = transaction_kv_store
278            .multi_get_checkpoints_summaries(&checkpoint_numbers)
279            .await?;
280
281        let checkpoint_summaries_and_signatures: Vec<(
282            CheckpointSummary,
283            AggregateAuthoritySignature,
284        )> = verified_checkpoints
285            .into_iter()
286            .flatten()
287            .map(|check| {
288                (
289                    check.clone().into_summary_and_sequence().1,
290                    check.get_validator_signature(),
291                )
292            })
293            .collect();
294
295        let checkpoint_contents = transaction_kv_store
296            .multi_get_checkpoints_contents(&checkpoint_numbers)
297            .await?;
298        let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
299
300        let mut checkpoints: Vec<Checkpoint> = vec![];
301
302        for (summary_and_sig, content) in checkpoint_summaries_and_signatures
303            .into_iter()
304            .zip(contents.into_iter())
305        {
306            checkpoints.push(Checkpoint::from((
307                summary_and_sig.0,
308                content,
309                summary_and_sig.1,
310            )));
311        }
312
313        Ok(checkpoints)
314    }
315
316    #[instrument(skip_all)]
317    async fn multi_get_transaction_blocks_internal(
318        &self,
319        digests: Vec<TransactionDigest>,
320        opts: Option<SuiTransactionBlockResponseOptions>,
321    ) -> Result<Vec<SuiTransactionBlockResponse>, Error> {
322        trace!("start");
323
324        let num_digests = digests.len();
325        if num_digests > *QUERY_MAX_RESULT_LIMIT {
326            Err(SuiRpcInputError::SizeLimitExceeded(
327                QUERY_MAX_RESULT_LIMIT.to_string(),
328            ))?
329        }
330        self.metrics
331            .get_tx_blocks_limit
332            .observe(digests.len() as f64);
333
334        let opts = opts.unwrap_or_default();
335
336        // use LinkedHashMap to dedup and can iterate in insertion order.
337        let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
338            IndexMap::from_iter(
339                digests
340                    .iter()
341                    .map(|k| (k, IntermediateTransactionResponse::new(*k))),
342            );
343        if temp_response.len() < num_digests {
344            Err(SuiRpcInputError::ContainsDuplicates)?
345        }
346
347        if opts.require_input() {
348            trace!("getting input");
349            let digests_clone = digests.clone();
350            let transactions =
351                self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
352                    |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
353                )?;
354
355            for ((_digest, cache_entry), txn) in
356                temp_response.iter_mut().zip(transactions.into_iter())
357            {
358                cache_entry.transaction = txn;
359            }
360        }
361
362        // Fetch effects when `show_events` is true because events relies on effects
363        if opts.require_effects() {
364            trace!("getting effects");
365            let digests_clone = digests.clone();
366            let effects_list = self.transaction_kv_store
367                .multi_get_fx_by_tx_digest(&digests_clone)
368                .await
369                .tap_err(
370                    |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
371                )?;
372            for ((_digest, cache_entry), e) in
373                temp_response.iter_mut().zip(effects_list.into_iter())
374            {
375                cache_entry.effects = e;
376            }
377        }
378
379        trace!("getting checkpoint sequence numbers");
380        let checkpoint_seq_list = self
381            .transaction_kv_store
382            .multi_get_transaction_checkpoint(&digests)
383            .await
384            .tap_err(
385                |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
386        for ((_digest, cache_entry), seq) in temp_response
387            .iter_mut()
388            .zip(checkpoint_seq_list.into_iter())
389        {
390            cache_entry.checkpoint_seq = seq;
391        }
392
393        let unique_checkpoint_numbers = temp_response
394            .values()
395            .filter_map(|cache_entry| cache_entry.checkpoint_seq)
396            // It's likely that many transactions have the same checkpoint, so we don't
397            // need to over-fetch
398            .unique()
399            .collect::<Vec<CheckpointSequenceNumber>>();
400
401        // fetch timestamp from the DB
402        trace!("getting checkpoint summaries");
403        let timestamps = self
404            .transaction_kv_store
405            .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
406            .await
407            .map_err(|e| {
408                Error::UnexpectedError(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
409            })?
410            .into_iter()
411            .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
412
413        // construct a hashmap of checkpoint -> timestamp for fast lookup
414        let checkpoint_to_timestamp = unique_checkpoint_numbers
415            .into_iter()
416            .zip(timestamps)
417            .collect::<HashMap<_, _>>();
418
419        // fill cache with the timestamp
420        for (_, cache_entry) in temp_response.iter_mut() {
421            if cache_entry.checkpoint_seq.is_some() {
422                // safe to unwrap because is_some is checked
423                cache_entry.timestamp = *checkpoint_to_timestamp
424                    .get(cache_entry.checkpoint_seq.as_ref().unwrap())
425                    // Safe to unwrap because checkpoint_seq is guaranteed to exist in checkpoint_to_timestamp
426                    .unwrap();
427            }
428        }
429
430        if opts.show_events {
431            trace!("getting events");
432            let mut non_empty_digests = vec![];
433            for cache_entry in temp_response.values() {
434                if let Some(effects) = &cache_entry.effects
435                    && effects.events_digest().is_some()
436                {
437                    non_empty_digests.push(cache_entry.digest);
438                }
439            }
440            // fetch events from the DB with retry, retry each 0.5s for 3s
441            let backoff = ExponentialBackoff {
442                max_elapsed_time: Some(Duration::from_secs(3)),
443                multiplier: 1.0,
444                ..ExponentialBackoff::default()
445            };
446            let mut events = retry(backoff, || async {
447                match self
448                    .transaction_kv_store
449                    .multi_get_events_by_tx_digests(&non_empty_digests)
450                    .await
451                {
452                    // Only return Ok when all the queried transaction events are found, otherwise retry
453                    // until timeout, then return Err.
454                    Ok(events) if !events.contains(&None) => Ok(events),
455                    Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
456                        "Events not found, transaction execution may be incomplete.".into(),
457                    ))),
458                    Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
459                        "Failed to call multi_get_events: {e:?}"
460                    )))),
461                }
462            })
463            .await
464            .map_err(|e| {
465                Error::UnexpectedError(format!(
466                    "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
467                ))
468            })?
469            .into_iter();
470
471            // fill cache with the events
472            for (_, cache_entry) in temp_response.iter_mut() {
473                let transaction_digest = cache_entry.digest;
474                if let Some(events_digest) =
475                    cache_entry.effects.as_ref().and_then(|e| e.events_digest())
476                {
477                    match events.next() {
478                        Some(Some(ev)) => {
479                            cache_entry.events =
480                                Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
481                        }
482                        None | Some(None) => {
483                            error!(
484                                "Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
485                            );
486                            cache_entry.errors.push(format!(
487                                "Failed to fetch events with event digest {events_digest:?}",
488                            ))
489                        }
490                    }
491                } else {
492                    // events field will be Some if and only if `show_events` is true and
493                    // there is no error in converting fetching events
494                    cache_entry.events = Some(SuiTransactionBlockEvents::default());
495                }
496            }
497        }
498
499        let mut object_cache =
500            ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
501
502        // Prefetch the objects if we need to show balance or object changes
503        if opts.show_balance_changes || opts.show_object_changes {
504            let mut keys = vec![];
505            for resp in temp_response.values() {
506                let effects = resp.effects.as_ref().ok_or_else(|| {
507                    SuiRpcInputError::GenericNotFound(
508                        "unable to derive balance/object changes because effect is empty"
509                            .to_string(),
510                    )
511                })?;
512
513                for change in effects.object_changes() {
514                    if let Some(input_version) = change.input_version {
515                        keys.push(ObjectKey(change.id, input_version));
516                    }
517                    if let Some(output_version) = change.output_version {
518                        keys.push(ObjectKey(change.id, output_version));
519                    }
520                }
521            }
522
523            let objects = self
524                .transaction_kv_store
525                .multi_get_objects(&keys)
526                .await?
527                .into_iter()
528                .flatten()
529                .collect::<Vec<_>>();
530
531            object_cache.insert_objects_into_cache(objects);
532        }
533
534        if opts.show_balance_changes {
535            trace!("getting balance changes");
536
537            let mut results = vec![];
538            for resp in temp_response.values() {
539                let input_objects = if let Some(tx) = resp.transaction() {
540                    tx.data()
541                        .inner()
542                        .intent_message
543                        .value
544                        .input_objects()
545                        .unwrap_or_default()
546                } else {
547                    // don't have the input tx, so not much we can do. perhaps this is an Err?
548                    Vec::new()
549                };
550                results.push(get_balance_changes_from_effect(
551                    &object_cache,
552                    resp.effects.as_ref().ok_or_else(|| {
553                        SuiRpcInputError::GenericNotFound(
554                            "unable to derive balance changes because effect is empty".to_string(),
555                        )
556                    })?,
557                    input_objects,
558                    None,
559                ));
560            }
561            let results = join_all(results).await;
562            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
563                match result {
564                    Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
565                    Err(e) => entry
566                        .1
567                        .errors
568                        .push(format!("Failed to fetch balance changes {e:?}")),
569                }
570            }
571        }
572
573        if opts.show_object_changes {
574            trace!("getting object changes");
575
576            let mut results = vec![];
577            for resp in temp_response.values() {
578                let effects = resp.effects.as_ref().ok_or_else(|| {
579                    SuiRpcInputError::GenericNotFound(
580                        "unable to derive object changes because effect is empty".to_string(),
581                    )
582                })?;
583
584                results.push(get_object_changes(
585                    &object_cache,
586                    effects,
587                    resp.transaction
588                        .as_ref()
589                        .ok_or_else(|| {
590                            SuiRpcInputError::GenericNotFound(
591                                "unable to derive object changes because transaction is empty"
592                                    .to_string(),
593                            )
594                        })?
595                        .data()
596                        .intent_message()
597                        .value
598                        .sender(),
599                    effects.modified_at_versions(),
600                    effects.all_changed_objects(),
601                    effects.all_removed_objects(),
602                ));
603            }
604            let results = join_all(results).await;
605            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
606                match result {
607                    Ok(object_changes) => entry.1.object_changes = Some(object_changes),
608                    Err(e) => entry
609                        .1
610                        .errors
611                        .push(format!("Failed to fetch object changes {e:?}")),
612                }
613            }
614        }
615
616        let epoch_store = self.state.load_epoch_store_one_call_per_task();
617        let converted_tx_block_resps = temp_response
618            .into_iter()
619            .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
620            .collect::<Result<Vec<_>, _>>()?;
621
622        self.metrics
623            .get_tx_blocks_result_size
624            .observe(converted_tx_block_resps.len() as f64);
625        self.metrics
626            .get_tx_blocks_result_size_total
627            .inc_by(converted_tx_block_resps.len() as u64);
628
629        trace!("done");
630
631        Ok(converted_tx_block_resps)
632    }
633}
634
635#[async_trait]
636impl ReadApiServer for ReadApi {
637    #[instrument(skip(self))]
638    async fn get_object(
639        &self,
640        object_id: ObjectID,
641        options: Option<SuiObjectDataOptions>,
642    ) -> RpcResult<SuiObjectResponse> {
643        with_tracing!(async move {
644            let state = self.state.clone();
645            let object_read = state.get_object_read(&object_id).map_err(|e| {
646                warn!(?object_id, "Failed to get object: {:?}", e);
647                Error::from(e)
648            })?;
649            let options = options.unwrap_or_default();
650
651            match object_read {
652                ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
653                    SuiObjectResponseError::NotExists { object_id: id },
654                )),
655                ObjectRead::Exists(object_ref, o, layout) => {
656                    let mut display_fields = None;
657                    if options.show_display {
658                        match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
659                            .await
660                        {
661                            Ok(rendered_fields) => display_fields = Some(rendered_fields),
662                            Err(e) => {
663                                return Ok(SuiObjectResponse::new(
664                                    Some((object_ref, o, layout, options, None).try_into()?),
665                                    Some(SuiObjectResponseError::DisplayError {
666                                        error: e.to_string(),
667                                    }),
668                                ));
669                            }
670                        }
671                    }
672                    Ok(SuiObjectResponse::new_with_data(
673                        (object_ref, o, layout, options, display_fields).try_into()?,
674                    ))
675                }
676                ObjectRead::Deleted((object_id, version, digest)) => Ok(
677                    SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
678                        object_id,
679                        version,
680                        digest,
681                    }),
682                ),
683            }
684        })
685    }
686
687    #[instrument(skip(self))]
688    async fn multi_get_objects(
689        &self,
690        object_ids: Vec<ObjectID>,
691        options: Option<SuiObjectDataOptions>,
692    ) -> RpcResult<Vec<SuiObjectResponse>> {
693        with_tracing!(async move {
694            if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
695                self.metrics
696                    .get_objects_limit
697                    .observe(object_ids.len() as f64);
698                let mut futures = vec![];
699                for object_id in object_ids {
700                    futures.push(self.get_object(object_id, options.clone()));
701                }
702                let results = join_all(futures).await;
703
704                let objects_result: Result<Vec<SuiObjectResponse>, String> = results
705                    .into_iter()
706                    .map(|result| match result {
707                        Ok(response) => Ok(response),
708                        Err(error) => {
709                            error!("Failed to fetch object with error: {error:?}");
710                            Err(format!("Error: {}", error))
711                        }
712                    })
713                    .collect();
714
715                let objects = objects_result.map_err(|err| {
716                    Error::UnexpectedError(format!("Failed to fetch objects with error: {}", err))
717                })?;
718
719                self.metrics
720                    .get_objects_result_size
721                    .observe(objects.len() as f64);
722                self.metrics
723                    .get_objects_result_size_total
724                    .inc_by(objects.len() as u64);
725                Ok(objects)
726            } else {
727                Err(SuiRpcInputError::SizeLimitExceeded(
728                    QUERY_MAX_RESULT_LIMIT.to_string(),
729                ))?
730            }
731        })
732    }
733
734    #[instrument(skip(self))]
735    async fn try_get_past_object(
736        &self,
737        object_id: ObjectID,
738        version: SequenceNumber,
739        options: Option<SuiObjectDataOptions>,
740    ) -> RpcResult<SuiPastObjectResponse> {
741        with_tracing!(async move {
742            let state = self.state.clone();
743            let past_read = state
744                .get_past_object_read(&object_id, version)
745                .map_err(|e| {
746                    error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
747                    Error::from(e)
748                })?;
749            let options = options.unwrap_or_default();
750            match past_read {
751                PastObjectRead::ObjectNotExists(id) => {
752                    Ok(SuiPastObjectResponse::ObjectNotExists(id))
753                }
754                PastObjectRead::VersionFound(object_ref, o, layout) => {
755                    let display_fields = if options.show_display {
756                        // TODO (jian): api breaking change to also modify past objects.
757                        Some(
758                            get_display_fields(self, &self.transaction_kv_store, &o, &layout)
759                                .await
760                                .map_err(|e| {
761                                    Error::UnexpectedError(format!(
762                                        "Unable to render object at version {version}: {e}"
763                                    ))
764                                })?,
765                        )
766                    } else {
767                        None
768                    };
769                    Ok(SuiPastObjectResponse::VersionFound(
770                        (object_ref, o, layout, options, display_fields).try_into()?,
771                    ))
772                }
773                PastObjectRead::ObjectDeleted(oref) => {
774                    Ok(SuiPastObjectResponse::ObjectDeleted(oref.into()))
775                }
776                PastObjectRead::VersionNotFound(id, seq_num) => {
777                    Ok(SuiPastObjectResponse::VersionNotFound(id, seq_num))
778                }
779                PastObjectRead::VersionTooHigh {
780                    object_id,
781                    asked_version,
782                    latest_version,
783                } => Ok(SuiPastObjectResponse::VersionTooHigh {
784                    object_id,
785                    asked_version,
786                    latest_version,
787                }),
788            }
789        })
790    }
791
792    #[instrument(skip(self))]
793    async fn try_get_object_before_version(
794        &self,
795        object_id: ObjectID,
796        version: SequenceNumber,
797    ) -> RpcResult<SuiPastObjectResponse> {
798        let version = self
799            .state
800            .find_object_lt_or_eq_version(&object_id, &version)
801            .await
802            .map_err(Error::from)?
803            .map(|obj| obj.version())
804            .unwrap_or_default();
805        self.try_get_past_object(
806            object_id,
807            version,
808            Some(SuiObjectDataOptions::bcs_lossless()),
809        )
810        .await
811    }
812
813    #[instrument(skip(self))]
814    async fn try_multi_get_past_objects(
815        &self,
816        past_objects: Vec<SuiGetPastObjectRequest>,
817        options: Option<SuiObjectDataOptions>,
818    ) -> RpcResult<Vec<SuiPastObjectResponse>> {
819        with_tracing!(async move {
820            if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
821                let mut futures = vec![];
822                for past_object in past_objects {
823                    futures.push(self.try_get_past_object(
824                        past_object.object_id,
825                        past_object.version,
826                        options.clone(),
827                    ));
828                }
829                let results = join_all(futures).await;
830
831                let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
832                let success = oks.into_iter().filter_map(Result::ok).collect();
833                let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
834                if !errors.is_empty() {
835                    let error_string = errors
836                        .iter()
837                        .map(|e| e.to_string())
838                        .collect::<Vec<String>>()
839                        .join("; ");
840                    Err(anyhow!("{error_string}").into()) // Collects errors not related to SuiPastObjectResponse variants
841                } else {
842                    Ok(success)
843                }
844            } else {
845                Err(SuiRpcInputError::SizeLimitExceeded(
846                    QUERY_MAX_RESULT_LIMIT.to_string(),
847                ))?
848            }
849        })
850    }
851
852    #[instrument(skip(self))]
853    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
854        with_tracing!(async move {
855            Ok(self
856                .state
857                .get_total_transaction_blocks()
858                .map_err(Error::from)?
859                .into()) // converts into BigInt<u64>
860        })
861    }
862
863    #[instrument(skip(self))]
864    async fn get_transaction_block(
865        &self,
866        digest: TransactionDigest,
867        opts: Option<SuiTransactionBlockResponseOptions>,
868    ) -> RpcResult<SuiTransactionBlockResponse> {
869        with_tracing!(async move {
870            let opts = opts.unwrap_or_default();
871            let mut temp_response = IntermediateTransactionResponse::new(digest);
872
873            // Fetch transaction to determine existence
874            let transaction_kv_store = self.transaction_kv_store.clone();
875            let transaction = async move {
876                let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
877                    debug!(tx_digest=?digest, "Failed to get transaction: {}", err);
878                    Error::from(err)
879                });
880                add_server_timing("tx_kv_lookup");
881                ret
882            }
883            .await?;
884            let input_objects = transaction
885                .data()
886                .inner()
887                .intent_message
888                .value
889                .input_objects()
890                .unwrap_or_default();
891
892            // the input is needed for object_changes to retrieve the sender address.
893            if opts.require_input() {
894                temp_response.transaction = Some(transaction);
895            }
896
897            // Fetch effects when `show_events` is true because events relies on effects
898            if opts.require_effects() {
899                let transaction_kv_store = self.transaction_kv_store.clone();
900                temp_response.effects = Some(
901                    transaction_kv_store
902                        .get_fx_by_tx_digest(digest)
903                        .await
904                        .map_err(|err| {
905                            debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
906                            Error::from(err)
907                        })?,
908                );
909            }
910
911            temp_response.checkpoint_seq = self
912                .transaction_kv_store
913                .deprecated_get_transaction_checkpoint(digest)
914                .await
915                .map_err(|e| {
916                    error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
917                    Error::from(e)
918                })?;
919
920            if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
921                let kv_store = self.transaction_kv_store.clone();
922                let checkpoint_seq = *checkpoint_seq;
923                let checkpoint = kv_store
924                    // safe to unwrap because we have checked `is_some` above
925                    .get_checkpoint_summary(checkpoint_seq)
926                    .await
927                    .map_err(|e| {
928                        error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
929                        Error::from(e)
930                    })?;
931                // TODO(chris): we don't need to fetch the whole checkpoint summary
932                temp_response.timestamp = Some(checkpoint.timestamp_ms);
933            }
934
935            if opts.show_events && temp_response.effects.is_some() {
936                let transaction_kv_store = self.transaction_kv_store.clone();
937                let events = transaction_kv_store
938                    .multi_get_events_by_tx_digests(&[digest])
939                    .await
940                    .map_err(|e| {
941                        error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
942                        Error::from(e)
943                    })?
944                    .pop()
945                    .flatten();
946                match events {
947                    None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
948                    Some(events) => match to_sui_transaction_events(self, digest, events) {
949                        Ok(e) => temp_response.events = Some(e),
950                        Err(e) => temp_response.errors.push(e.to_string()),
951                    },
952                }
953            }
954
955            let object_cache =
956                ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
957            if opts.show_balance_changes
958                && let Some(effects) = &temp_response.effects
959            {
960                let balance_changes =
961                    get_balance_changes_from_effect(&object_cache, effects, input_objects, None)
962                        .await;
963
964                if let Ok(balance_changes) = balance_changes {
965                    temp_response.balance_changes = Some(balance_changes);
966                } else {
967                    temp_response.errors.push(format!(
968                        "Cannot retrieve balance changes: {}",
969                        balance_changes.unwrap_err()
970                    ));
971                }
972            }
973
974            if opts.show_object_changes
975                && let (Some(effects), Some(input)) =
976                    (&temp_response.effects, &temp_response.transaction)
977            {
978                let sender = input.data().intent_message().value.sender();
979                let object_changes = get_object_changes(
980                    &object_cache,
981                    effects,
982                    sender,
983                    effects.modified_at_versions(),
984                    effects.all_changed_objects(),
985                    effects.all_removed_objects(),
986                )
987                .await;
988
989                if let Ok(object_changes) = object_changes {
990                    temp_response.object_changes = Some(object_changes);
991                } else {
992                    temp_response.errors.push(format!(
993                        "Cannot retrieve object changes: {}",
994                        object_changes.unwrap_err()
995                    ));
996                }
997            }
998            let epoch_store = self.state.load_epoch_store_one_call_per_task();
999            convert_to_response(temp_response, &opts, epoch_store.module_cache())
1000        })
1001    }
1002
1003    #[instrument(skip(self))]
1004    async fn multi_get_transaction_blocks(
1005        &self,
1006        digests: Vec<TransactionDigest>,
1007        opts: Option<SuiTransactionBlockResponseOptions>,
1008    ) -> RpcResult<Vec<SuiTransactionBlockResponse>> {
1009        with_tracing!(async move {
1010            self.multi_get_transaction_blocks_internal(digests, opts)
1011                .await
1012        })
1013    }
1014
1015    #[instrument(skip(self))]
1016    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<SuiEvent>> {
1017        with_tracing!(async move {
1018            let state = self.state.clone();
1019            let transaction_kv_store = self.transaction_kv_store.clone();
1020            async move {
1021                let store = state.load_epoch_store_one_call_per_task();
1022                let events = transaction_kv_store
1023                    .multi_get_events_by_tx_digests(&[transaction_digest])
1024                    .await
1025                    .map_err(|e| {
1026                        error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
1027                        Error::StateReadError(e.into())
1028                    })?
1029                    .pop()
1030                    .flatten();
1031                Ok(match events {
1032                    Some(events) => events
1033                        .data
1034                        .into_iter()
1035                        .enumerate()
1036                        .map(|(seq, e)| {
1037                            let layout = store
1038                                .executor()
1039                                .type_layout_resolver(Box::new(
1040                                    &state.get_backing_package_store().as_ref(),
1041                                ))
1042                                .get_annotated_layout(&e.type_)?;
1043                            SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
1044                        })
1045                        .collect::<Result<Vec<_>, _>>()
1046                        .map_err(Error::SuiError)?,
1047                    None => vec![],
1048                })
1049            }
1050            .await
1051        })
1052    }
1053
1054    #[instrument(skip(self))]
1055    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
1056        with_tracing!(async move {
1057            Ok(self
1058                .state
1059                .get_latest_checkpoint_sequence_number()
1060                .map_err(|e| {
1061                    SuiRpcInputError::GenericNotFound(format!(
1062                        "Latest checkpoint sequence number was not found with error :{e}"
1063                    ))
1064                })?
1065                .into())
1066        })
1067    }
1068
1069    #[instrument(skip(self))]
1070    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
1071        with_tracing!(self.get_checkpoint_internal(id))
1072    }
1073
1074    #[instrument(skip(self))]
1075    async fn get_checkpoints(
1076        &self,
1077        // If `Some`, the query will start from the next item after the specified cursor
1078        cursor: Option<BigInt<u64>>,
1079        limit: Option<usize>,
1080        descending_order: bool,
1081    ) -> RpcResult<CheckpointPage> {
1082        with_tracing!(async move {
1083            let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
1084                .map_err(SuiRpcInputError::from)?;
1085
1086            let state = self.state.clone();
1087            let kv_store = self.transaction_kv_store.clone();
1088
1089            self.metrics.get_checkpoints_limit.observe(limit as f64);
1090
1091            let mut data = Self::get_checkpoints_internal(
1092                state,
1093                kv_store,
1094                cursor.map(|s| *s),
1095                limit as u64 + 1,
1096                descending_order,
1097            )
1098            .await
1099            .map_err(Error::from)?;
1100
1101            let has_next_page = data.len() > limit;
1102            data.truncate(limit);
1103
1104            let next_cursor = if has_next_page {
1105                data.last().cloned().map(|d| d.sequence_number.into())
1106            } else {
1107                None
1108            };
1109
1110            self.metrics
1111                .get_checkpoints_result_size
1112                .observe(data.len() as f64);
1113            self.metrics
1114                .get_checkpoints_result_size_total
1115                .inc_by(data.len() as u64);
1116
1117            Ok(CheckpointPage {
1118                data,
1119                next_cursor,
1120                has_next_page,
1121            })
1122        })
1123    }
1124
1125    #[instrument(skip(self))]
1126    async fn get_protocol_config(
1127        &self,
1128        version: Option<BigInt<u64>>,
1129    ) -> RpcResult<ProtocolConfigResponse> {
1130        with_tracing!(async move {
1131            version
1132                .map(|v| {
1133                    ProtocolConfig::get_for_version_if_supported(
1134                        (*v).into(),
1135                        self.state.get_chain_identifier()?.chain(),
1136                    )
1137                    .ok_or(SuiRpcInputError::ProtocolVersionUnsupported(
1138                        ProtocolVersion::MIN.as_u64(),
1139                        ProtocolVersion::MAX.as_u64(),
1140                    ))
1141                    .map_err(Error::from)
1142                })
1143                .unwrap_or(Ok(self
1144                    .state
1145                    .load_epoch_store_one_call_per_task()
1146                    .protocol_config()
1147                    .clone()))
1148                .map(ProtocolConfigResponse::from)
1149        })
1150    }
1151
1152    #[instrument(skip(self))]
1153    async fn get_chain_identifier(&self) -> RpcResult<String> {
1154        with_tracing!(async move {
1155            let ci = self.state.get_chain_identifier()?;
1156            Ok(ci.to_string())
1157        })
1158    }
1159    #[instrument(skip(self))]
1160    async fn verify_zklogin_signature(
1161        &self,
1162        bytes: String,
1163        signature: String,
1164        intent_scope: ZkLoginIntentScope,
1165        author: SuiAddress,
1166    ) -> RpcResult<ZkLoginVerifyResult> {
1167        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1168        let curr_epoch = epoch_store.epoch();
1169        let zklogin_env_native = match self
1170            .state
1171            .get_chain_identifier()
1172            .expect("get chain identifier should not fail")
1173            .chain()
1174        {
1175            sui_protocol_config::Chain::Mainnet | sui_protocol_config::Chain::Testnet => {
1176                ZkLoginEnv::Prod
1177            }
1178            _ => ZkLoginEnv::Test,
1179        };
1180        let GenericSignature::ZkLoginAuthenticator(zklogin_sig) =
1181            GenericSignature::from_bytes(&Base64::decode(&signature).map_err(Error::from)?)
1182                .map_err(Error::from)?
1183        else {
1184            return Err(SuiRpcInputError::GenericNotFound(
1185                "Endpoint only supports zkLogin signature".to_string(),
1186            )
1187            .into());
1188        };
1189
1190        let new_jwks =
1191            match get_authenticator_state(self.state.get_object_store()).map_err(Error::from)? {
1192                Some(authenticator_state) => authenticator_state.active_jwks,
1193                None => {
1194                    return Err(SuiRpcInputError::GenericNotFound(
1195                        "Authenticator state not found".to_string(),
1196                    )
1197                    .into());
1198                }
1199            };
1200
1201        // construct verify params with active jwks and zklogin_env.
1202        let mut oidc_provider_jwks = ImHashMap::new();
1203        for active_jwk in new_jwks.iter() {
1204            let ActiveJwk { jwk_id, jwk, .. } = active_jwk;
1205            match oidc_provider_jwks.entry(jwk_id.clone()) {
1206                im::hashmap::Entry::Occupied(_) => {
1207                    warn!("JWK with kid {:?} already exists", jwk_id);
1208                }
1209                im::hashmap::Entry::Vacant(entry) => {
1210                    entry.insert(jwk.clone());
1211                }
1212            }
1213        }
1214        let verify_params = VerifyParams::new(
1215            oidc_provider_jwks,
1216            vec![],
1217            zklogin_env_native,
1218            true,
1219            true,
1220            true,
1221            Some(30),
1222            true,
1223            true,
1224        );
1225        match intent_scope {
1226            ZkLoginIntentScope::TransactionData => {
1227                let tx_data: TransactionData =
1228                    bcs::from_bytes(&Base64::decode(&bytes).map_err(Error::from)?)
1229                        .map_err(Error::from)?;
1230                let intent_msg = IntentMessage::new(Intent::sui_transaction(), tx_data.clone());
1231                let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1232                match sig.verify_authenticator(
1233                    &intent_msg,
1234                    author,
1235                    curr_epoch,
1236                    &verify_params,
1237                    Arc::new(VerifiedDigestCache::new_empty()),
1238                ) {
1239                    Ok(_) => Ok(ZkLoginVerifyResult {
1240                        success: true,
1241                        errors: vec![],
1242                    }),
1243                    Err(e) => Ok(ZkLoginVerifyResult {
1244                        success: false,
1245                        errors: vec![e.to_string()],
1246                    }),
1247                }
1248            }
1249            ZkLoginIntentScope::PersonalMessage => {
1250                let data = PersonalMessage {
1251                    message: Base64::decode(&bytes).map_err(Error::from)?,
1252                };
1253                let intent_msg = IntentMessage::new(Intent::personal_message(), data);
1254
1255                let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1256                match sig.verify_authenticator(
1257                    &intent_msg,
1258                    author,
1259                    curr_epoch,
1260                    &verify_params,
1261                    Arc::new(VerifiedDigestCache::new_empty()),
1262                ) {
1263                    Ok(_) => Ok(ZkLoginVerifyResult {
1264                        success: true,
1265                        errors: vec![],
1266                    }),
1267                    Err(e) => Ok(ZkLoginVerifyResult {
1268                        success: false,
1269                        errors: vec![e.to_string()],
1270                    }),
1271                }
1272            }
1273        }
1274    }
1275}
1276
/// Exposes `ReadApi` as a JSON-RPC module together with its OpenRPC description.
impl SuiRpcModule for ReadApi {
    /// Consumes the API object and turns it into an `RpcModule` via `into_rpc`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the module documentation produced by `ReadApiOpenRpc::module_doc`.
    fn rpc_doc_module() -> Module {
        ReadApiOpenRpc::module_doc()
    }
}
1286
1287#[instrument(skip_all)]
1288fn to_sui_transaction_events(
1289    fullnode_api: &ReadApi,
1290    tx_digest: TransactionDigest,
1291    events: TransactionEvents,
1292) -> Result<SuiTransactionBlockEvents, Error> {
1293    let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1294    let backing_package_store = fullnode_api.state.get_backing_package_store();
1295    let mut layout_resolver = epoch_store
1296        .executor()
1297        .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1298    Ok(SuiTransactionBlockEvents::try_from(
1299        events,
1300        tx_digest,
1301        None,
1302        layout_resolver.as_mut(),
1303    )?)
1304}
1305
/// Failures that abort rendering of an object's Display fields.
///
/// Errors raised while parsing or evaluating a Display template are not
/// represented here; `get_display_fields` reports those inside the returned
/// `DisplayFieldsResponse` instead.
#[derive(Debug, thiserror::Error)]
pub enum ObjectDisplayError {
    // NOTE(review): not constructed in the visible part of this file.
    #[error("Not a move struct")]
    NotMoveStruct,

    // NOTE(review): not constructed in the visible part of this file.
    #[error("Failed to extract layout")]
    Layout,

    /// The object's data is not a Move object (`try_as_move` returned `None`).
    #[error("Failed to extract Move object")]
    MoveObject,

    /// A `SuiError`, converted automatically by `?`.
    #[error(transparent)]
    Deserialization(#[from] SuiError),

    /// A BCS decoding failure, converted automatically by `?`.
    // NOTE(review): the message names 'VersionUpdatedEvent', but the v2 Display
    // object lookup also decodes BCS and converts into this variant.
    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
    Bcs(#[from] bcs::Error),

    /// A failure reading from node state.
    #[error(transparent)]
    StateReadError(#[from] StateReadError),

    /// Any other error, carried as `anyhow::Error`.
    #[error(transparent)]
    Internal(#[from] anyhow::Error),
}
1329
1330#[instrument(skip(fullnode_api, kv_store))]
1331async fn get_display_fields(
1332    fullnode_api: &ReadApi,
1333    kv_store: &Arc<TransactionKeyValueStore>,
1334    original_object: &Object,
1335    original_layout: &Option<MoveStructLayout>,
1336) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1337    let (layout, type_) = if let Some(layout) = original_layout {
1338        let type_ = &layout.type_;
1339        let layout = MoveTypeLayout::Struct(Box::new(layout.clone()));
1340        (layout, type_)
1341    } else {
1342        return Ok(DisplayFieldsResponse {
1343            data: None,
1344            error: None,
1345        });
1346    };
1347
1348    let Some(move_object) = original_object.data.try_as_move() else {
1349        return Err(ObjectDisplayError::MoveObject);
1350    };
1351
1352    let display: Vec<(String, Result<Json, anyhow::Error>)> =
1353        if let Some(display_object) = get_display_object_v2_by_type(fullnode_api, type_)? {
1354            let root = sui_display::v2::OwnedSlice {
1355                bytes: move_object.contents().to_owned(),
1356                layout,
1357            };
1358
1359            let store = DisplayStore::new(fullnode_api.state.as_ref());
1360            let interpreter = sui_display::v2::Interpreter::new(root, store);
1361            let limits = sui_display::v2::Limits {
1362                max_depth: *MAX_DISPLAY_FIELD_DEPTH,
1363                max_nodes: *MAX_DISPLAY_FORMAT_NODES,
1364                max_loads: *MAX_DISPLAY_OBJECT_LOADS,
1365            };
1366
1367            match sui_display::v2::Display::parse(limits, display_object.fields()) {
1368                Ok(display) => match display
1369                    .display::<Json>(
1370                        *MAX_DISPLAY_MOVE_VALUE_DEPTH,
1371                        *MAX_DISPLAY_OUTPUT_SIZE,
1372                        &interpreter,
1373                    )
1374                    .await
1375                {
1376                    Ok(fields) => fields
1377                        .into_iter()
1378                        .map(|(field, value)| (field, value.map_err(anyhow::Error::from)))
1379                        .collect(),
1380                    Err(e) => {
1381                        return Ok(DisplayFieldsResponse {
1382                            data: None,
1383                            error: Some(SuiObjectResponseError::DisplayError {
1384                                error: e.to_string(),
1385                            }),
1386                        });
1387                    }
1388                },
1389
1390                Err(e) => {
1391                    return Ok(DisplayFieldsResponse {
1392                        data: None,
1393                        error: Some(SuiObjectResponseError::DisplayError {
1394                            error: e.to_string(),
1395                        }),
1396                    });
1397                }
1398            }
1399        } else if let Some(display_object) =
1400            get_display_object_v1_by_type(kv_store, fullnode_api, type_).await?
1401        {
1402            let format = match Format::parse(*MAX_DISPLAY_FIELD_DEPTH, &display_object.fields) {
1403                Ok(format) => format,
1404                Err(e) => {
1405                    return Ok(DisplayFieldsResponse {
1406                        data: None,
1407                        error: Some(SuiObjectResponseError::DisplayError {
1408                            error: e.to_string(),
1409                        }),
1410                    });
1411                }
1412            };
1413
1414            match format.display(*MAX_DISPLAY_OUTPUT_SIZE, move_object.contents(), &layout) {
1415                Ok(fields) => fields
1416                    .into_iter()
1417                    .map(|(field, value)| (field, value.map(Json::String)))
1418                    .collect(),
1419                Err(e) => {
1420                    return Ok(DisplayFieldsResponse {
1421                        data: None,
1422                        error: Some(SuiObjectResponseError::DisplayError {
1423                            error: e.to_string(),
1424                        }),
1425                    });
1426                }
1427            }
1428        } else {
1429            return Ok(DisplayFieldsResponse {
1430                data: None,
1431                error: None,
1432            });
1433        };
1434
1435    let mut fields = BTreeMap::new();
1436    let mut errors = vec![];
1437
1438    for (key, value) in display {
1439        match value {
1440            Ok(v) => {
1441                fields.insert(key, v);
1442            }
1443            Err(e) => {
1444                errors.push(e.to_string());
1445            }
1446        }
1447    }
1448
1449    Ok(DisplayFieldsResponse {
1450        data: (!fields.is_empty()).then_some(fields),
1451        error: (!errors.is_empty()).then(|| SuiObjectResponseError::DisplayError {
1452            error: errors.join("; "),
1453        }),
1454    })
1455}
1456
1457#[instrument(skip(kv_store, fullnode_api))]
1458async fn get_display_object_v1_by_type(
1459    kv_store: &Arc<TransactionKeyValueStore>,
1460    fullnode_api: &ReadApi,
1461    object_type: &StructTag,
1462) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1463    let mut events = fullnode_api
1464        .state
1465        .query_events(
1466            kv_store,
1467            EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1468            None,
1469            1,
1470            true,
1471        )
1472        .await?;
1473
1474    // If there's any recent version of Display, give it to the client.
1475    if let Some(event) = events.pop() {
1476        let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1477        Ok(Some(display))
1478    } else {
1479        Ok(None)
1480    }
1481}
1482
1483#[instrument(skip(fullnode_api))]
1484fn get_display_object_v2_by_type(
1485    fullnode_api: &ReadApi,
1486    object_type: &StructTag,
1487) -> Result<Option<display_registry::Display>, ObjectDisplayError> {
1488    let object_id = display_registry::display_object_id(object_type.clone().into())?;
1489    let ObjectRead::Exists(_, object, _) = fullnode_api.state.get_object_read(&object_id)? else {
1490        return Ok(None);
1491    };
1492
1493    let Some(move_object) = object.data.try_as_move() else {
1494        return Ok(None);
1495    };
1496
1497    Ok(Some(bcs::from_bytes(move_object.contents())?))
1498}
1499
1500#[instrument(skip_all)]
1501fn convert_to_response(
1502    cache: IntermediateTransactionResponse,
1503    opts: &SuiTransactionBlockResponseOptions,
1504    module_cache: &impl GetModule,
1505) -> RpcInterimResult<SuiTransactionBlockResponse> {
1506    let mut response = SuiTransactionBlockResponse::new(cache.digest);
1507    response.errors = cache.errors;
1508
1509    if let Some(transaction) = cache.transaction {
1510        if opts.show_raw_input {
1511            response.raw_transaction = bcs::to_bytes(transaction.data()).map_err(|e| {
1512                // TODO: is this a client or server error?
1513                anyhow!("Failed to serialize raw transaction with error: {e}")
1514            })?;
1515        }
1516
1517        if opts.show_input {
1518            response.transaction = Some(SuiTransactionBlock::try_from(
1519                transaction.into_data(),
1520                module_cache,
1521            )?);
1522        }
1523    }
1524
1525    if let Some(effects) = cache.effects {
1526        if opts.show_raw_effects {
1527            response.raw_effects = bcs::to_bytes(&effects).map_err(|e| {
1528                // TODO: is this a client or server error?
1529                anyhow!("Failed to serialize transaction block effects with error: {e}")
1530            })?;
1531        }
1532
1533        if opts.show_effects {
1534            response.effects = Some(effects.try_into().map_err(|e| {
1535                // TODO: is this a client or server error?
1536                anyhow!("Failed to convert transaction block effects with error: {e}")
1537            })?);
1538        }
1539    }
1540
1541    response.checkpoint = cache.checkpoint_seq;
1542    response.timestamp_ms = cache.timestamp;
1543
1544    if opts.show_events {
1545        response.events = cache.events;
1546    }
1547
1548    if opts.show_balance_changes {
1549        response.balance_changes = cache.balance_changes;
1550    }
1551
1552    if opts.show_object_changes {
1553        response.object_changes = cache.object_changes;
1554    }
1555
1556    Ok(response)
1557}
1558
1559fn calculate_checkpoint_numbers(
1560    // If `Some`, the query will start from the next item after the specified cursor
1561    cursor: Option<CheckpointSequenceNumber>,
1562    limit: u64,
1563    descending_order: bool,
1564    max_checkpoint: CheckpointSequenceNumber,
1565) -> Vec<CheckpointSequenceNumber> {
1566    let (start_index, end_index) = match cursor {
1567        Some(t) => {
1568            if descending_order {
1569                let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1570                let end = start.saturating_sub(limit - 1);
1571                (end, start)
1572            } else {
1573                let start =
1574                    std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1575                let end = std::cmp::min(
1576                    start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1577                    max_checkpoint,
1578                );
1579                (start, end)
1580            }
1581        }
1582        None => {
1583            if descending_order {
1584                (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1585            } else {
1586                (0, std::cmp::min(limit - 1, max_checkpoint))
1587            }
1588        }
1589    };
1590
1591    if descending_order {
1592        (start_index..=end_index).rev().collect()
1593    } else {
1594        (start_index..=end_index).collect()
1595    }
1596}
1597
#[cfg(test)]
mod tests {
    use super::*;

    // Every case below uses the same chain tip; only cursor, limit and direction vary.
    const MAX_CHECKPOINT: u64 = 15;

    /// Descending from an explicit cursor: the page starts just below the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers() {
        let page = calculate_checkpoint_numbers(Some(10), 5, true, MAX_CHECKPOINT);
        assert_eq!(page, vec![9, 8, 7, 6, 5]);
    }

    /// Descending without a cursor: the page starts at the latest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, true, MAX_CHECKPOINT);
        assert_eq!(page, vec![15, 14, 13, 12, 11]);
    }

    /// Ascending without a cursor: the page starts at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, false, MAX_CHECKPOINT);
        assert_eq!(page, vec![0, 1, 2, 3, 4]);
    }

    /// Ascending from an explicit cursor: the page starts just above the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
        let page = calculate_checkpoint_numbers(Some(10), 5, false, MAX_CHECKPOINT);
        assert_eq!(page, vec![11, 12, 13, 14, 15]);
    }

    /// Ascending with a limit larger than the chain: the page is capped at the latest
    /// checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
        let every_checkpoint: Vec<u64> = (0..=15).collect();
        let page = calculate_checkpoint_numbers(None, 20, false, MAX_CHECKPOINT);
        assert_eq!(page, every_checkpoint);
    }

    /// Descending with a limit larger than the chain: the page stops at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
        let every_checkpoint_reversed: Vec<u64> = (0..=15).rev().collect();
        let page = calculate_checkpoint_numbers(None, 20, true, MAX_CHECKPOINT);
        assert_eq!(page, every_checkpoint_reversed);
    }
}