sui_json_rpc/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use backoff::ExponentialBackoff;
11use backoff::future::retry;
12use fastcrypto::encoding::Base64;
13use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
14use futures::future::join_all;
15use im::hashmap::HashMap as ImHashMap;
16use indexmap::map::IndexMap;
17use itertools::Itertools;
18use jsonrpsee::RpcModule;
19use jsonrpsee::core::RpcResult;
20use move_bytecode_utils::module_cache::GetModule;
21use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
22use move_core_types::language_storage::StructTag;
23use once_cell::sync::Lazy;
24use shared_crypto::intent::{IntentMessage, PersonalMessage};
25use sui_display::v1::Format;
26use sui_json_rpc_types::ZkLoginIntentScope;
27use sui_types::base_types::SuiAddress;
28use sui_types::signature::{GenericSignature, VerifyParams};
29use sui_types::signature_verification::VerifiedDigestCache;
30use sui_types::storage::ObjectKey;
31use tap::TapFallible;
32use tracing::{debug, error, info, instrument, trace, warn};
33
34use mysten_metrics::add_server_timing;
35use sui_core::authority::AuthorityState;
36use sui_json_rpc_api::{
37    JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
38    ReadApiServer, validate_limit,
39};
40use sui_json_rpc_types::{
41    BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
42    ObjectChange, ProtocolConfigResponse, SuiEvent, SuiGetPastObjectRequest, SuiObjectDataOptions,
43    SuiObjectResponse, SuiPastObjectResponse, SuiTransactionBlock, SuiTransactionBlockEvents,
44    SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
45};
46use sui_open_rpc::Module;
47use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
48use sui_storage::key_value_store::TransactionKeyValueStore;
49use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
50use sui_types::crypto::AggregateAuthoritySignature;
51use sui_types::display::DisplayVersionUpdatedEvent;
52use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
53use sui_types::error::{SuiError, SuiObjectResponseError};
54use sui_types::messages_checkpoint::{
55    CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
56};
57use sui_types::object::{Object, ObjectRead, PastObjectRead};
58use sui_types::sui_serde::BigInt;
59use sui_types::transaction::TransactionDataAPI;
60use sui_types::transaction::{Transaction, TransactionData};
61
62use crate::authority_state::{StateRead, StateReadError, StateReadResult};
63use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
64use crate::{ObjectProvider, with_tracing};
65use crate::{
66    ObjectProviderCache, SuiRpcModule, get_balance_changes_from_effect, get_object_changes,
67};
68use fastcrypto::encoding::Encoding;
69use fastcrypto::traits::ToFromBytes;
70use shared_crypto::intent::Intent;
71use sui_json_rpc_types::ZkLoginVerifyResult;
72use sui_types::authenticator_state::{ActiveJwk, get_authenticator_state};
73
74/// A field access in a  Display string cannot exceed this level of nesting.
75const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
76
77/// Default budget for Display output size.
78const DEFAULT_MAX_DISPLAY_OUTPUT_SIZE: usize = 1024 * 1024;
79
80/// Overall display output cannot exceed this size.
81static MAX_DISPLAY_OUTPUT_SIZE: Lazy<usize> = Lazy::new(|| {
82    let max_opt = std::env::var("MAX_DISPLAY_OUTPUT_SIZE")
83        .ok()
84        .and_then(|s| s.parse().ok());
85
86    if let Some(max) = max_opt {
87        info!("Using custom value for 'MAX_DISPLAY_OUTPUT_SIZE': {max}");
88        max
89    } else {
90        DEFAULT_MAX_DISPLAY_OUTPUT_SIZE
91    }
92});
93
94// An implementation of the read portion of the JSON-RPC interface intended for use in
95// Fullnodes.
96#[derive(Clone)]
97pub struct ReadApi {
98    pub state: Arc<dyn StateRead>,
99    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
100    pub metrics: Arc<JsonRpcMetrics>,
101}
102
103// Internal data structure to make it easy to work with data returned from
104// authority store and also enable code sharing between get_transaction_with_options,
105// multi_get_transaction_with_options, etc.
106#[derive(Default)]
107struct IntermediateTransactionResponse {
108    digest: TransactionDigest,
109    transaction: Option<Transaction>,
110    effects: Option<TransactionEffects>,
111    events: Option<SuiTransactionBlockEvents>,
112    checkpoint_seq: Option<CheckpointSequenceNumber>,
113    balance_changes: Option<Vec<BalanceChange>>,
114    object_changes: Option<Vec<ObjectChange>>,
115    timestamp: Option<CheckpointTimestamp>,
116    errors: Vec<String>,
117}
118
119impl IntermediateTransactionResponse {
120    pub fn new(digest: TransactionDigest) -> Self {
121        Self {
122            digest,
123            ..Default::default()
124        }
125    }
126
127    pub fn transaction(&self) -> &Option<Transaction> {
128        &self.transaction
129    }
130}
131
132impl ReadApi {
133    pub fn new(
134        state: Arc<AuthorityState>,
135        transaction_kv_store: Arc<TransactionKeyValueStore>,
136        metrics: Arc<JsonRpcMetrics>,
137    ) -> Self {
138        Self {
139            state,
140            transaction_kv_store,
141            metrics,
142        }
143    }
144
145    async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
146        Ok(match id {
147            CheckpointId::SequenceNumber(seq) => {
148                let verified_summary = self
149                    .transaction_kv_store
150                    .get_checkpoint_summary(seq)
151                    .await?;
152                let content = self
153                    .transaction_kv_store
154                    .get_checkpoint_contents(verified_summary.sequence_number)
155                    .await?;
156                let signature = verified_summary.auth_sig().signature.clone();
157                (verified_summary.into_data(), content, signature).into()
158            }
159            CheckpointId::Digest(digest) => {
160                let verified_summary = self
161                    .transaction_kv_store
162                    .get_checkpoint_summary_by_digest(digest)
163                    .await?;
164                let content = self
165                    .transaction_kv_store
166                    .get_checkpoint_contents(verified_summary.sequence_number)
167                    .await?;
168                let signature = verified_summary.auth_sig().signature.clone();
169                (verified_summary.into_data(), content, signature).into()
170            }
171        })
172    }
173
174    pub async fn get_checkpoints_internal(
175        state: Arc<dyn StateRead>,
176        transaction_kv_store: Arc<TransactionKeyValueStore>,
177        // If `Some`, the query will start from the next item after the specified cursor
178        cursor: Option<CheckpointSequenceNumber>,
179        limit: u64,
180        descending_order: bool,
181    ) -> StateReadResult<Vec<Checkpoint>> {
182        let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
183        let checkpoint_numbers =
184            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
185
186        let verified_checkpoints = transaction_kv_store
187            .multi_get_checkpoints_summaries(&checkpoint_numbers)
188            .await?;
189
190        let checkpoint_summaries_and_signatures: Vec<(
191            CheckpointSummary,
192            AggregateAuthoritySignature,
193        )> = verified_checkpoints
194            .into_iter()
195            .flatten()
196            .map(|check| {
197                (
198                    check.clone().into_summary_and_sequence().1,
199                    check.get_validator_signature(),
200                )
201            })
202            .collect();
203
204        let checkpoint_contents = transaction_kv_store
205            .multi_get_checkpoints_contents(&checkpoint_numbers)
206            .await?;
207        let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
208
209        let mut checkpoints: Vec<Checkpoint> = vec![];
210
211        for (summary_and_sig, content) in checkpoint_summaries_and_signatures
212            .into_iter()
213            .zip(contents.into_iter())
214        {
215            checkpoints.push(Checkpoint::from((
216                summary_and_sig.0,
217                content,
218                summary_and_sig.1,
219            )));
220        }
221
222        Ok(checkpoints)
223    }
224
225    #[instrument(skip_all)]
226    async fn multi_get_transaction_blocks_internal(
227        &self,
228        digests: Vec<TransactionDigest>,
229        opts: Option<SuiTransactionBlockResponseOptions>,
230    ) -> Result<Vec<SuiTransactionBlockResponse>, Error> {
231        trace!("start");
232
233        let num_digests = digests.len();
234        if num_digests > *QUERY_MAX_RESULT_LIMIT {
235            Err(SuiRpcInputError::SizeLimitExceeded(
236                QUERY_MAX_RESULT_LIMIT.to_string(),
237            ))?
238        }
239        self.metrics
240            .get_tx_blocks_limit
241            .observe(digests.len() as f64);
242
243        let opts = opts.unwrap_or_default();
244
245        // use LinkedHashMap to dedup and can iterate in insertion order.
246        let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
247            IndexMap::from_iter(
248                digests
249                    .iter()
250                    .map(|k| (k, IntermediateTransactionResponse::new(*k))),
251            );
252        if temp_response.len() < num_digests {
253            Err(SuiRpcInputError::ContainsDuplicates)?
254        }
255
256        if opts.require_input() {
257            trace!("getting input");
258            let digests_clone = digests.clone();
259            let transactions =
260                self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
261                    |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
262                )?;
263
264            for ((_digest, cache_entry), txn) in
265                temp_response.iter_mut().zip(transactions.into_iter())
266            {
267                cache_entry.transaction = txn;
268            }
269        }
270
271        // Fetch effects when `show_events` is true because events relies on effects
272        if opts.require_effects() {
273            trace!("getting effects");
274            let digests_clone = digests.clone();
275            let effects_list = self.transaction_kv_store
276                .multi_get_fx_by_tx_digest(&digests_clone)
277                .await
278                .tap_err(
279                    |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
280                )?;
281            for ((_digest, cache_entry), e) in
282                temp_response.iter_mut().zip(effects_list.into_iter())
283            {
284                cache_entry.effects = e;
285            }
286        }
287
288        trace!("getting checkpoint sequence numbers");
289        let checkpoint_seq_list = self
290            .transaction_kv_store
291            .multi_get_transaction_checkpoint(&digests)
292            .await
293            .tap_err(
294                |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
295        for ((_digest, cache_entry), seq) in temp_response
296            .iter_mut()
297            .zip(checkpoint_seq_list.into_iter())
298        {
299            cache_entry.checkpoint_seq = seq;
300        }
301
302        let unique_checkpoint_numbers = temp_response
303            .values()
304            .filter_map(|cache_entry| cache_entry.checkpoint_seq)
305            // It's likely that many transactions have the same checkpoint, so we don't
306            // need to over-fetch
307            .unique()
308            .collect::<Vec<CheckpointSequenceNumber>>();
309
310        // fetch timestamp from the DB
311        trace!("getting checkpoint summaries");
312        let timestamps = self
313            .transaction_kv_store
314            .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
315            .await
316            .map_err(|e| {
317                Error::UnexpectedError(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
318            })?
319            .into_iter()
320            .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
321
322        // construct a hashmap of checkpoint -> timestamp for fast lookup
323        let checkpoint_to_timestamp = unique_checkpoint_numbers
324            .into_iter()
325            .zip(timestamps)
326            .collect::<HashMap<_, _>>();
327
328        // fill cache with the timestamp
329        for (_, cache_entry) in temp_response.iter_mut() {
330            if cache_entry.checkpoint_seq.is_some() {
331                // safe to unwrap because is_some is checked
332                cache_entry.timestamp = *checkpoint_to_timestamp
333                    .get(cache_entry.checkpoint_seq.as_ref().unwrap())
334                    // Safe to unwrap because checkpoint_seq is guaranteed to exist in checkpoint_to_timestamp
335                    .unwrap();
336            }
337        }
338
339        if opts.show_events {
340            trace!("getting events");
341            let mut non_empty_digests = vec![];
342            for cache_entry in temp_response.values() {
343                if let Some(effects) = &cache_entry.effects
344                    && effects.events_digest().is_some()
345                {
346                    non_empty_digests.push(cache_entry.digest);
347                }
348            }
349            // fetch events from the DB with retry, retry each 0.5s for 3s
350            let backoff = ExponentialBackoff {
351                max_elapsed_time: Some(Duration::from_secs(3)),
352                multiplier: 1.0,
353                ..ExponentialBackoff::default()
354            };
355            let mut events = retry(backoff, || async {
356                match self
357                    .transaction_kv_store
358                    .multi_get_events_by_tx_digests(&non_empty_digests)
359                    .await
360                {
361                    // Only return Ok when all the queried transaction events are found, otherwise retry
362                    // until timeout, then return Err.
363                    Ok(events) if !events.contains(&None) => Ok(events),
364                    Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
365                        "Events not found, transaction execution may be incomplete.".into(),
366                    ))),
367                    Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
368                        "Failed to call multi_get_events: {e:?}"
369                    )))),
370                }
371            })
372            .await
373            .map_err(|e| {
374                Error::UnexpectedError(format!(
375                    "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
376                ))
377            })?
378            .into_iter();
379
380            // fill cache with the events
381            for (_, cache_entry) in temp_response.iter_mut() {
382                let transaction_digest = cache_entry.digest;
383                if let Some(events_digest) =
384                    cache_entry.effects.as_ref().and_then(|e| e.events_digest())
385                {
386                    match events.next() {
387                        Some(Some(ev)) => {
388                            cache_entry.events =
389                                Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
390                        }
391                        None | Some(None) => {
392                            error!(
393                                "Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
394                            );
395                            cache_entry.errors.push(format!(
396                                "Failed to fetch events with event digest {events_digest:?}",
397                            ))
398                        }
399                    }
400                } else {
401                    // events field will be Some if and only if `show_events` is true and
402                    // there is no error in converting fetching events
403                    cache_entry.events = Some(SuiTransactionBlockEvents::default());
404                }
405            }
406        }
407
408        let mut object_cache =
409            ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
410
411        // Prefetch the objects if we need to show balance or object changes
412        if opts.show_balance_changes || opts.show_object_changes {
413            let mut keys = vec![];
414            for resp in temp_response.values() {
415                let effects = resp.effects.as_ref().ok_or_else(|| {
416                    SuiRpcInputError::GenericNotFound(
417                        "unable to derive balance/object changes because effect is empty"
418                            .to_string(),
419                    )
420                })?;
421
422                for change in effects.object_changes() {
423                    if let Some(input_version) = change.input_version {
424                        keys.push(ObjectKey(change.id, input_version));
425                    }
426                    if let Some(output_version) = change.output_version {
427                        keys.push(ObjectKey(change.id, output_version));
428                    }
429                }
430            }
431
432            let objects = self
433                .transaction_kv_store
434                .multi_get_objects(&keys)
435                .await?
436                .into_iter()
437                .flatten()
438                .collect::<Vec<_>>();
439
440            object_cache.insert_objects_into_cache(objects);
441        }
442
443        if opts.show_balance_changes {
444            trace!("getting balance changes");
445
446            let mut results = vec![];
447            for resp in temp_response.values() {
448                let input_objects = if let Some(tx) = resp.transaction() {
449                    tx.data()
450                        .inner()
451                        .intent_message
452                        .value
453                        .input_objects()
454                        .unwrap_or_default()
455                } else {
456                    // don't have the input tx, so not much we can do. perhaps this is an Err?
457                    Vec::new()
458                };
459                results.push(get_balance_changes_from_effect(
460                    &object_cache,
461                    resp.effects.as_ref().ok_or_else(|| {
462                        SuiRpcInputError::GenericNotFound(
463                            "unable to derive balance changes because effect is empty".to_string(),
464                        )
465                    })?,
466                    input_objects,
467                    None,
468                ));
469            }
470            let results = join_all(results).await;
471            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
472                match result {
473                    Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
474                    Err(e) => entry
475                        .1
476                        .errors
477                        .push(format!("Failed to fetch balance changes {e:?}")),
478                }
479            }
480        }
481
482        if opts.show_object_changes {
483            trace!("getting object changes");
484
485            let mut results = vec![];
486            for resp in temp_response.values() {
487                let effects = resp.effects.as_ref().ok_or_else(|| {
488                    SuiRpcInputError::GenericNotFound(
489                        "unable to derive object changes because effect is empty".to_string(),
490                    )
491                })?;
492
493                results.push(get_object_changes(
494                    &object_cache,
495                    effects,
496                    resp.transaction
497                        .as_ref()
498                        .ok_or_else(|| {
499                            SuiRpcInputError::GenericNotFound(
500                                "unable to derive object changes because transaction is empty"
501                                    .to_string(),
502                            )
503                        })?
504                        .data()
505                        .intent_message()
506                        .value
507                        .sender(),
508                    effects.modified_at_versions(),
509                    effects.all_changed_objects(),
510                    effects.all_removed_objects(),
511                ));
512            }
513            let results = join_all(results).await;
514            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
515                match result {
516                    Ok(object_changes) => entry.1.object_changes = Some(object_changes),
517                    Err(e) => entry
518                        .1
519                        .errors
520                        .push(format!("Failed to fetch object changes {e:?}")),
521                }
522            }
523        }
524
525        let epoch_store = self.state.load_epoch_store_one_call_per_task();
526        let converted_tx_block_resps = temp_response
527            .into_iter()
528            .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
529            .collect::<Result<Vec<_>, _>>()?;
530
531        self.metrics
532            .get_tx_blocks_result_size
533            .observe(converted_tx_block_resps.len() as f64);
534        self.metrics
535            .get_tx_blocks_result_size_total
536            .inc_by(converted_tx_block_resps.len() as u64);
537
538        trace!("done");
539
540        Ok(converted_tx_block_resps)
541    }
542}
543
544#[async_trait]
545impl ReadApiServer for ReadApi {
546    #[instrument(skip(self))]
547    async fn get_object(
548        &self,
549        object_id: ObjectID,
550        options: Option<SuiObjectDataOptions>,
551    ) -> RpcResult<SuiObjectResponse> {
552        with_tracing!(async move {
553            let state = self.state.clone();
554            let object_read = state.get_object_read(&object_id).map_err(|e| {
555                warn!(?object_id, "Failed to get object: {:?}", e);
556                Error::from(e)
557            })?;
558            let options = options.unwrap_or_default();
559
560            match object_read {
561                ObjectRead::NotExists(id) => Ok(SuiObjectResponse::new_with_error(
562                    SuiObjectResponseError::NotExists { object_id: id },
563                )),
564                ObjectRead::Exists(object_ref, o, layout) => {
565                    let mut display_fields = None;
566                    if options.show_display {
567                        match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
568                            .await
569                        {
570                            Ok(rendered_fields) => display_fields = Some(rendered_fields),
571                            Err(e) => {
572                                return Ok(SuiObjectResponse::new(
573                                    Some((object_ref, o, layout, options, None).try_into()?),
574                                    Some(SuiObjectResponseError::DisplayError {
575                                        error: e.to_string(),
576                                    }),
577                                ));
578                            }
579                        }
580                    }
581                    Ok(SuiObjectResponse::new_with_data(
582                        (object_ref, o, layout, options, display_fields).try_into()?,
583                    ))
584                }
585                ObjectRead::Deleted((object_id, version, digest)) => Ok(
586                    SuiObjectResponse::new_with_error(SuiObjectResponseError::Deleted {
587                        object_id,
588                        version,
589                        digest,
590                    }),
591                ),
592            }
593        })
594    }
595
596    #[instrument(skip(self))]
597    async fn multi_get_objects(
598        &self,
599        object_ids: Vec<ObjectID>,
600        options: Option<SuiObjectDataOptions>,
601    ) -> RpcResult<Vec<SuiObjectResponse>> {
602        with_tracing!(async move {
603            if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
604                self.metrics
605                    .get_objects_limit
606                    .observe(object_ids.len() as f64);
607                let mut futures = vec![];
608                for object_id in object_ids {
609                    futures.push(self.get_object(object_id, options.clone()));
610                }
611                let results = join_all(futures).await;
612
613                let objects_result: Result<Vec<SuiObjectResponse>, String> = results
614                    .into_iter()
615                    .map(|result| match result {
616                        Ok(response) => Ok(response),
617                        Err(error) => {
618                            error!("Failed to fetch object with error: {error:?}");
619                            Err(format!("Error: {}", error))
620                        }
621                    })
622                    .collect();
623
624                let objects = objects_result.map_err(|err| {
625                    Error::UnexpectedError(format!("Failed to fetch objects with error: {}", err))
626                })?;
627
628                self.metrics
629                    .get_objects_result_size
630                    .observe(objects.len() as f64);
631                self.metrics
632                    .get_objects_result_size_total
633                    .inc_by(objects.len() as u64);
634                Ok(objects)
635            } else {
636                Err(SuiRpcInputError::SizeLimitExceeded(
637                    QUERY_MAX_RESULT_LIMIT.to_string(),
638                ))?
639            }
640        })
641    }
642
643    #[instrument(skip(self))]
644    async fn try_get_past_object(
645        &self,
646        object_id: ObjectID,
647        version: SequenceNumber,
648        options: Option<SuiObjectDataOptions>,
649    ) -> RpcResult<SuiPastObjectResponse> {
650        with_tracing!(async move {
651            let state = self.state.clone();
652            let past_read = state
653                .get_past_object_read(&object_id, version)
654                .map_err(|e| {
655                    error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
656                    Error::from(e)
657                })?;
658            let options = options.unwrap_or_default();
659            match past_read {
660                PastObjectRead::ObjectNotExists(id) => {
661                    Ok(SuiPastObjectResponse::ObjectNotExists(id))
662                }
663                PastObjectRead::VersionFound(object_ref, o, layout) => {
664                    let display_fields = if options.show_display {
665                        // TODO (jian): api breaking change to also modify past objects.
666                        Some(
667                            get_display_fields(self, &self.transaction_kv_store, &o, &layout)
668                                .await
669                                .map_err(|e| {
670                                    Error::UnexpectedError(format!(
671                                        "Unable to render object at version {version}: {e}"
672                                    ))
673                                })?,
674                        )
675                    } else {
676                        None
677                    };
678                    Ok(SuiPastObjectResponse::VersionFound(
679                        (object_ref, o, layout, options, display_fields).try_into()?,
680                    ))
681                }
682                PastObjectRead::ObjectDeleted(oref) => {
683                    Ok(SuiPastObjectResponse::ObjectDeleted(oref.into()))
684                }
685                PastObjectRead::VersionNotFound(id, seq_num) => {
686                    Ok(SuiPastObjectResponse::VersionNotFound(id, seq_num))
687                }
688                PastObjectRead::VersionTooHigh {
689                    object_id,
690                    asked_version,
691                    latest_version,
692                } => Ok(SuiPastObjectResponse::VersionTooHigh {
693                    object_id,
694                    asked_version,
695                    latest_version,
696                }),
697            }
698        })
699    }
700
701    #[instrument(skip(self))]
702    async fn try_get_object_before_version(
703        &self,
704        object_id: ObjectID,
705        version: SequenceNumber,
706    ) -> RpcResult<SuiPastObjectResponse> {
707        let version = self
708            .state
709            .find_object_lt_or_eq_version(&object_id, &version)
710            .await
711            .map_err(Error::from)?
712            .map(|obj| obj.version())
713            .unwrap_or_default();
714        self.try_get_past_object(
715            object_id,
716            version,
717            Some(SuiObjectDataOptions::bcs_lossless()),
718        )
719        .await
720    }
721
722    #[instrument(skip(self))]
723    async fn try_multi_get_past_objects(
724        &self,
725        past_objects: Vec<SuiGetPastObjectRequest>,
726        options: Option<SuiObjectDataOptions>,
727    ) -> RpcResult<Vec<SuiPastObjectResponse>> {
728        with_tracing!(async move {
729            if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
730                let mut futures = vec![];
731                for past_object in past_objects {
732                    futures.push(self.try_get_past_object(
733                        past_object.object_id,
734                        past_object.version,
735                        options.clone(),
736                    ));
737                }
738                let results = join_all(futures).await;
739
740                let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
741                let success = oks.into_iter().filter_map(Result::ok).collect();
742                let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
743                if !errors.is_empty() {
744                    let error_string = errors
745                        .iter()
746                        .map(|e| e.to_string())
747                        .collect::<Vec<String>>()
748                        .join("; ");
749                    Err(anyhow!("{error_string}").into()) // Collects errors not related to SuiPastObjectResponse variants
750                } else {
751                    Ok(success)
752                }
753            } else {
754                Err(SuiRpcInputError::SizeLimitExceeded(
755                    QUERY_MAX_RESULT_LIMIT.to_string(),
756                ))?
757            }
758        })
759    }
760
761    #[instrument(skip(self))]
762    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
763        with_tracing!(async move {
764            Ok(self
765                .state
766                .get_total_transaction_blocks()
767                .map_err(Error::from)?
768                .into()) // converts into BigInt<u64>
769        })
770    }
771
772    #[instrument(skip(self))]
773    async fn get_transaction_block(
774        &self,
775        digest: TransactionDigest,
776        opts: Option<SuiTransactionBlockResponseOptions>,
777    ) -> RpcResult<SuiTransactionBlockResponse> {
778        with_tracing!(async move {
779            let opts = opts.unwrap_or_default();
780            let mut temp_response = IntermediateTransactionResponse::new(digest);
781
782            // Fetch transaction to determine existence
783            let transaction_kv_store = self.transaction_kv_store.clone();
784            let transaction = async move {
785                let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
786                    debug!(tx_digest=?digest, "Failed to get transaction: {}", err);
787                    Error::from(err)
788                });
789                add_server_timing("tx_kv_lookup");
790                ret
791            }
792            .await?;
793            let input_objects = transaction
794                .data()
795                .inner()
796                .intent_message
797                .value
798                .input_objects()
799                .unwrap_or_default();
800
801            // the input is needed for object_changes to retrieve the sender address.
802            if opts.require_input() {
803                temp_response.transaction = Some(transaction);
804            }
805
806            // Fetch effects when `show_events` is true because events relies on effects
807            if opts.require_effects() {
808                let transaction_kv_store = self.transaction_kv_store.clone();
809                temp_response.effects = Some(
810                    transaction_kv_store
811                        .get_fx_by_tx_digest(digest)
812                        .await
813                        .map_err(|err| {
814                            debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
815                            Error::from(err)
816                        })?,
817                );
818            }
819
820            temp_response.checkpoint_seq = self
821                .transaction_kv_store
822                .deprecated_get_transaction_checkpoint(digest)
823                .await
824                .map_err(|e| {
825                    error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
826                    Error::from(e)
827                })?;
828
829            if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
830                let kv_store = self.transaction_kv_store.clone();
831                let checkpoint_seq = *checkpoint_seq;
832                let checkpoint = kv_store
833                    // safe to unwrap because we have checked `is_some` above
834                    .get_checkpoint_summary(checkpoint_seq)
835                    .await
836                    .map_err(|e| {
837                        error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
838                        Error::from(e)
839                    })?;
840                // TODO(chris): we don't need to fetch the whole checkpoint summary
841                temp_response.timestamp = Some(checkpoint.timestamp_ms);
842            }
843
844            if opts.show_events && temp_response.effects.is_some() {
845                let transaction_kv_store = self.transaction_kv_store.clone();
846                let events = transaction_kv_store
847                    .multi_get_events_by_tx_digests(&[digest])
848                    .await
849                    .map_err(|e| {
850                        error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
851                        Error::from(e)
852                    })?
853                    .pop()
854                    .flatten();
855                match events {
856                    None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
857                    Some(events) => match to_sui_transaction_events(self, digest, events) {
858                        Ok(e) => temp_response.events = Some(e),
859                        Err(e) => temp_response.errors.push(e.to_string()),
860                    },
861                }
862            }
863
864            let object_cache =
865                ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
866            if opts.show_balance_changes
867                && let Some(effects) = &temp_response.effects
868            {
869                let balance_changes =
870                    get_balance_changes_from_effect(&object_cache, effects, input_objects, None)
871                        .await;
872
873                if let Ok(balance_changes) = balance_changes {
874                    temp_response.balance_changes = Some(balance_changes);
875                } else {
876                    temp_response.errors.push(format!(
877                        "Cannot retrieve balance changes: {}",
878                        balance_changes.unwrap_err()
879                    ));
880                }
881            }
882
883            if opts.show_object_changes
884                && let (Some(effects), Some(input)) =
885                    (&temp_response.effects, &temp_response.transaction)
886            {
887                let sender = input.data().intent_message().value.sender();
888                let object_changes = get_object_changes(
889                    &object_cache,
890                    effects,
891                    sender,
892                    effects.modified_at_versions(),
893                    effects.all_changed_objects(),
894                    effects.all_removed_objects(),
895                )
896                .await;
897
898                if let Ok(object_changes) = object_changes {
899                    temp_response.object_changes = Some(object_changes);
900                } else {
901                    temp_response.errors.push(format!(
902                        "Cannot retrieve object changes: {}",
903                        object_changes.unwrap_err()
904                    ));
905                }
906            }
907            let epoch_store = self.state.load_epoch_store_one_call_per_task();
908            convert_to_response(temp_response, &opts, epoch_store.module_cache())
909        })
910    }
911
912    #[instrument(skip(self))]
913    async fn multi_get_transaction_blocks(
914        &self,
915        digests: Vec<TransactionDigest>,
916        opts: Option<SuiTransactionBlockResponseOptions>,
917    ) -> RpcResult<Vec<SuiTransactionBlockResponse>> {
918        with_tracing!(async move {
919            self.multi_get_transaction_blocks_internal(digests, opts)
920                .await
921        })
922    }
923
924    #[instrument(skip(self))]
925    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<SuiEvent>> {
926        with_tracing!(async move {
927            let state = self.state.clone();
928            let transaction_kv_store = self.transaction_kv_store.clone();
929            async move {
930                let store = state.load_epoch_store_one_call_per_task();
931                let events = transaction_kv_store
932                    .multi_get_events_by_tx_digests(&[transaction_digest])
933                    .await
934                    .map_err(|e| {
935                        error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
936                        Error::StateReadError(e.into())
937                    })?
938                    .pop()
939                    .flatten();
940                Ok(match events {
941                    Some(events) => events
942                        .data
943                        .into_iter()
944                        .enumerate()
945                        .map(|(seq, e)| {
946                            let layout = store
947                                .executor()
948                                .type_layout_resolver(Box::new(
949                                    &state.get_backing_package_store().as_ref(),
950                                ))
951                                .get_annotated_layout(&e.type_)?;
952                            SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
953                        })
954                        .collect::<Result<Vec<_>, _>>()
955                        .map_err(Error::SuiError)?,
956                    None => vec![],
957                })
958            }
959            .await
960        })
961    }
962
963    #[instrument(skip(self))]
964    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
965        with_tracing!(async move {
966            Ok(self
967                .state
968                .get_latest_checkpoint_sequence_number()
969                .map_err(|e| {
970                    SuiRpcInputError::GenericNotFound(format!(
971                        "Latest checkpoint sequence number was not found with error :{e}"
972                    ))
973                })?
974                .into())
975        })
976    }
977
978    #[instrument(skip(self))]
979    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
980        with_tracing!(self.get_checkpoint_internal(id))
981    }
982
983    #[instrument(skip(self))]
984    async fn get_checkpoints(
985        &self,
986        // If `Some`, the query will start from the next item after the specified cursor
987        cursor: Option<BigInt<u64>>,
988        limit: Option<usize>,
989        descending_order: bool,
990    ) -> RpcResult<CheckpointPage> {
991        with_tracing!(async move {
992            let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
993                .map_err(SuiRpcInputError::from)?;
994
995            let state = self.state.clone();
996            let kv_store = self.transaction_kv_store.clone();
997
998            self.metrics.get_checkpoints_limit.observe(limit as f64);
999
1000            let mut data = Self::get_checkpoints_internal(
1001                state,
1002                kv_store,
1003                cursor.map(|s| *s),
1004                limit as u64 + 1,
1005                descending_order,
1006            )
1007            .await
1008            .map_err(Error::from)?;
1009
1010            let has_next_page = data.len() > limit;
1011            data.truncate(limit);
1012
1013            let next_cursor = if has_next_page {
1014                data.last().cloned().map(|d| d.sequence_number.into())
1015            } else {
1016                None
1017            };
1018
1019            self.metrics
1020                .get_checkpoints_result_size
1021                .observe(data.len() as f64);
1022            self.metrics
1023                .get_checkpoints_result_size_total
1024                .inc_by(data.len() as u64);
1025
1026            Ok(CheckpointPage {
1027                data,
1028                next_cursor,
1029                has_next_page,
1030            })
1031        })
1032    }
1033
1034    #[instrument(skip(self))]
1035    async fn get_protocol_config(
1036        &self,
1037        version: Option<BigInt<u64>>,
1038    ) -> RpcResult<ProtocolConfigResponse> {
1039        with_tracing!(async move {
1040            version
1041                .map(|v| {
1042                    ProtocolConfig::get_for_version_if_supported(
1043                        (*v).into(),
1044                        self.state.get_chain_identifier()?.chain(),
1045                    )
1046                    .ok_or(SuiRpcInputError::ProtocolVersionUnsupported(
1047                        ProtocolVersion::MIN.as_u64(),
1048                        ProtocolVersion::MAX.as_u64(),
1049                    ))
1050                    .map_err(Error::from)
1051                })
1052                .unwrap_or(Ok(self
1053                    .state
1054                    .load_epoch_store_one_call_per_task()
1055                    .protocol_config()
1056                    .clone()))
1057                .map(ProtocolConfigResponse::from)
1058        })
1059    }
1060
1061    #[instrument(skip(self))]
1062    async fn get_chain_identifier(&self) -> RpcResult<String> {
1063        with_tracing!(async move {
1064            let ci = self.state.get_chain_identifier()?;
1065            Ok(ci.to_string())
1066        })
1067    }
1068    #[instrument(skip(self))]
1069    async fn verify_zklogin_signature(
1070        &self,
1071        bytes: String,
1072        signature: String,
1073        intent_scope: ZkLoginIntentScope,
1074        author: SuiAddress,
1075    ) -> RpcResult<ZkLoginVerifyResult> {
1076        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1077        let curr_epoch = epoch_store.epoch();
1078        let zklogin_env_native = match self
1079            .state
1080            .get_chain_identifier()
1081            .expect("get chain identifier should not fail")
1082            .chain()
1083        {
1084            sui_protocol_config::Chain::Mainnet | sui_protocol_config::Chain::Testnet => {
1085                ZkLoginEnv::Prod
1086            }
1087            _ => ZkLoginEnv::Test,
1088        };
1089        let GenericSignature::ZkLoginAuthenticator(zklogin_sig) =
1090            GenericSignature::from_bytes(&Base64::decode(&signature).map_err(Error::from)?)
1091                .map_err(Error::from)?
1092        else {
1093            return Err(SuiRpcInputError::GenericNotFound(
1094                "Endpoint only supports zkLogin signature".to_string(),
1095            )
1096            .into());
1097        };
1098
1099        let new_jwks =
1100            match get_authenticator_state(self.state.get_object_store()).map_err(Error::from)? {
1101                Some(authenticator_state) => authenticator_state.active_jwks,
1102                None => {
1103                    return Err(SuiRpcInputError::GenericNotFound(
1104                        "Authenticator state not found".to_string(),
1105                    )
1106                    .into());
1107                }
1108            };
1109
1110        // construct verify params with active jwks and zklogin_env.
1111        let mut oidc_provider_jwks = ImHashMap::new();
1112        for active_jwk in new_jwks.iter() {
1113            let ActiveJwk { jwk_id, jwk, .. } = active_jwk;
1114            match oidc_provider_jwks.entry(jwk_id.clone()) {
1115                im::hashmap::Entry::Occupied(_) => {
1116                    warn!("JWK with kid {:?} already exists", jwk_id);
1117                }
1118                im::hashmap::Entry::Vacant(entry) => {
1119                    entry.insert(jwk.clone());
1120                }
1121            }
1122        }
1123        let verify_params = VerifyParams::new(
1124            oidc_provider_jwks,
1125            vec![],
1126            zklogin_env_native,
1127            true,
1128            true,
1129            true,
1130            Some(30),
1131            true,
1132        );
1133        match intent_scope {
1134            ZkLoginIntentScope::TransactionData => {
1135                let tx_data: TransactionData =
1136                    bcs::from_bytes(&Base64::decode(&bytes).map_err(Error::from)?)
1137                        .map_err(Error::from)?;
1138                let intent_msg = IntentMessage::new(Intent::sui_transaction(), tx_data.clone());
1139                let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1140                match sig.verify_authenticator(
1141                    &intent_msg,
1142                    author,
1143                    curr_epoch,
1144                    &verify_params,
1145                    Arc::new(VerifiedDigestCache::new_empty()),
1146                ) {
1147                    Ok(_) => Ok(ZkLoginVerifyResult {
1148                        success: true,
1149                        errors: vec![],
1150                    }),
1151                    Err(e) => Ok(ZkLoginVerifyResult {
1152                        success: false,
1153                        errors: vec![e.to_string()],
1154                    }),
1155                }
1156            }
1157            ZkLoginIntentScope::PersonalMessage => {
1158                let data = PersonalMessage {
1159                    message: Base64::decode(&bytes).map_err(Error::from)?,
1160                };
1161                let intent_msg = IntentMessage::new(Intent::personal_message(), data);
1162
1163                let sig = GenericSignature::ZkLoginAuthenticator(zklogin_sig);
1164                match sig.verify_authenticator(
1165                    &intent_msg,
1166                    author,
1167                    curr_epoch,
1168                    &verify_params,
1169                    Arc::new(VerifiedDigestCache::new_empty()),
1170                ) {
1171                    Ok(_) => Ok(ZkLoginVerifyResult {
1172                        success: true,
1173                        errors: vec![],
1174                    }),
1175                    Err(e) => Ok(ZkLoginVerifyResult {
1176                        success: false,
1177                        errors: vec![e.to_string()],
1178                    }),
1179                }
1180            }
1181        }
1182    }
1183}
1184
1185impl SuiRpcModule for ReadApi {
1186    fn rpc(self) -> RpcModule<Self> {
1187        self.into_rpc()
1188    }
1189
1190    fn rpc_doc_module() -> Module {
1191        ReadApiOpenRpc::module_doc()
1192    }
1193}
1194
1195#[instrument(skip_all)]
1196fn to_sui_transaction_events(
1197    fullnode_api: &ReadApi,
1198    tx_digest: TransactionDigest,
1199    events: TransactionEvents,
1200) -> Result<SuiTransactionBlockEvents, Error> {
1201    let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1202    let backing_package_store = fullnode_api.state.get_backing_package_store();
1203    let mut layout_resolver = epoch_store
1204        .executor()
1205        .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1206    Ok(SuiTransactionBlockEvents::try_from(
1207        events,
1208        tx_digest,
1209        None,
1210        layout_resolver.as_mut(),
1211    )?)
1212}
1213
1214#[derive(Debug, thiserror::Error)]
1215pub enum ObjectDisplayError {
1216    #[error("Not a move struct")]
1217    NotMoveStruct,
1218
1219    #[error("Failed to extract layout")]
1220    Layout,
1221
1222    #[error("Failed to extract Move object")]
1223    MoveObject,
1224
1225    #[error(transparent)]
1226    Deserialization(#[from] SuiError),
1227
1228    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
1229    Bcs(#[from] bcs::Error),
1230
1231    #[error(transparent)]
1232    StateReadError(#[from] StateReadError),
1233}
1234
1235#[instrument(skip(fullnode_api, kv_store))]
1236async fn get_display_fields(
1237    fullnode_api: &ReadApi,
1238    kv_store: &Arc<TransactionKeyValueStore>,
1239    original_object: &Object,
1240    original_layout: &Option<MoveStructLayout>,
1241) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1242    let Some(layout) = original_layout else {
1243        return Ok(DisplayFieldsResponse {
1244            data: None,
1245            error: None,
1246        });
1247    };
1248
1249    let Some(move_object) = original_object.data.try_as_move() else {
1250        return Err(ObjectDisplayError::MoveObject);
1251    };
1252
1253    let Some(display_object) =
1254        get_display_object_by_type(kv_store, fullnode_api, &layout.type_).await?
1255    else {
1256        return Ok(DisplayFieldsResponse {
1257            data: None,
1258            error: None,
1259        });
1260    };
1261
1262    let format = match Format::parse(MAX_DISPLAY_NESTED_LEVEL, &display_object.fields) {
1263        Ok(format) => format,
1264        Err(e) => {
1265            return Ok(DisplayFieldsResponse {
1266                data: None,
1267                error: Some(SuiObjectResponseError::DisplayError {
1268                    error: e.to_string(),
1269                }),
1270            });
1271        }
1272    };
1273
1274    let layout = MoveTypeLayout::Struct(Box::new(layout.clone()));
1275    let display = match format.display(*MAX_DISPLAY_OUTPUT_SIZE, move_object.contents(), &layout) {
1276        Ok(fields) => fields,
1277        Err(e) => {
1278            return Ok(DisplayFieldsResponse {
1279                data: None,
1280                error: Some(SuiObjectResponseError::DisplayError {
1281                    error: e.to_string(),
1282                }),
1283            });
1284        }
1285    };
1286
1287    let mut fields = BTreeMap::new();
1288    let mut errors = vec![];
1289
1290    for (key, value) in display {
1291        match value {
1292            Ok(v) => {
1293                fields.insert(key, v);
1294            }
1295            Err(e) => {
1296                errors.push(e.to_string());
1297            }
1298        }
1299    }
1300
1301    Ok(DisplayFieldsResponse {
1302        data: (!fields.is_empty()).then_some(fields),
1303        error: (!errors.is_empty()).then(|| SuiObjectResponseError::DisplayError {
1304            error: errors.join("; "),
1305        }),
1306    })
1307}
1308
1309#[instrument(skip(kv_store, fullnode_api))]
1310async fn get_display_object_by_type(
1311    kv_store: &Arc<TransactionKeyValueStore>,
1312    fullnode_api: &ReadApi,
1313    object_type: &StructTag,
1314    // TODO: add query version support
1315) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1316    let mut events = fullnode_api
1317        .state
1318        .query_events(
1319            kv_store,
1320            EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1321            None,
1322            1,
1323            true,
1324        )
1325        .await?;
1326
1327    // If there's any recent version of Display, give it to the client.
1328    // TODO: add support for version query.
1329    if let Some(event) = events.pop() {
1330        let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1331        Ok(Some(display))
1332    } else {
1333        Ok(None)
1334    }
1335}
1336
1337#[instrument(skip_all)]
1338fn convert_to_response(
1339    cache: IntermediateTransactionResponse,
1340    opts: &SuiTransactionBlockResponseOptions,
1341    module_cache: &impl GetModule,
1342) -> RpcInterimResult<SuiTransactionBlockResponse> {
1343    let mut response = SuiTransactionBlockResponse::new(cache.digest);
1344    response.errors = cache.errors;
1345
1346    if let Some(transaction) = cache.transaction {
1347        if opts.show_raw_input {
1348            response.raw_transaction = bcs::to_bytes(transaction.data()).map_err(|e| {
1349                // TODO: is this a client or server error?
1350                anyhow!("Failed to serialize raw transaction with error: {e}")
1351            })?;
1352        }
1353
1354        if opts.show_input {
1355            response.transaction = Some(SuiTransactionBlock::try_from(
1356                transaction.into_data(),
1357                module_cache,
1358            )?);
1359        }
1360    }
1361
1362    if let Some(effects) = cache.effects {
1363        if opts.show_raw_effects {
1364            response.raw_effects = bcs::to_bytes(&effects).map_err(|e| {
1365                // TODO: is this a client or server error?
1366                anyhow!("Failed to serialize transaction block effects with error: {e}")
1367            })?;
1368        }
1369
1370        if opts.show_effects {
1371            response.effects = Some(effects.try_into().map_err(|e| {
1372                // TODO: is this a client or server error?
1373                anyhow!("Failed to convert transaction block effects with error: {e}")
1374            })?);
1375        }
1376    }
1377
1378    response.checkpoint = cache.checkpoint_seq;
1379    response.timestamp_ms = cache.timestamp;
1380
1381    if opts.show_events {
1382        response.events = cache.events;
1383    }
1384
1385    if opts.show_balance_changes {
1386        response.balance_changes = cache.balance_changes;
1387    }
1388
1389    if opts.show_object_changes {
1390        response.object_changes = cache.object_changes;
1391    }
1392
1393    Ok(response)
1394}
1395
1396fn calculate_checkpoint_numbers(
1397    // If `Some`, the query will start from the next item after the specified cursor
1398    cursor: Option<CheckpointSequenceNumber>,
1399    limit: u64,
1400    descending_order: bool,
1401    max_checkpoint: CheckpointSequenceNumber,
1402) -> Vec<CheckpointSequenceNumber> {
1403    let (start_index, end_index) = match cursor {
1404        Some(t) => {
1405            if descending_order {
1406                let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1407                let end = start.saturating_sub(limit - 1);
1408                (end, start)
1409            } else {
1410                let start =
1411                    std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1412                let end = std::cmp::min(
1413                    start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1414                    max_checkpoint,
1415                );
1416                (start, end)
1417            }
1418        }
1419        None => {
1420            if descending_order {
1421                (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1422            } else {
1423                (0, std::cmp::min(limit - 1, max_checkpoint))
1424            }
1425        }
1426    };
1427
1428    if descending_order {
1429        (start_index..=end_index).rev().collect()
1430    } else {
1431        (start_index..=end_index).collect()
1432    }
1433}
1434
1435#[cfg(test)]
1436mod tests {
1437    use super::*;
1438
1439    #[test]
1440    fn test_calculate_checkpoint_numbers() {
1441        let cursor = Some(10);
1442        let limit = 5;
1443        let descending_order = true;
1444        let max_checkpoint = 15;
1445
1446        let checkpoint_numbers =
1447            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1448
1449        assert_eq!(checkpoint_numbers, vec![9, 8, 7, 6, 5]);
1450    }
1451
1452    #[test]
1453    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
1454        let cursor = None;
1455        let limit = 5;
1456        let descending_order = true;
1457        let max_checkpoint = 15;
1458
1459        let checkpoint_numbers =
1460            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1461
1462        assert_eq!(checkpoint_numbers, vec![15, 14, 13, 12, 11]);
1463    }
1464
1465    #[test]
1466    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
1467        let cursor = None;
1468        let limit = 5;
1469        let descending_order = false;
1470        let max_checkpoint = 15;
1471
1472        let checkpoint_numbers =
1473            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1474
1475        assert_eq!(checkpoint_numbers, vec![0, 1, 2, 3, 4]);
1476    }
1477
1478    #[test]
1479    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
1480        let cursor = Some(10);
1481        let limit = 5;
1482        let descending_order = false;
1483        let max_checkpoint = 15;
1484
1485        let checkpoint_numbers =
1486            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1487
1488        assert_eq!(checkpoint_numbers, vec![11, 12, 13, 14, 15]);
1489    }
1490
1491    #[test]
1492    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
1493        let cursor = None;
1494        let limit = 20;
1495        let descending_order = false;
1496        let max_checkpoint = 15;
1497
1498        let checkpoint_numbers =
1499            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1500
1501        assert_eq!(checkpoint_numbers, (0..=15).collect::<Vec<_>>());
1502    }
1503
1504    #[test]
1505    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
1506        let cursor = None;
1507        let limit = 20;
1508        let descending_order = true;
1509        let max_checkpoint = 15;
1510
1511        let checkpoint_numbers =
1512            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1513
1514        assert_eq!(checkpoint_numbers, (0..=15).rev().collect::<Vec<_>>());
1515    }
1516}