sui_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_types::base_types::ObjectID;
16use sui_types::base_types::SequenceNumber;
17use sui_types::base_types::SuiAddress;
18use sui_types::base_types::TransactionDigest;
19use sui_types::committee::Committee;
20use sui_types::committee::EpochId;
21use sui_types::effects::{TransactionEffects, TransactionEvents};
22use sui_types::error::{SuiErrorKind, SuiResult};
23use sui_types::full_checkpoint_content::ObjectSet;
24use sui_types::messages_checkpoint::CheckpointContentsDigest;
25use sui_types::messages_checkpoint::CheckpointDigest;
26use sui_types::messages_checkpoint::CheckpointSequenceNumber;
27use sui_types::messages_checkpoint::EndOfEpochData;
28use sui_types::messages_checkpoint::VerifiedCheckpoint;
29use sui_types::messages_checkpoint::VerifiedCheckpointContents;
30use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
31use sui_types::object::Object;
32use sui_types::object::Owner;
33use sui_types::storage::BalanceInfo;
34use sui_types::storage::BalanceIterator;
35use sui_types::storage::ChildObjectResolver;
36use sui_types::storage::CoinInfo;
37use sui_types::storage::DynamicFieldKey;
38use sui_types::storage::ObjectStore;
39use sui_types::storage::OwnedObjectInfo;
40use sui_types::storage::RpcIndexes;
41use sui_types::storage::RpcStateReader;
42use sui_types::storage::WriteStore;
43use sui_types::storage::error::Error as StorageError;
44use sui_types::storage::error::Result;
45use sui_types::storage::{ObjectKey, OverlayBackingPackageStore, ReadStore};
46use sui_types::transaction::VerifiedTransaction;
47use tap::Pipe;
48use tap::TapFallible;
49use tracing::error;
50use typed_store::TypedStoreError;
51
52#[derive(Clone)]
53pub struct RocksDbStore {
54    cache_traits: ExecutionCacheTraitPointers,
55
56    committee_store: Arc<CommitteeStore>,
57    checkpoint_store: Arc<CheckpointStore>,
58    // in memory checkpoint watermark sequence numbers
59    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
60    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
61}
62
63impl RocksDbStore {
64    pub fn new(
65        cache_traits: ExecutionCacheTraitPointers,
66        committee_store: Arc<CommitteeStore>,
67        checkpoint_store: Arc<CheckpointStore>,
68    ) -> Self {
69        Self {
70            cache_traits,
71            committee_store,
72            checkpoint_store,
73            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
74            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
75        }
76    }
77
78    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
79        self.cache_traits
80            .object_cache_reader
81            .multi_get_objects_by_key(object_keys)
82    }
83
84    pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
85        self.checkpoint_store
86            .get_highest_executed_checkpoint()
87            .expect("db error")
88    }
89}
90
91impl ReadStore for RocksDbStore {
92    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
93        self.checkpoint_store
94            .get_checkpoint_by_digest(digest)
95            .expect("db error")
96    }
97
98    fn get_checkpoint_by_sequence_number(
99        &self,
100        sequence_number: CheckpointSequenceNumber,
101    ) -> Option<VerifiedCheckpoint> {
102        self.checkpoint_store
103            .get_checkpoint_by_sequence_number(sequence_number)
104            .expect("db error")
105    }
106
107    fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
108        self.checkpoint_store
109            .get_highest_verified_checkpoint()
110            .map(|maybe_checkpoint| {
111                maybe_checkpoint
112                    .expect("storage should have been initialized with genesis checkpoint")
113            })
114            .map_err(Into::into)
115    }
116
117    fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
118        self.checkpoint_store
119            .get_highest_synced_checkpoint()
120            .map(|maybe_checkpoint| {
121                maybe_checkpoint
122                    .expect("storage should have been initialized with genesis checkpoint")
123            })
124            .map_err(Into::into)
125    }
126
127    fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
128        if let Some(highest_pruned_cp) = self
129            .checkpoint_store
130            .get_highest_pruned_checkpoint_seq_number()
131            .map_err(Into::<StorageError>::into)?
132        {
133            Ok(highest_pruned_cp + 1)
134        } else {
135            Ok(0)
136        }
137    }
138
139    fn get_full_checkpoint_contents(
140        &self,
141        sequence_number: Option<CheckpointSequenceNumber>,
142        digest: &CheckpointContentsDigest,
143    ) -> Option<VersionedFullCheckpointContents> {
144        #[cfg(debug_assertions)]
145        if let Some(sequence_number) = sequence_number {
146            // When sequence_number is provided as an optimization, we want to ensure that
147            // the sequence number we get from the db matches the one we provided.
148            // Only check this in debug mode though.
149            if let Some(loaded_sequence_number) = self
150                .checkpoint_store
151                .get_sequence_number_by_contents_digest(digest)
152                .expect("db error")
153            {
154                assert_eq!(loaded_sequence_number, sequence_number);
155            }
156        }
157
158        let sequence_number = sequence_number.or_else(|| {
159            self.checkpoint_store
160                .get_sequence_number_by_contents_digest(digest)
161                .expect("db error")
162        });
163        if let Some(sequence_number) = sequence_number {
164            // Note: We don't use `?` here because we want to tolerate
165            // potential db errors due to data corruption.
166            // In that case, we will fallback and construct the contents
167            // from the individual components as if we could not find the
168            // cached full contents.
169            if let Ok(Some(contents)) = self
170                .checkpoint_store
171                .get_full_checkpoint_contents_by_sequence_number(sequence_number)
172                .tap_err(|e| {
173                    error!(
174                        "error getting full checkpoint contents for checkpoint {:?}: {:?}",
175                        sequence_number, e
176                    )
177                })
178            {
179                return Some(contents);
180            }
181        }
182
183        // Otherwise gather it from the individual components.
184        // Note we can't insert the constructed contents into `full_checkpoint_content`,
185        // because it needs to be inserted along with `checkpoint_sequence_by_contents_digest`
186        // and `checkpoint_content`. However at this point it's likely we don't know the
187        // corresponding sequence number yet.
188        self.checkpoint_store
189            .get_checkpoint_contents(digest)
190            .expect("db error")
191            .and_then(|contents| {
192                let mut transactions = Vec::with_capacity(contents.size());
193                for tx in contents.iter() {
194                    if let (Some(t), Some(e)) = (
195                        self.get_transaction(&tx.transaction),
196                        self.cache_traits
197                            .transaction_cache_reader
198                            .get_effects(&tx.effects),
199                    ) {
200                        transactions.push(sui_types::base_types::ExecutionData::new(
201                            (*t).clone().into_inner(),
202                            e,
203                        ))
204                    } else {
205                        return None;
206                    }
207                }
208                Some(
209                    VersionedFullCheckpointContents::from_contents_and_execution_data(
210                        contents,
211                        transactions.into_iter(),
212                    ),
213                )
214            })
215    }
216
217    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
218        self.committee_store.get_committee(&epoch).unwrap()
219    }
220
221    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
222        self.cache_traits
223            .transaction_cache_reader
224            .get_transaction_block(digest)
225    }
226
227    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
228        self.cache_traits
229            .transaction_cache_reader
230            .get_executed_effects(digest)
231    }
232
233    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
234        self.cache_traits
235            .transaction_cache_reader
236            .get_events(digest)
237    }
238
239    fn get_unchanged_loaded_runtime_objects(
240        &self,
241        digest: &TransactionDigest,
242    ) -> Option<Vec<ObjectKey>> {
243        self.cache_traits
244            .transaction_cache_reader
245            .get_unchanged_loaded_runtime_objects(digest)
246    }
247
248    fn get_transaction_checkpoint(
249        &self,
250        digest: &TransactionDigest,
251    ) -> Option<CheckpointSequenceNumber> {
252        self.cache_traits
253            .checkpoint_cache
254            .deprecated_get_transaction_checkpoint(digest)
255            .map(|(_epoch, checkpoint)| checkpoint)
256    }
257
258    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
259        self.checkpoint_store
260            .get_highest_executed_checkpoint()
261            .expect("db error")
262            .ok_or_else(|| {
263                sui_types::storage::error::Error::missing("unable to get latest checkpoint")
264            })
265    }
266
267    fn get_checkpoint_contents_by_digest(
268        &self,
269        digest: &CheckpointContentsDigest,
270    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
271        self.checkpoint_store
272            .get_checkpoint_contents(digest)
273            .expect("db error")
274    }
275
276    fn get_checkpoint_contents_by_sequence_number(
277        &self,
278        sequence_number: CheckpointSequenceNumber,
279    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
280        match self.get_checkpoint_by_sequence_number(sequence_number) {
281            Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
282            None => None,
283        }
284    }
285}
286
287impl ObjectStore for RocksDbStore {
288    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
289        self.cache_traits.object_store.get_object(object_id)
290    }
291
292    fn get_object_by_key(
293        &self,
294        object_id: &sui_types::base_types::ObjectID,
295        version: sui_types::base_types::VersionNumber,
296    ) -> Option<Object> {
297        self.cache_traits
298            .object_store
299            .get_object_by_key(object_id, version)
300    }
301}
302
303impl WriteStore for RocksDbStore {
304    fn insert_checkpoint(
305        &self,
306        checkpoint: &VerifiedCheckpoint,
307    ) -> Result<(), sui_types::storage::error::Error> {
308        if let Some(EndOfEpochData {
309            next_epoch_committee,
310            ..
311        }) = checkpoint.end_of_epoch_data.as_ref()
312        {
313            let next_committee = next_epoch_committee.iter().cloned().collect();
314            let committee =
315                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
316            self.insert_committee(committee)?;
317        }
318
319        self.checkpoint_store
320            .insert_verified_checkpoint(checkpoint)
321            .map_err(Into::into)
322    }
323
324    fn update_highest_synced_checkpoint(
325        &self,
326        checkpoint: &VerifiedCheckpoint,
327    ) -> Result<(), sui_types::storage::error::Error> {
328        let mut locked = self.highest_synced_checkpoint.lock();
329        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
330            return Ok(());
331        }
332        self.checkpoint_store
333            .update_highest_synced_checkpoint(checkpoint)
334            .map_err(sui_types::storage::error::Error::custom)?;
335        *locked = Some(checkpoint.sequence_number);
336        Ok(())
337    }
338
339    fn update_highest_verified_checkpoint(
340        &self,
341        checkpoint: &VerifiedCheckpoint,
342    ) -> Result<(), sui_types::storage::error::Error> {
343        let mut locked = self.highest_verified_checkpoint.lock();
344        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
345            return Ok(());
346        }
347        self.checkpoint_store
348            .update_highest_verified_checkpoint(checkpoint)
349            .map_err(sui_types::storage::error::Error::custom)?;
350        *locked = Some(checkpoint.sequence_number);
351        Ok(())
352    }
353
354    fn insert_checkpoint_contents(
355        &self,
356        checkpoint: &VerifiedCheckpoint,
357        contents: VerifiedCheckpointContents,
358    ) -> Result<(), sui_types::storage::error::Error> {
359        self.cache_traits
360            .state_sync_store
361            .multi_insert_transaction_and_effects(contents.transactions());
362        self.checkpoint_store
363            .insert_verified_checkpoint_contents(checkpoint, contents)
364            .map_err(Into::into)
365    }
366
367    fn insert_committee(
368        &self,
369        new_committee: Committee,
370    ) -> Result<(), sui_types::storage::error::Error> {
371        self.committee_store
372            .insert_new_committee(&new_committee)
373            .unwrap();
374        Ok(())
375    }
376}
377
378pub struct RestReadStore {
379    state: Arc<AuthorityState>,
380    rocks: RocksDbStore,
381}
382
383impl RestReadStore {
384    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
385        Self { state, rocks }
386    }
387
388    fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
389        self.state
390            .rpc_index
391            .as_deref()
392            .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
393    }
394}
395
396impl ObjectStore for RestReadStore {
397    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
398        self.rocks.get_object(object_id)
399    }
400
401    fn get_object_by_key(
402        &self,
403        object_id: &sui_types::base_types::ObjectID,
404        version: sui_types::base_types::VersionNumber,
405    ) -> Option<Object> {
406        self.rocks.get_object_by_key(object_id, version)
407    }
408}
409
410impl ReadStore for RestReadStore {
411    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
412        self.rocks.get_committee(epoch)
413    }
414
415    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
416        self.rocks.get_latest_checkpoint()
417    }
418
419    fn get_highest_verified_checkpoint(
420        &self,
421    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
422        self.rocks.get_highest_verified_checkpoint()
423    }
424
425    fn get_highest_synced_checkpoint(
426        &self,
427    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
428        self.rocks.get_highest_synced_checkpoint()
429    }
430
431    fn get_lowest_available_checkpoint(
432        &self,
433    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
434        self.rocks.get_lowest_available_checkpoint()
435    }
436
437    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
438        self.rocks.get_checkpoint_by_digest(digest)
439    }
440
441    fn get_checkpoint_by_sequence_number(
442        &self,
443        sequence_number: CheckpointSequenceNumber,
444    ) -> Option<VerifiedCheckpoint> {
445        self.rocks
446            .get_checkpoint_by_sequence_number(sequence_number)
447    }
448
449    fn get_checkpoint_contents_by_digest(
450        &self,
451        digest: &CheckpointContentsDigest,
452    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
453        self.rocks.get_checkpoint_contents_by_digest(digest)
454    }
455
456    fn get_checkpoint_contents_by_sequence_number(
457        &self,
458        sequence_number: CheckpointSequenceNumber,
459    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
460        self.rocks
461            .get_checkpoint_contents_by_sequence_number(sequence_number)
462    }
463
464    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
465        self.rocks.get_transaction(digest)
466    }
467
468    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
469        self.rocks.get_transaction_effects(digest)
470    }
471
472    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
473        self.rocks.get_events(digest)
474    }
475
476    fn get_full_checkpoint_contents(
477        &self,
478        sequence_number: Option<CheckpointSequenceNumber>,
479        digest: &CheckpointContentsDigest,
480    ) -> Option<VersionedFullCheckpointContents> {
481        self.rocks
482            .get_full_checkpoint_contents(sequence_number, digest)
483    }
484
485    fn get_unchanged_loaded_runtime_objects(
486        &self,
487        digest: &TransactionDigest,
488    ) -> Option<Vec<ObjectKey>> {
489        self.rocks.get_unchanged_loaded_runtime_objects(digest)
490    }
491
492    fn get_transaction_checkpoint(
493        &self,
494        digest: &TransactionDigest,
495    ) -> Option<CheckpointSequenceNumber> {
496        self.rocks.get_transaction_checkpoint(digest)
497    }
498}
499
500impl ChildObjectResolver for RestReadStore {
501    fn read_child_object(
502        &self,
503        parent: &ObjectID,
504        child: &ObjectID,
505        child_version_upper_bound: SequenceNumber,
506    ) -> SuiResult<Option<Object>> {
507        Ok(self.get_object(child).and_then(|o| {
508            if o.version() <= child_version_upper_bound
509                && o.owner == Owner::ObjectOwner((*parent).into())
510            {
511                Some(o)
512            } else {
513                None
514            }
515        }))
516    }
517
518    fn get_object_received_at_version(
519        &self,
520        _owner: &ObjectID,
521        _receiving_object_id: &ObjectID,
522        _receive_object_at_version: SequenceNumber,
523        _epoch_id: EpochId,
524    ) -> SuiResult<Option<Object>> {
525        Err(SuiErrorKind::UnsupportedFeatureError {
526            error: "RestReadStore does not support receiving objects".to_string(),
527        }
528        .into())
529    }
530}
531
532impl RpcStateReader for RestReadStore {
533    fn get_lowest_available_checkpoint_objects(
534        &self,
535    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
536        Ok(self
537            .state
538            .get_object_cache_reader()
539            .get_highest_pruned_checkpoint()
540            .map(|cp| cp + 1)
541            .unwrap_or(0))
542    }
543
544    fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
545        Ok(self.state.get_chain_identifier())
546    }
547
548    fn indexes(&self) -> Option<&dyn RpcIndexes> {
549        Some(self)
550    }
551
552    fn get_struct_layout_with_overlay(
553        &self,
554        struct_tag: &move_core_types::language_storage::StructTag,
555        overlay: &ObjectSet,
556    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
557        let backing_store = self.state.get_backing_package_store();
558        let overlay_store = OverlayBackingPackageStore::new(overlay, backing_store.as_ref());
559        self.state
560            .load_epoch_store_one_call_per_task()
561            .executor()
562            // TODO(cache) - must read through cache
563            .type_layout_resolver(Box::new(overlay_store))
564            .get_annotated_layout(struct_tag)
565            .map(|layout| layout.into_layout())
566            .map(Some)
567            .map_err(StorageError::custom)
568    }
569}
570
571struct BatchedEventIterator<'a, I>
572where
573    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
574{
575    key_iter: I,
576    rocks: &'a RocksDbStore,
577    current_checkpoint: Option<u64>,
578    current_checkpoint_contents: Option<sui_types::messages_checkpoint::CheckpointContents>,
579    cached_tx_events: Option<TransactionEvents>,
580    cached_tx_digest: Option<TransactionDigest>,
581}
582
583impl<I> Iterator for BatchedEventIterator<'_, I>
584where
585    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
586{
587    type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
588
589    fn next(&mut self) -> Option<Self::Item> {
590        let key = match self.key_iter.next()? {
591            Ok(k) => k,
592            Err(e) => return Some(Err(e)),
593        };
594
595        if self.current_checkpoint != Some(key.checkpoint_seq) {
596            self.current_checkpoint = Some(key.checkpoint_seq);
597            self.current_checkpoint_contents = self
598                .rocks
599                .get_checkpoint_contents_by_sequence_number(key.checkpoint_seq);
600            self.cached_tx_events = None;
601            self.cached_tx_digest = None;
602        }
603
604        let checkpoint_contents = self.current_checkpoint_contents.as_ref()?;
605
606        let exec_digest = checkpoint_contents
607            .iter()
608            .nth(key.transaction_idx as usize)?;
609        let tx_digest = exec_digest.transaction;
610
611        if self.cached_tx_digest != Some(tx_digest) {
612            self.cached_tx_digest = Some(tx_digest);
613            self.cached_tx_events = self.rocks.get_events(&tx_digest);
614        }
615
616        let tx_events = self.cached_tx_events.as_ref()?;
617        let event = tx_events.data.get(key.event_index as usize)?.clone();
618
619        Some(Ok((
620            key.checkpoint_seq,
621            key.accumulator_version,
622            key.transaction_idx,
623            key.event_index,
624            event,
625        )))
626    }
627}
628
629impl RpcIndexes for RestReadStore {
630    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
631        self.index()?
632            .get_epoch_info(epoch)
633            .map_err(StorageError::custom)
634    }
635
636    fn owned_objects_iter(
637        &self,
638        owner: SuiAddress,
639        object_type: Option<StructTag>,
640        cursor: Option<OwnedObjectInfo>,
641    ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
642        let cursor = cursor.map(|cursor| OwnerIndexKey {
643            owner: cursor.owner,
644            object_type: cursor.object_type,
645            inverted_balance: cursor.balance.map(std::ops::Not::not),
646            object_id: cursor.object_id,
647        });
648
649        let iter = self
650            .index()?
651            .owner_iter(owner, object_type, cursor)?
652            .map(|result| {
653                result.map(
654                    |(
655                        OwnerIndexKey {
656                            owner,
657                            object_id,
658                            object_type,
659                            inverted_balance,
660                        },
661                        OwnerIndexInfo { version },
662                    )| {
663                        OwnedObjectInfo {
664                            owner,
665                            object_type,
666                            balance: inverted_balance.map(std::ops::Not::not),
667                            object_id,
668                            version,
669                        }
670                    },
671                )
672            });
673
674        Ok(Box::new(iter) as _)
675    }
676
677    fn dynamic_field_iter(
678        &self,
679        parent: ObjectID,
680        cursor: Option<ObjectID>,
681    ) -> sui_types::storage::error::Result<
682        Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
683    > {
684        let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
685        Ok(Box::new(iter) as _)
686    }
687
688    fn get_coin_info(
689        &self,
690        coin_type: &StructTag,
691    ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
692        self.index()?
693            .get_coin_info(coin_type)?
694            .map(
695                |CoinIndexInfo {
696                     coin_metadata_object_id,
697                     treasury_object_id,
698                     regulated_coin_metadata_object_id,
699                 }| CoinInfo {
700                    coin_metadata_object_id,
701                    treasury_object_id,
702                    regulated_coin_metadata_object_id,
703                },
704            )
705            .pipe(Ok)
706    }
707
708    fn get_balance(
709        &self,
710        owner: &SuiAddress,
711        coin_type: &StructTag,
712    ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
713        self.index()?
714            .get_balance(owner, coin_type)?
715            .map(|info| info.into())
716            .pipe(Ok)
717    }
718
719    fn balance_iter(
720        &self,
721        owner: &SuiAddress,
722        cursor: Option<(SuiAddress, StructTag)>,
723    ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
724        let cursor_key =
725            cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
726
727        Ok(Box::new(
728            self.index()?
729                .balance_iter(*owner, cursor_key)?
730                .map(|result| {
731                    result
732                        .map(|(key, info)| (key.coin_type, info.into()))
733                        .map_err(Into::into)
734                }),
735        ))
736    }
737
738    fn package_versions_iter(
739        &self,
740        original_id: ObjectID,
741        cursor: Option<u64>,
742    ) -> sui_types::storage::error::Result<
743        Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
744    > {
745        let iter = self.index()?.package_versions_iter(original_id, cursor)?;
746        Ok(
747            Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
748                as _,
749        )
750    }
751
752    fn get_highest_indexed_checkpoint_seq_number(
753        &self,
754    ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
755        self.index()?
756            .get_highest_indexed_checkpoint_seq_number()
757            .map_err(Into::into)
758    }
759
760    fn authenticated_event_iter(
761        &self,
762        stream_id: SuiAddress,
763        start_checkpoint: u64,
764        start_accumulator_version: Option<u64>,
765        start_transaction_idx: Option<u32>,
766        start_event_idx: Option<u32>,
767        end_checkpoint: u64,
768        limit: u32,
769    ) -> sui_types::storage::error::Result<
770        Box<
771            dyn Iterator<
772                    Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
773                > + '_,
774        >,
775    > {
776        let index = self.index()?;
777        let key_iter = index.event_iter(
778            stream_id,
779            start_checkpoint,
780            start_accumulator_version.unwrap_or(0),
781            start_transaction_idx.unwrap_or(0),
782            start_event_idx.unwrap_or(0),
783            end_checkpoint,
784            limit,
785        )?;
786
787        let rocks = &self.rocks;
788        let iter = BatchedEventIterator {
789            key_iter,
790            rocks,
791            current_checkpoint: None,
792            current_checkpoint_contents: None,
793            cached_tx_events: None,
794            cached_tx_digest: None,
795        };
796
797        Ok(Box::new(iter))
798    }
799}