sui_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_types::base_types::ObjectID;
16use sui_types::base_types::SuiAddress;
17use sui_types::base_types::TransactionDigest;
18use sui_types::committee::Committee;
19use sui_types::committee::EpochId;
20use sui_types::effects::{TransactionEffects, TransactionEvents};
21use sui_types::messages_checkpoint::CheckpointContentsDigest;
22use sui_types::messages_checkpoint::CheckpointDigest;
23use sui_types::messages_checkpoint::CheckpointSequenceNumber;
24use sui_types::messages_checkpoint::EndOfEpochData;
25use sui_types::messages_checkpoint::VerifiedCheckpoint;
26use sui_types::messages_checkpoint::VerifiedCheckpointContents;
27use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
28use sui_types::object::Object;
29use sui_types::storage::BalanceInfo;
30use sui_types::storage::BalanceIterator;
31use sui_types::storage::CoinInfo;
32use sui_types::storage::DynamicFieldKey;
33use sui_types::storage::ObjectStore;
34use sui_types::storage::OwnedObjectInfo;
35use sui_types::storage::RpcIndexes;
36use sui_types::storage::RpcStateReader;
37use sui_types::storage::TransactionInfo;
38use sui_types::storage::WriteStore;
39use sui_types::storage::error::Error as StorageError;
40use sui_types::storage::error::Result;
41use sui_types::storage::{ObjectKey, ReadStore};
42use sui_types::transaction::VerifiedTransaction;
43use tap::Pipe;
44use tap::TapFallible;
45use tracing::error;
46use typed_store::TypedStoreError;
47
48#[derive(Clone)]
49pub struct RocksDbStore {
50    cache_traits: ExecutionCacheTraitPointers,
51
52    committee_store: Arc<CommitteeStore>,
53    checkpoint_store: Arc<CheckpointStore>,
54    // in memory checkpoint watermark sequence numbers
55    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
56    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
57}
58
59impl RocksDbStore {
60    pub fn new(
61        cache_traits: ExecutionCacheTraitPointers,
62        committee_store: Arc<CommitteeStore>,
63        checkpoint_store: Arc<CheckpointStore>,
64    ) -> Self {
65        Self {
66            cache_traits,
67            committee_store,
68            checkpoint_store,
69            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
70            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
71        }
72    }
73
74    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
75        self.cache_traits
76            .object_cache_reader
77            .multi_get_objects_by_key(object_keys)
78    }
79
80    pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
81        self.checkpoint_store
82            .get_highest_executed_checkpoint()
83            .expect("db error")
84    }
85}
86
87impl ReadStore for RocksDbStore {
88    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
89        self.checkpoint_store
90            .get_checkpoint_by_digest(digest)
91            .expect("db error")
92    }
93
94    fn get_checkpoint_by_sequence_number(
95        &self,
96        sequence_number: CheckpointSequenceNumber,
97    ) -> Option<VerifiedCheckpoint> {
98        self.checkpoint_store
99            .get_checkpoint_by_sequence_number(sequence_number)
100            .expect("db error")
101    }
102
103    fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
104        self.checkpoint_store
105            .get_highest_verified_checkpoint()
106            .map(|maybe_checkpoint| {
107                maybe_checkpoint
108                    .expect("storage should have been initialized with genesis checkpoint")
109            })
110            .map_err(Into::into)
111    }
112
113    fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
114        self.checkpoint_store
115            .get_highest_synced_checkpoint()
116            .map(|maybe_checkpoint| {
117                maybe_checkpoint
118                    .expect("storage should have been initialized with genesis checkpoint")
119            })
120            .map_err(Into::into)
121    }
122
123    fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
124        if let Some(highest_pruned_cp) = self
125            .checkpoint_store
126            .get_highest_pruned_checkpoint_seq_number()
127            .map_err(Into::<StorageError>::into)?
128        {
129            Ok(highest_pruned_cp + 1)
130        } else {
131            Ok(0)
132        }
133    }
134
135    fn get_full_checkpoint_contents(
136        &self,
137        sequence_number: Option<CheckpointSequenceNumber>,
138        digest: &CheckpointContentsDigest,
139    ) -> Option<VersionedFullCheckpointContents> {
140        #[cfg(debug_assertions)]
141        if let Some(sequence_number) = sequence_number {
142            // When sequence_number is provided as an optimization, we want to ensure that
143            // the sequence number we get from the db matches the one we provided.
144            // Only check this in debug mode though.
145            if let Some(loaded_sequence_number) = self
146                .checkpoint_store
147                .get_sequence_number_by_contents_digest(digest)
148                .expect("db error")
149            {
150                assert_eq!(loaded_sequence_number, sequence_number);
151            }
152        }
153
154        let sequence_number = sequence_number.or_else(|| {
155            self.checkpoint_store
156                .get_sequence_number_by_contents_digest(digest)
157                .expect("db error")
158        });
159        if let Some(sequence_number) = sequence_number {
160            // Note: We don't use `?` here because we want to tolerate
161            // potential db errors due to data corruption.
162            // In that case, we will fallback and construct the contents
163            // from the individual components as if we could not find the
164            // cached full contents.
165            if let Ok(Some(contents)) = self
166                .checkpoint_store
167                .get_full_checkpoint_contents_by_sequence_number(sequence_number)
168                .tap_err(|e| {
169                    error!(
170                        "error getting full checkpoint contents for checkpoint {:?}: {:?}",
171                        sequence_number, e
172                    )
173                })
174            {
175                return Some(contents);
176            }
177        }
178
179        // Otherwise gather it from the individual components.
180        // Note we can't insert the constructed contents into `full_checkpoint_content`,
181        // because it needs to be inserted along with `checkpoint_sequence_by_contents_digest`
182        // and `checkpoint_content`. However at this point it's likely we don't know the
183        // corresponding sequence number yet.
184        self.checkpoint_store
185            .get_checkpoint_contents(digest)
186            .expect("db error")
187            .and_then(|contents| {
188                let mut transactions = Vec::with_capacity(contents.size());
189                for tx in contents.iter() {
190                    if let (Some(t), Some(e)) = (
191                        self.get_transaction(&tx.transaction),
192                        self.cache_traits
193                            .transaction_cache_reader
194                            .get_effects(&tx.effects),
195                    ) {
196                        transactions.push(sui_types::base_types::ExecutionData::new(
197                            (*t).clone().into_inner(),
198                            e,
199                        ))
200                    } else {
201                        return None;
202                    }
203                }
204                Some(
205                    VersionedFullCheckpointContents::from_contents_and_execution_data(
206                        contents,
207                        transactions.into_iter(),
208                    ),
209                )
210            })
211    }
212
213    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
214        self.committee_store.get_committee(&epoch).unwrap()
215    }
216
217    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
218        self.cache_traits
219            .transaction_cache_reader
220            .get_transaction_block(digest)
221    }
222
223    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
224        self.cache_traits
225            .transaction_cache_reader
226            .get_executed_effects(digest)
227    }
228
229    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
230        self.cache_traits
231            .transaction_cache_reader
232            .get_events(digest)
233    }
234
235    fn get_unchanged_loaded_runtime_objects(
236        &self,
237        digest: &TransactionDigest,
238    ) -> Option<Vec<ObjectKey>> {
239        self.cache_traits
240            .transaction_cache_reader
241            .get_unchanged_loaded_runtime_objects(digest)
242    }
243
244    fn get_transaction_checkpoint(
245        &self,
246        digest: &TransactionDigest,
247    ) -> Option<CheckpointSequenceNumber> {
248        self.cache_traits
249            .checkpoint_cache
250            .deprecated_get_transaction_checkpoint(digest)
251            .map(|(_epoch, checkpoint)| checkpoint)
252    }
253
254    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
255        self.checkpoint_store
256            .get_highest_executed_checkpoint()
257            .expect("db error")
258            .ok_or_else(|| {
259                sui_types::storage::error::Error::missing("unable to get latest checkpoint")
260            })
261    }
262
263    fn get_checkpoint_contents_by_digest(
264        &self,
265        digest: &CheckpointContentsDigest,
266    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
267        self.checkpoint_store
268            .get_checkpoint_contents(digest)
269            .expect("db error")
270    }
271
272    fn get_checkpoint_contents_by_sequence_number(
273        &self,
274        sequence_number: CheckpointSequenceNumber,
275    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
276        match self.get_checkpoint_by_sequence_number(sequence_number) {
277            Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
278            None => None,
279        }
280    }
281}
282
283impl ObjectStore for RocksDbStore {
284    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
285        self.cache_traits.object_store.get_object(object_id)
286    }
287
288    fn get_object_by_key(
289        &self,
290        object_id: &sui_types::base_types::ObjectID,
291        version: sui_types::base_types::VersionNumber,
292    ) -> Option<Object> {
293        self.cache_traits
294            .object_store
295            .get_object_by_key(object_id, version)
296    }
297}
298
299impl WriteStore for RocksDbStore {
300    fn insert_checkpoint(
301        &self,
302        checkpoint: &VerifiedCheckpoint,
303    ) -> Result<(), sui_types::storage::error::Error> {
304        if let Some(EndOfEpochData {
305            next_epoch_committee,
306            ..
307        }) = checkpoint.end_of_epoch_data.as_ref()
308        {
309            let next_committee = next_epoch_committee.iter().cloned().collect();
310            let committee =
311                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
312            self.insert_committee(committee)?;
313        }
314
315        self.checkpoint_store
316            .insert_verified_checkpoint(checkpoint)
317            .map_err(Into::into)
318    }
319
320    fn update_highest_synced_checkpoint(
321        &self,
322        checkpoint: &VerifiedCheckpoint,
323    ) -> Result<(), sui_types::storage::error::Error> {
324        let mut locked = self.highest_synced_checkpoint.lock();
325        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
326            return Ok(());
327        }
328        self.checkpoint_store
329            .update_highest_synced_checkpoint(checkpoint)
330            .map_err(sui_types::storage::error::Error::custom)?;
331        *locked = Some(checkpoint.sequence_number);
332        Ok(())
333    }
334
335    fn update_highest_verified_checkpoint(
336        &self,
337        checkpoint: &VerifiedCheckpoint,
338    ) -> Result<(), sui_types::storage::error::Error> {
339        let mut locked = self.highest_verified_checkpoint.lock();
340        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
341            return Ok(());
342        }
343        self.checkpoint_store
344            .update_highest_verified_checkpoint(checkpoint)
345            .map_err(sui_types::storage::error::Error::custom)?;
346        *locked = Some(checkpoint.sequence_number);
347        Ok(())
348    }
349
350    fn insert_checkpoint_contents(
351        &self,
352        checkpoint: &VerifiedCheckpoint,
353        contents: VerifiedCheckpointContents,
354    ) -> Result<(), sui_types::storage::error::Error> {
355        self.cache_traits
356            .state_sync_store
357            .multi_insert_transaction_and_effects(contents.transactions());
358        self.checkpoint_store
359            .insert_verified_checkpoint_contents(checkpoint, contents)
360            .map_err(Into::into)
361    }
362
363    fn insert_committee(
364        &self,
365        new_committee: Committee,
366    ) -> Result<(), sui_types::storage::error::Error> {
367        self.committee_store
368            .insert_new_committee(&new_committee)
369            .unwrap();
370        Ok(())
371    }
372}
373
374pub struct RestReadStore {
375    state: Arc<AuthorityState>,
376    rocks: RocksDbStore,
377}
378
379impl RestReadStore {
380    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
381        Self { state, rocks }
382    }
383
384    fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
385        self.state
386            .rpc_index
387            .as_deref()
388            .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
389    }
390}
391
392impl ObjectStore for RestReadStore {
393    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
394        self.rocks.get_object(object_id)
395    }
396
397    fn get_object_by_key(
398        &self,
399        object_id: &sui_types::base_types::ObjectID,
400        version: sui_types::base_types::VersionNumber,
401    ) -> Option<Object> {
402        self.rocks.get_object_by_key(object_id, version)
403    }
404}
405
406impl ReadStore for RestReadStore {
407    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
408        self.rocks.get_committee(epoch)
409    }
410
411    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
412        self.rocks.get_latest_checkpoint()
413    }
414
415    fn get_highest_verified_checkpoint(
416        &self,
417    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
418        self.rocks.get_highest_verified_checkpoint()
419    }
420
421    fn get_highest_synced_checkpoint(
422        &self,
423    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
424        self.rocks.get_highest_synced_checkpoint()
425    }
426
427    fn get_lowest_available_checkpoint(
428        &self,
429    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
430        self.rocks.get_lowest_available_checkpoint()
431    }
432
433    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
434        self.rocks.get_checkpoint_by_digest(digest)
435    }
436
437    fn get_checkpoint_by_sequence_number(
438        &self,
439        sequence_number: CheckpointSequenceNumber,
440    ) -> Option<VerifiedCheckpoint> {
441        self.rocks
442            .get_checkpoint_by_sequence_number(sequence_number)
443    }
444
445    fn get_checkpoint_contents_by_digest(
446        &self,
447        digest: &CheckpointContentsDigest,
448    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
449        self.rocks.get_checkpoint_contents_by_digest(digest)
450    }
451
452    fn get_checkpoint_contents_by_sequence_number(
453        &self,
454        sequence_number: CheckpointSequenceNumber,
455    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
456        self.rocks
457            .get_checkpoint_contents_by_sequence_number(sequence_number)
458    }
459
460    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
461        self.rocks.get_transaction(digest)
462    }
463
464    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
465        self.rocks.get_transaction_effects(digest)
466    }
467
468    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
469        self.rocks.get_events(digest)
470    }
471
472    fn get_full_checkpoint_contents(
473        &self,
474        sequence_number: Option<CheckpointSequenceNumber>,
475        digest: &CheckpointContentsDigest,
476    ) -> Option<VersionedFullCheckpointContents> {
477        self.rocks
478            .get_full_checkpoint_contents(sequence_number, digest)
479    }
480
481    fn get_unchanged_loaded_runtime_objects(
482        &self,
483        digest: &TransactionDigest,
484    ) -> Option<Vec<ObjectKey>> {
485        self.rocks.get_unchanged_loaded_runtime_objects(digest)
486    }
487
488    fn get_transaction_checkpoint(
489        &self,
490        digest: &TransactionDigest,
491    ) -> Option<CheckpointSequenceNumber> {
492        self.rocks.get_transaction_checkpoint(digest)
493    }
494}
495
496impl RpcStateReader for RestReadStore {
497    fn get_lowest_available_checkpoint_objects(
498        &self,
499    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
500        Ok(self
501            .state
502            .get_object_cache_reader()
503            .get_highest_pruned_checkpoint()
504            .map(|cp| cp + 1)
505            .unwrap_or(0))
506    }
507
508    fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
509        Ok(self.state.get_chain_identifier())
510    }
511
512    fn indexes(&self) -> Option<&dyn RpcIndexes> {
513        Some(self)
514    }
515
516    fn get_struct_layout(
517        &self,
518        struct_tag: &move_core_types::language_storage::StructTag,
519    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
520        self.state
521            .load_epoch_store_one_call_per_task()
522            .executor()
523            // TODO(cache) - must read through cache
524            .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
525            .get_annotated_layout(struct_tag)
526            .map(|layout| layout.into_layout())
527            .map(Some)
528            .map_err(StorageError::custom)
529    }
530}
531
532struct BatchedEventIterator<'a, I>
533where
534    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
535{
536    key_iter: I,
537    rocks: &'a RocksDbStore,
538    current_checkpoint: Option<u64>,
539    current_checkpoint_contents: Option<sui_types::messages_checkpoint::CheckpointContents>,
540    cached_tx_events: Option<TransactionEvents>,
541    cached_tx_digest: Option<TransactionDigest>,
542}
543
544impl<I> Iterator for BatchedEventIterator<'_, I>
545where
546    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
547{
548    type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
549
550    fn next(&mut self) -> Option<Self::Item> {
551        let key = match self.key_iter.next()? {
552            Ok(k) => k,
553            Err(e) => return Some(Err(e)),
554        };
555
556        if self.current_checkpoint != Some(key.checkpoint_seq) {
557            self.current_checkpoint = Some(key.checkpoint_seq);
558            self.current_checkpoint_contents = self
559                .rocks
560                .get_checkpoint_contents_by_sequence_number(key.checkpoint_seq);
561            self.cached_tx_events = None;
562            self.cached_tx_digest = None;
563        }
564
565        let checkpoint_contents = self.current_checkpoint_contents.as_ref()?;
566
567        let exec_digest = checkpoint_contents
568            .iter()
569            .nth(key.transaction_idx as usize)?;
570        let tx_digest = exec_digest.transaction;
571
572        if self.cached_tx_digest != Some(tx_digest) {
573            self.cached_tx_digest = Some(tx_digest);
574            self.cached_tx_events = self.rocks.get_events(&tx_digest);
575        }
576
577        let tx_events = self.cached_tx_events.as_ref()?;
578        let event = tx_events.data.get(key.event_index as usize)?.clone();
579
580        Some(Ok((
581            key.checkpoint_seq,
582            key.accumulator_version,
583            key.transaction_idx,
584            key.event_index,
585            event,
586        )))
587    }
588}
589
590impl RpcIndexes for RestReadStore {
591    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
592        self.index()?
593            .get_epoch_info(epoch)
594            .map_err(StorageError::custom)
595    }
596
597    fn get_transaction_info(
598        &self,
599        digest: &TransactionDigest,
600    ) -> sui_types::storage::error::Result<Option<TransactionInfo>> {
601        self.index()?
602            .get_transaction_info(digest)
603            .map_err(StorageError::custom)
604    }
605
606    fn owned_objects_iter(
607        &self,
608        owner: SuiAddress,
609        object_type: Option<StructTag>,
610        cursor: Option<OwnedObjectInfo>,
611    ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
612        let cursor = cursor.map(|cursor| OwnerIndexKey {
613            owner: cursor.owner,
614            object_type: cursor.object_type,
615            inverted_balance: cursor.balance.map(std::ops::Not::not),
616            object_id: cursor.object_id,
617        });
618
619        let iter = self
620            .index()?
621            .owner_iter(owner, object_type, cursor)?
622            .map(|result| {
623                result.map(
624                    |(
625                        OwnerIndexKey {
626                            owner,
627                            object_id,
628                            object_type,
629                            inverted_balance,
630                        },
631                        OwnerIndexInfo { version },
632                    )| {
633                        OwnedObjectInfo {
634                            owner,
635                            object_type,
636                            balance: inverted_balance.map(std::ops::Not::not),
637                            object_id,
638                            version,
639                        }
640                    },
641                )
642            });
643
644        Ok(Box::new(iter) as _)
645    }
646
647    fn dynamic_field_iter(
648        &self,
649        parent: ObjectID,
650        cursor: Option<ObjectID>,
651    ) -> sui_types::storage::error::Result<
652        Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
653    > {
654        let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
655        Ok(Box::new(iter) as _)
656    }
657
658    fn get_coin_info(
659        &self,
660        coin_type: &StructTag,
661    ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
662        self.index()?
663            .get_coin_info(coin_type)?
664            .map(
665                |CoinIndexInfo {
666                     coin_metadata_object_id,
667                     treasury_object_id,
668                     regulated_coin_metadata_object_id,
669                 }| CoinInfo {
670                    coin_metadata_object_id,
671                    treasury_object_id,
672                    regulated_coin_metadata_object_id,
673                },
674            )
675            .pipe(Ok)
676    }
677
678    fn get_balance(
679        &self,
680        owner: &SuiAddress,
681        coin_type: &StructTag,
682    ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
683        self.index()?
684            .get_balance(owner, coin_type)?
685            .map(|info| info.into())
686            .pipe(Ok)
687    }
688
689    fn balance_iter(
690        &self,
691        owner: &SuiAddress,
692        cursor: Option<(SuiAddress, StructTag)>,
693    ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
694        let cursor_key =
695            cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
696
697        Ok(Box::new(
698            self.index()?
699                .balance_iter(*owner, cursor_key)?
700                .map(|result| {
701                    result
702                        .map(|(key, info)| (key.coin_type, info.into()))
703                        .map_err(Into::into)
704                }),
705        ))
706    }
707
708    fn package_versions_iter(
709        &self,
710        original_id: ObjectID,
711        cursor: Option<u64>,
712    ) -> sui_types::storage::error::Result<
713        Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
714    > {
715        let iter = self.index()?.package_versions_iter(original_id, cursor)?;
716        Ok(
717            Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
718                as _,
719        )
720    }
721
722    fn get_highest_indexed_checkpoint_seq_number(
723        &self,
724    ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
725        self.index()?
726            .get_highest_indexed_checkpoint_seq_number()
727            .map_err(Into::into)
728    }
729
730    fn authenticated_event_iter(
731        &self,
732        stream_id: SuiAddress,
733        start_checkpoint: u64,
734        start_accumulator_version: Option<u64>,
735        start_transaction_idx: Option<u32>,
736        start_event_idx: Option<u32>,
737        end_checkpoint: u64,
738        limit: u32,
739    ) -> sui_types::storage::error::Result<
740        Box<
741            dyn Iterator<
742                    Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
743                > + '_,
744        >,
745    > {
746        let index = self.index()?;
747        let key_iter = index.event_iter(
748            stream_id,
749            start_checkpoint,
750            start_accumulator_version.unwrap_or(0),
751            start_transaction_idx.unwrap_or(0),
752            start_event_idx.unwrap_or(0),
753            end_checkpoint,
754            limit,
755        )?;
756
757        let rocks = &self.rocks;
758        let iter = BatchedEventIterator {
759            key_iter,
760            rocks,
761            current_checkpoint: None,
762            current_checkpoint_contents: None,
763            cached_tx_events: None,
764            cached_tx_digest: None,
765        };
766
767        Ok(Box::new(iter))
768    }
769}