sui_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_types::base_types::ObjectID;
16use sui_types::base_types::SuiAddress;
17use sui_types::base_types::TransactionDigest;
18use sui_types::committee::Committee;
19use sui_types::committee::EpochId;
20use sui_types::effects::{TransactionEffects, TransactionEvents};
21use sui_types::messages_checkpoint::CheckpointContentsDigest;
22use sui_types::messages_checkpoint::CheckpointDigest;
23use sui_types::messages_checkpoint::CheckpointSequenceNumber;
24use sui_types::messages_checkpoint::EndOfEpochData;
25use sui_types::messages_checkpoint::VerifiedCheckpoint;
26use sui_types::messages_checkpoint::VerifiedCheckpointContents;
27use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
28use sui_types::object::Object;
29use sui_types::storage::BalanceInfo;
30use sui_types::storage::BalanceIterator;
31use sui_types::storage::CoinInfo;
32use sui_types::storage::DynamicFieldKey;
33use sui_types::storage::ObjectStore;
34use sui_types::storage::OwnedObjectInfo;
35use sui_types::storage::RpcIndexes;
36use sui_types::storage::RpcStateReader;
37use sui_types::storage::WriteStore;
38use sui_types::storage::error::Error as StorageError;
39use sui_types::storage::error::Result;
40use sui_types::storage::{ObjectKey, ReadStore};
41use sui_types::transaction::VerifiedTransaction;
42use tap::Pipe;
43use tap::TapFallible;
44use tracing::error;
45use typed_store::TypedStoreError;
46
47#[derive(Clone)]
48pub struct RocksDbStore {
49    cache_traits: ExecutionCacheTraitPointers,
50
51    committee_store: Arc<CommitteeStore>,
52    checkpoint_store: Arc<CheckpointStore>,
53    // in memory checkpoint watermark sequence numbers
54    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
55    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
56}
57
58impl RocksDbStore {
59    pub fn new(
60        cache_traits: ExecutionCacheTraitPointers,
61        committee_store: Arc<CommitteeStore>,
62        checkpoint_store: Arc<CheckpointStore>,
63    ) -> Self {
64        Self {
65            cache_traits,
66            committee_store,
67            checkpoint_store,
68            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
69            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
70        }
71    }
72
73    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
74        self.cache_traits
75            .object_cache_reader
76            .multi_get_objects_by_key(object_keys)
77    }
78
79    pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
80        self.checkpoint_store
81            .get_highest_executed_checkpoint()
82            .expect("db error")
83    }
84}
85
86impl ReadStore for RocksDbStore {
87    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
88        self.checkpoint_store
89            .get_checkpoint_by_digest(digest)
90            .expect("db error")
91    }
92
93    fn get_checkpoint_by_sequence_number(
94        &self,
95        sequence_number: CheckpointSequenceNumber,
96    ) -> Option<VerifiedCheckpoint> {
97        self.checkpoint_store
98            .get_checkpoint_by_sequence_number(sequence_number)
99            .expect("db error")
100    }
101
102    fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
103        self.checkpoint_store
104            .get_highest_verified_checkpoint()
105            .map(|maybe_checkpoint| {
106                maybe_checkpoint
107                    .expect("storage should have been initialized with genesis checkpoint")
108            })
109            .map_err(Into::into)
110    }
111
112    fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
113        self.checkpoint_store
114            .get_highest_synced_checkpoint()
115            .map(|maybe_checkpoint| {
116                maybe_checkpoint
117                    .expect("storage should have been initialized with genesis checkpoint")
118            })
119            .map_err(Into::into)
120    }
121
122    fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
123        if let Some(highest_pruned_cp) = self
124            .checkpoint_store
125            .get_highest_pruned_checkpoint_seq_number()
126            .map_err(Into::<StorageError>::into)?
127        {
128            Ok(highest_pruned_cp + 1)
129        } else {
130            Ok(0)
131        }
132    }
133
134    fn get_full_checkpoint_contents(
135        &self,
136        sequence_number: Option<CheckpointSequenceNumber>,
137        digest: &CheckpointContentsDigest,
138    ) -> Option<VersionedFullCheckpointContents> {
139        #[cfg(debug_assertions)]
140        if let Some(sequence_number) = sequence_number {
141            // When sequence_number is provided as an optimization, we want to ensure that
142            // the sequence number we get from the db matches the one we provided.
143            // Only check this in debug mode though.
144            if let Some(loaded_sequence_number) = self
145                .checkpoint_store
146                .get_sequence_number_by_contents_digest(digest)
147                .expect("db error")
148            {
149                assert_eq!(loaded_sequence_number, sequence_number);
150            }
151        }
152
153        let sequence_number = sequence_number.or_else(|| {
154            self.checkpoint_store
155                .get_sequence_number_by_contents_digest(digest)
156                .expect("db error")
157        });
158        if let Some(sequence_number) = sequence_number {
159            // Note: We don't use `?` here because we want to tolerate
160            // potential db errors due to data corruption.
161            // In that case, we will fallback and construct the contents
162            // from the individual components as if we could not find the
163            // cached full contents.
164            if let Ok(Some(contents)) = self
165                .checkpoint_store
166                .get_full_checkpoint_contents_by_sequence_number(sequence_number)
167                .tap_err(|e| {
168                    error!(
169                        "error getting full checkpoint contents for checkpoint {:?}: {:?}",
170                        sequence_number, e
171                    )
172                })
173            {
174                return Some(contents);
175            }
176        }
177
178        // Otherwise gather it from the individual components.
179        // Note we can't insert the constructed contents into `full_checkpoint_content`,
180        // because it needs to be inserted along with `checkpoint_sequence_by_contents_digest`
181        // and `checkpoint_content`. However at this point it's likely we don't know the
182        // corresponding sequence number yet.
183        self.checkpoint_store
184            .get_checkpoint_contents(digest)
185            .expect("db error")
186            .and_then(|contents| {
187                let mut transactions = Vec::with_capacity(contents.size());
188                for tx in contents.iter() {
189                    if let (Some(t), Some(e)) = (
190                        self.get_transaction(&tx.transaction),
191                        self.cache_traits
192                            .transaction_cache_reader
193                            .get_effects(&tx.effects),
194                    ) {
195                        transactions.push(sui_types::base_types::ExecutionData::new(
196                            (*t).clone().into_inner(),
197                            e,
198                        ))
199                    } else {
200                        return None;
201                    }
202                }
203                Some(
204                    VersionedFullCheckpointContents::from_contents_and_execution_data(
205                        contents,
206                        transactions.into_iter(),
207                    ),
208                )
209            })
210    }
211
212    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
213        self.committee_store.get_committee(&epoch).unwrap()
214    }
215
216    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
217        self.cache_traits
218            .transaction_cache_reader
219            .get_transaction_block(digest)
220    }
221
222    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
223        self.cache_traits
224            .transaction_cache_reader
225            .get_executed_effects(digest)
226    }
227
228    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
229        self.cache_traits
230            .transaction_cache_reader
231            .get_events(digest)
232    }
233
234    fn get_unchanged_loaded_runtime_objects(
235        &self,
236        digest: &TransactionDigest,
237    ) -> Option<Vec<ObjectKey>> {
238        self.cache_traits
239            .transaction_cache_reader
240            .get_unchanged_loaded_runtime_objects(digest)
241    }
242
243    fn get_transaction_checkpoint(
244        &self,
245        digest: &TransactionDigest,
246    ) -> Option<CheckpointSequenceNumber> {
247        self.cache_traits
248            .checkpoint_cache
249            .deprecated_get_transaction_checkpoint(digest)
250            .map(|(_epoch, checkpoint)| checkpoint)
251    }
252
253    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
254        self.checkpoint_store
255            .get_highest_executed_checkpoint()
256            .expect("db error")
257            .ok_or_else(|| {
258                sui_types::storage::error::Error::missing("unable to get latest checkpoint")
259            })
260    }
261
262    fn get_checkpoint_contents_by_digest(
263        &self,
264        digest: &CheckpointContentsDigest,
265    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
266        self.checkpoint_store
267            .get_checkpoint_contents(digest)
268            .expect("db error")
269    }
270
271    fn get_checkpoint_contents_by_sequence_number(
272        &self,
273        sequence_number: CheckpointSequenceNumber,
274    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
275        match self.get_checkpoint_by_sequence_number(sequence_number) {
276            Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
277            None => None,
278        }
279    }
280}
281
282impl ObjectStore for RocksDbStore {
283    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
284        self.cache_traits.object_store.get_object(object_id)
285    }
286
287    fn get_object_by_key(
288        &self,
289        object_id: &sui_types::base_types::ObjectID,
290        version: sui_types::base_types::VersionNumber,
291    ) -> Option<Object> {
292        self.cache_traits
293            .object_store
294            .get_object_by_key(object_id, version)
295    }
296}
297
298impl WriteStore for RocksDbStore {
299    fn insert_checkpoint(
300        &self,
301        checkpoint: &VerifiedCheckpoint,
302    ) -> Result<(), sui_types::storage::error::Error> {
303        if let Some(EndOfEpochData {
304            next_epoch_committee,
305            ..
306        }) = checkpoint.end_of_epoch_data.as_ref()
307        {
308            let next_committee = next_epoch_committee.iter().cloned().collect();
309            let committee =
310                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
311            self.insert_committee(committee)?;
312        }
313
314        self.checkpoint_store
315            .insert_verified_checkpoint(checkpoint)
316            .map_err(Into::into)
317    }
318
319    fn update_highest_synced_checkpoint(
320        &self,
321        checkpoint: &VerifiedCheckpoint,
322    ) -> Result<(), sui_types::storage::error::Error> {
323        let mut locked = self.highest_synced_checkpoint.lock();
324        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
325            return Ok(());
326        }
327        self.checkpoint_store
328            .update_highest_synced_checkpoint(checkpoint)
329            .map_err(sui_types::storage::error::Error::custom)?;
330        *locked = Some(checkpoint.sequence_number);
331        Ok(())
332    }
333
334    fn update_highest_verified_checkpoint(
335        &self,
336        checkpoint: &VerifiedCheckpoint,
337    ) -> Result<(), sui_types::storage::error::Error> {
338        let mut locked = self.highest_verified_checkpoint.lock();
339        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
340            return Ok(());
341        }
342        self.checkpoint_store
343            .update_highest_verified_checkpoint(checkpoint)
344            .map_err(sui_types::storage::error::Error::custom)?;
345        *locked = Some(checkpoint.sequence_number);
346        Ok(())
347    }
348
349    fn insert_checkpoint_contents(
350        &self,
351        checkpoint: &VerifiedCheckpoint,
352        contents: VerifiedCheckpointContents,
353    ) -> Result<(), sui_types::storage::error::Error> {
354        self.cache_traits
355            .state_sync_store
356            .multi_insert_transaction_and_effects(contents.transactions());
357        self.checkpoint_store
358            .insert_verified_checkpoint_contents(checkpoint, contents)
359            .map_err(Into::into)
360    }
361
362    fn insert_committee(
363        &self,
364        new_committee: Committee,
365    ) -> Result<(), sui_types::storage::error::Error> {
366        self.committee_store
367            .insert_new_committee(&new_committee)
368            .unwrap();
369        Ok(())
370    }
371}
372
373pub struct RestReadStore {
374    state: Arc<AuthorityState>,
375    rocks: RocksDbStore,
376}
377
378impl RestReadStore {
379    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
380        Self { state, rocks }
381    }
382
383    fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
384        self.state
385            .rpc_index
386            .as_deref()
387            .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
388    }
389}
390
391impl ObjectStore for RestReadStore {
392    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
393        self.rocks.get_object(object_id)
394    }
395
396    fn get_object_by_key(
397        &self,
398        object_id: &sui_types::base_types::ObjectID,
399        version: sui_types::base_types::VersionNumber,
400    ) -> Option<Object> {
401        self.rocks.get_object_by_key(object_id, version)
402    }
403}
404
405impl ReadStore for RestReadStore {
406    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
407        self.rocks.get_committee(epoch)
408    }
409
410    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
411        self.rocks.get_latest_checkpoint()
412    }
413
414    fn get_highest_verified_checkpoint(
415        &self,
416    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
417        self.rocks.get_highest_verified_checkpoint()
418    }
419
420    fn get_highest_synced_checkpoint(
421        &self,
422    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
423        self.rocks.get_highest_synced_checkpoint()
424    }
425
426    fn get_lowest_available_checkpoint(
427        &self,
428    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
429        self.rocks.get_lowest_available_checkpoint()
430    }
431
432    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
433        self.rocks.get_checkpoint_by_digest(digest)
434    }
435
436    fn get_checkpoint_by_sequence_number(
437        &self,
438        sequence_number: CheckpointSequenceNumber,
439    ) -> Option<VerifiedCheckpoint> {
440        self.rocks
441            .get_checkpoint_by_sequence_number(sequence_number)
442    }
443
444    fn get_checkpoint_contents_by_digest(
445        &self,
446        digest: &CheckpointContentsDigest,
447    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
448        self.rocks.get_checkpoint_contents_by_digest(digest)
449    }
450
451    fn get_checkpoint_contents_by_sequence_number(
452        &self,
453        sequence_number: CheckpointSequenceNumber,
454    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
455        self.rocks
456            .get_checkpoint_contents_by_sequence_number(sequence_number)
457    }
458
459    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
460        self.rocks.get_transaction(digest)
461    }
462
463    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
464        self.rocks.get_transaction_effects(digest)
465    }
466
467    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
468        self.rocks.get_events(digest)
469    }
470
471    fn get_full_checkpoint_contents(
472        &self,
473        sequence_number: Option<CheckpointSequenceNumber>,
474        digest: &CheckpointContentsDigest,
475    ) -> Option<VersionedFullCheckpointContents> {
476        self.rocks
477            .get_full_checkpoint_contents(sequence_number, digest)
478    }
479
480    fn get_unchanged_loaded_runtime_objects(
481        &self,
482        digest: &TransactionDigest,
483    ) -> Option<Vec<ObjectKey>> {
484        self.rocks.get_unchanged_loaded_runtime_objects(digest)
485    }
486
487    fn get_transaction_checkpoint(
488        &self,
489        digest: &TransactionDigest,
490    ) -> Option<CheckpointSequenceNumber> {
491        self.rocks.get_transaction_checkpoint(digest)
492    }
493}
494
495impl RpcStateReader for RestReadStore {
496    fn get_lowest_available_checkpoint_objects(
497        &self,
498    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
499        Ok(self
500            .state
501            .get_object_cache_reader()
502            .get_highest_pruned_checkpoint()
503            .map(|cp| cp + 1)
504            .unwrap_or(0))
505    }
506
507    fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
508        Ok(self.state.get_chain_identifier())
509    }
510
511    fn indexes(&self) -> Option<&dyn RpcIndexes> {
512        Some(self)
513    }
514
515    fn get_struct_layout(
516        &self,
517        struct_tag: &move_core_types::language_storage::StructTag,
518    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
519        self.state
520            .load_epoch_store_one_call_per_task()
521            .executor()
522            // TODO(cache) - must read through cache
523            .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
524            .get_annotated_layout(struct_tag)
525            .map(|layout| layout.into_layout())
526            .map(Some)
527            .map_err(StorageError::custom)
528    }
529}
530
531struct BatchedEventIterator<'a, I>
532where
533    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
534{
535    key_iter: I,
536    rocks: &'a RocksDbStore,
537    current_checkpoint: Option<u64>,
538    current_checkpoint_contents: Option<sui_types::messages_checkpoint::CheckpointContents>,
539    cached_tx_events: Option<TransactionEvents>,
540    cached_tx_digest: Option<TransactionDigest>,
541}
542
543impl<I> Iterator for BatchedEventIterator<'_, I>
544where
545    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
546{
547    type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
548
549    fn next(&mut self) -> Option<Self::Item> {
550        let key = match self.key_iter.next()? {
551            Ok(k) => k,
552            Err(e) => return Some(Err(e)),
553        };
554
555        if self.current_checkpoint != Some(key.checkpoint_seq) {
556            self.current_checkpoint = Some(key.checkpoint_seq);
557            self.current_checkpoint_contents = self
558                .rocks
559                .get_checkpoint_contents_by_sequence_number(key.checkpoint_seq);
560            self.cached_tx_events = None;
561            self.cached_tx_digest = None;
562        }
563
564        let checkpoint_contents = self.current_checkpoint_contents.as_ref()?;
565
566        let exec_digest = checkpoint_contents
567            .iter()
568            .nth(key.transaction_idx as usize)?;
569        let tx_digest = exec_digest.transaction;
570
571        if self.cached_tx_digest != Some(tx_digest) {
572            self.cached_tx_digest = Some(tx_digest);
573            self.cached_tx_events = self.rocks.get_events(&tx_digest);
574        }
575
576        let tx_events = self.cached_tx_events.as_ref()?;
577        let event = tx_events.data.get(key.event_index as usize)?.clone();
578
579        Some(Ok((
580            key.checkpoint_seq,
581            key.accumulator_version,
582            key.transaction_idx,
583            key.event_index,
584            event,
585        )))
586    }
587}
588
589impl RpcIndexes for RestReadStore {
590    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
591        self.index()?
592            .get_epoch_info(epoch)
593            .map_err(StorageError::custom)
594    }
595
596    fn owned_objects_iter(
597        &self,
598        owner: SuiAddress,
599        object_type: Option<StructTag>,
600        cursor: Option<OwnedObjectInfo>,
601    ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
602        let cursor = cursor.map(|cursor| OwnerIndexKey {
603            owner: cursor.owner,
604            object_type: cursor.object_type,
605            inverted_balance: cursor.balance.map(std::ops::Not::not),
606            object_id: cursor.object_id,
607        });
608
609        let iter = self
610            .index()?
611            .owner_iter(owner, object_type, cursor)?
612            .map(|result| {
613                result.map(
614                    |(
615                        OwnerIndexKey {
616                            owner,
617                            object_id,
618                            object_type,
619                            inverted_balance,
620                        },
621                        OwnerIndexInfo { version },
622                    )| {
623                        OwnedObjectInfo {
624                            owner,
625                            object_type,
626                            balance: inverted_balance.map(std::ops::Not::not),
627                            object_id,
628                            version,
629                        }
630                    },
631                )
632            });
633
634        Ok(Box::new(iter) as _)
635    }
636
637    fn dynamic_field_iter(
638        &self,
639        parent: ObjectID,
640        cursor: Option<ObjectID>,
641    ) -> sui_types::storage::error::Result<
642        Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
643    > {
644        let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
645        Ok(Box::new(iter) as _)
646    }
647
648    fn get_coin_info(
649        &self,
650        coin_type: &StructTag,
651    ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
652        self.index()?
653            .get_coin_info(coin_type)?
654            .map(
655                |CoinIndexInfo {
656                     coin_metadata_object_id,
657                     treasury_object_id,
658                     regulated_coin_metadata_object_id,
659                 }| CoinInfo {
660                    coin_metadata_object_id,
661                    treasury_object_id,
662                    regulated_coin_metadata_object_id,
663                },
664            )
665            .pipe(Ok)
666    }
667
668    fn get_balance(
669        &self,
670        owner: &SuiAddress,
671        coin_type: &StructTag,
672    ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
673        self.index()?
674            .get_balance(owner, coin_type)?
675            .map(|info| info.into())
676            .pipe(Ok)
677    }
678
679    fn balance_iter(
680        &self,
681        owner: &SuiAddress,
682        cursor: Option<(SuiAddress, StructTag)>,
683    ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
684        let cursor_key =
685            cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
686
687        Ok(Box::new(
688            self.index()?
689                .balance_iter(*owner, cursor_key)?
690                .map(|result| {
691                    result
692                        .map(|(key, info)| (key.coin_type, info.into()))
693                        .map_err(Into::into)
694                }),
695        ))
696    }
697
698    fn package_versions_iter(
699        &self,
700        original_id: ObjectID,
701        cursor: Option<u64>,
702    ) -> sui_types::storage::error::Result<
703        Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
704    > {
705        let iter = self.index()?.package_versions_iter(original_id, cursor)?;
706        Ok(
707            Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
708                as _,
709        )
710    }
711
712    fn get_highest_indexed_checkpoint_seq_number(
713        &self,
714    ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
715        self.index()?
716            .get_highest_indexed_checkpoint_seq_number()
717            .map_err(Into::into)
718    }
719
720    fn authenticated_event_iter(
721        &self,
722        stream_id: SuiAddress,
723        start_checkpoint: u64,
724        start_accumulator_version: Option<u64>,
725        start_transaction_idx: Option<u32>,
726        start_event_idx: Option<u32>,
727        end_checkpoint: u64,
728        limit: u32,
729    ) -> sui_types::storage::error::Result<
730        Box<
731            dyn Iterator<
732                    Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
733                > + '_,
734        >,
735    > {
736        let index = self.index()?;
737        let key_iter = index.event_iter(
738            stream_id,
739            start_checkpoint,
740            start_accumulator_version.unwrap_or(0),
741            start_transaction_idx.unwrap_or(0),
742            start_event_idx.unwrap_or(0),
743            end_checkpoint,
744            limit,
745        )?;
746
747        let rocks = &self.rocks;
748        let iter = BatchedEventIterator {
749            key_iter,
750            rocks,
751            current_checkpoint: None,
752            current_checkpoint_contents: None,
753            cached_tx_events: None,
754            cached_tx_digest: None,
755        };
756
757        Ok(Box::new(iter))
758    }
759}