1use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_types::base_types::ObjectID;
16use sui_types::base_types::SequenceNumber;
17use sui_types::base_types::SuiAddress;
18use sui_types::base_types::TransactionDigest;
19use sui_types::committee::Committee;
20use sui_types::committee::EpochId;
21use sui_types::effects::{TransactionEffects, TransactionEvents};
22use sui_types::error::{SuiErrorKind, SuiResult};
23use sui_types::full_checkpoint_content::ObjectSet;
24use sui_types::messages_checkpoint::CheckpointContentsDigest;
25use sui_types::messages_checkpoint::CheckpointDigest;
26use sui_types::messages_checkpoint::CheckpointSequenceNumber;
27use sui_types::messages_checkpoint::EndOfEpochData;
28use sui_types::messages_checkpoint::VerifiedCheckpoint;
29use sui_types::messages_checkpoint::VerifiedCheckpointContents;
30use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
31use sui_types::object::Object;
32use sui_types::object::Owner;
33use sui_types::storage::BalanceInfo;
34use sui_types::storage::BalanceIterator;
35use sui_types::storage::ChildObjectResolver;
36use sui_types::storage::CoinInfo;
37use sui_types::storage::DynamicFieldKey;
38use sui_types::storage::ObjectStore;
39use sui_types::storage::OwnedObjectInfo;
40use sui_types::storage::RpcIndexes;
41use sui_types::storage::RpcStateReader;
42use sui_types::storage::WriteStore;
43use sui_types::storage::error::Error as StorageError;
44use sui_types::storage::error::Result;
45use sui_types::storage::{ObjectKey, OverlayBackingPackageStore, ReadStore};
46use sui_types::transaction::VerifiedTransaction;
47use tap::Pipe;
48use tap::TapFallible;
49use tracing::error;
50use typed_store::TypedStoreError;
51
52#[derive(Clone)]
53pub struct RocksDbStore {
54 cache_traits: ExecutionCacheTraitPointers,
55
56 committee_store: Arc<CommitteeStore>,
57 checkpoint_store: Arc<CheckpointStore>,
58 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
60 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
61}
62
63impl RocksDbStore {
64 pub fn new(
65 cache_traits: ExecutionCacheTraitPointers,
66 committee_store: Arc<CommitteeStore>,
67 checkpoint_store: Arc<CheckpointStore>,
68 ) -> Self {
69 Self {
70 cache_traits,
71 committee_store,
72 checkpoint_store,
73 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
74 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
75 }
76 }
77
78 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
79 self.cache_traits
80 .object_cache_reader
81 .multi_get_objects_by_key(object_keys)
82 }
83
84 pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
85 self.checkpoint_store
86 .get_highest_executed_checkpoint()
87 .expect("db error")
88 }
89}
90
91impl ReadStore for RocksDbStore {
92 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
93 self.checkpoint_store
94 .get_checkpoint_by_digest(digest)
95 .expect("db error")
96 }
97
98 fn get_checkpoint_by_sequence_number(
99 &self,
100 sequence_number: CheckpointSequenceNumber,
101 ) -> Option<VerifiedCheckpoint> {
102 self.checkpoint_store
103 .get_checkpoint_by_sequence_number(sequence_number)
104 .expect("db error")
105 }
106
107 fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
108 self.checkpoint_store
109 .get_highest_verified_checkpoint()
110 .map(|maybe_checkpoint| {
111 maybe_checkpoint
112 .expect("storage should have been initialized with genesis checkpoint")
113 })
114 .map_err(Into::into)
115 }
116
117 fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
118 self.checkpoint_store
119 .get_highest_synced_checkpoint()
120 .map(|maybe_checkpoint| {
121 maybe_checkpoint
122 .expect("storage should have been initialized with genesis checkpoint")
123 })
124 .map_err(Into::into)
125 }
126
127 fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
128 if let Some(highest_pruned_cp) = self
129 .checkpoint_store
130 .get_highest_pruned_checkpoint_seq_number()
131 .map_err(Into::<StorageError>::into)?
132 {
133 Ok(highest_pruned_cp + 1)
134 } else {
135 Ok(0)
136 }
137 }
138
139 fn get_full_checkpoint_contents(
140 &self,
141 sequence_number: Option<CheckpointSequenceNumber>,
142 digest: &CheckpointContentsDigest,
143 ) -> Option<VersionedFullCheckpointContents> {
144 #[cfg(debug_assertions)]
145 if let Some(sequence_number) = sequence_number {
146 if let Some(loaded_sequence_number) = self
150 .checkpoint_store
151 .get_sequence_number_by_contents_digest(digest)
152 .expect("db error")
153 {
154 assert_eq!(loaded_sequence_number, sequence_number);
155 }
156 }
157
158 let sequence_number = sequence_number.or_else(|| {
159 self.checkpoint_store
160 .get_sequence_number_by_contents_digest(digest)
161 .expect("db error")
162 });
163 if let Some(sequence_number) = sequence_number {
164 if let Ok(Some(contents)) = self
170 .checkpoint_store
171 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
172 .tap_err(|e| {
173 error!(
174 "error getting full checkpoint contents for checkpoint {:?}: {:?}",
175 sequence_number, e
176 )
177 })
178 {
179 return Some(contents);
180 }
181 }
182
183 self.checkpoint_store
189 .get_checkpoint_contents(digest)
190 .expect("db error")
191 .and_then(|contents| {
192 let mut transactions = Vec::with_capacity(contents.size());
193 for tx in contents.iter() {
194 if let (Some(t), Some(e)) = (
195 self.get_transaction(&tx.transaction),
196 self.cache_traits
197 .transaction_cache_reader
198 .get_effects(&tx.effects),
199 ) {
200 transactions.push(sui_types::base_types::ExecutionData::new(
201 (*t).clone().into_inner(),
202 e,
203 ))
204 } else {
205 return None;
206 }
207 }
208 Some(
209 VersionedFullCheckpointContents::from_contents_and_execution_data(
210 contents,
211 transactions.into_iter(),
212 ),
213 )
214 })
215 }
216
217 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
218 self.committee_store.get_committee(&epoch).unwrap()
219 }
220
221 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
222 self.cache_traits
223 .transaction_cache_reader
224 .get_transaction_block(digest)
225 }
226
227 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
228 self.cache_traits
229 .transaction_cache_reader
230 .get_executed_effects(digest)
231 }
232
233 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
234 self.cache_traits
235 .transaction_cache_reader
236 .get_events(digest)
237 }
238
239 fn get_unchanged_loaded_runtime_objects(
240 &self,
241 digest: &TransactionDigest,
242 ) -> Option<Vec<ObjectKey>> {
243 self.cache_traits
244 .transaction_cache_reader
245 .get_unchanged_loaded_runtime_objects(digest)
246 }
247
248 fn get_transaction_checkpoint(
249 &self,
250 digest: &TransactionDigest,
251 ) -> Option<CheckpointSequenceNumber> {
252 self.cache_traits
253 .checkpoint_cache
254 .deprecated_get_transaction_checkpoint(digest)
255 .map(|(_epoch, checkpoint)| checkpoint)
256 }
257
258 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
259 self.checkpoint_store
260 .get_highest_executed_checkpoint()
261 .expect("db error")
262 .ok_or_else(|| {
263 sui_types::storage::error::Error::missing("unable to get latest checkpoint")
264 })
265 }
266
267 fn get_checkpoint_contents_by_digest(
268 &self,
269 digest: &CheckpointContentsDigest,
270 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
271 self.checkpoint_store
272 .get_checkpoint_contents(digest)
273 .expect("db error")
274 }
275
276 fn get_checkpoint_contents_by_sequence_number(
277 &self,
278 sequence_number: CheckpointSequenceNumber,
279 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
280 match self.get_checkpoint_by_sequence_number(sequence_number) {
281 Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
282 None => None,
283 }
284 }
285}
286
287impl ObjectStore for RocksDbStore {
288 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
289 self.cache_traits.object_store.get_object(object_id)
290 }
291
292 fn get_object_by_key(
293 &self,
294 object_id: &sui_types::base_types::ObjectID,
295 version: sui_types::base_types::VersionNumber,
296 ) -> Option<Object> {
297 self.cache_traits
298 .object_store
299 .get_object_by_key(object_id, version)
300 }
301}
302
303impl WriteStore for RocksDbStore {
304 fn insert_checkpoint(
305 &self,
306 checkpoint: &VerifiedCheckpoint,
307 ) -> Result<(), sui_types::storage::error::Error> {
308 if let Some(EndOfEpochData {
309 next_epoch_committee,
310 ..
311 }) = checkpoint.end_of_epoch_data.as_ref()
312 {
313 let next_committee = next_epoch_committee.iter().cloned().collect();
314 let committee =
315 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
316 self.insert_committee(committee)?;
317 }
318
319 self.checkpoint_store
320 .insert_verified_checkpoint(checkpoint)
321 .map_err(Into::into)
322 }
323
324 fn update_highest_synced_checkpoint(
325 &self,
326 checkpoint: &VerifiedCheckpoint,
327 ) -> Result<(), sui_types::storage::error::Error> {
328 let mut locked = self.highest_synced_checkpoint.lock();
329 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
330 return Ok(());
331 }
332 self.checkpoint_store
333 .update_highest_synced_checkpoint(checkpoint)
334 .map_err(sui_types::storage::error::Error::custom)?;
335 *locked = Some(checkpoint.sequence_number);
336 Ok(())
337 }
338
339 fn update_highest_verified_checkpoint(
340 &self,
341 checkpoint: &VerifiedCheckpoint,
342 ) -> Result<(), sui_types::storage::error::Error> {
343 let mut locked = self.highest_verified_checkpoint.lock();
344 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
345 return Ok(());
346 }
347 self.checkpoint_store
348 .update_highest_verified_checkpoint(checkpoint)
349 .map_err(sui_types::storage::error::Error::custom)?;
350 *locked = Some(checkpoint.sequence_number);
351 Ok(())
352 }
353
354 fn insert_checkpoint_contents(
355 &self,
356 checkpoint: &VerifiedCheckpoint,
357 contents: VerifiedCheckpointContents,
358 ) -> Result<(), sui_types::storage::error::Error> {
359 self.cache_traits
360 .state_sync_store
361 .multi_insert_transaction_and_effects(contents.transactions());
362 self.checkpoint_store
363 .insert_verified_checkpoint_contents(checkpoint, contents)
364 .map_err(Into::into)
365 }
366
367 fn insert_committee(
368 &self,
369 new_committee: Committee,
370 ) -> Result<(), sui_types::storage::error::Error> {
371 self.committee_store
372 .insert_new_committee(&new_committee)
373 .unwrap();
374 Ok(())
375 }
376}
377
378pub struct RestReadStore {
379 state: Arc<AuthorityState>,
380 rocks: RocksDbStore,
381}
382
383impl RestReadStore {
384 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
385 Self { state, rocks }
386 }
387
388 fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
389 self.state
390 .rpc_index
391 .as_deref()
392 .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
393 }
394}
395
396impl ObjectStore for RestReadStore {
397 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
398 self.rocks.get_object(object_id)
399 }
400
401 fn get_object_by_key(
402 &self,
403 object_id: &sui_types::base_types::ObjectID,
404 version: sui_types::base_types::VersionNumber,
405 ) -> Option<Object> {
406 self.rocks.get_object_by_key(object_id, version)
407 }
408}
409
410impl ReadStore for RestReadStore {
411 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
412 self.rocks.get_committee(epoch)
413 }
414
415 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
416 self.rocks.get_latest_checkpoint()
417 }
418
419 fn get_highest_verified_checkpoint(
420 &self,
421 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
422 self.rocks.get_highest_verified_checkpoint()
423 }
424
425 fn get_highest_synced_checkpoint(
426 &self,
427 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
428 self.rocks.get_highest_synced_checkpoint()
429 }
430
431 fn get_lowest_available_checkpoint(
432 &self,
433 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
434 self.rocks.get_lowest_available_checkpoint()
435 }
436
437 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
438 self.rocks.get_checkpoint_by_digest(digest)
439 }
440
441 fn get_checkpoint_by_sequence_number(
442 &self,
443 sequence_number: CheckpointSequenceNumber,
444 ) -> Option<VerifiedCheckpoint> {
445 self.rocks
446 .get_checkpoint_by_sequence_number(sequence_number)
447 }
448
449 fn get_checkpoint_contents_by_digest(
450 &self,
451 digest: &CheckpointContentsDigest,
452 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
453 self.rocks.get_checkpoint_contents_by_digest(digest)
454 }
455
456 fn get_checkpoint_contents_by_sequence_number(
457 &self,
458 sequence_number: CheckpointSequenceNumber,
459 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
460 self.rocks
461 .get_checkpoint_contents_by_sequence_number(sequence_number)
462 }
463
464 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
465 self.rocks.get_transaction(digest)
466 }
467
468 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
469 self.rocks.get_transaction_effects(digest)
470 }
471
472 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
473 self.rocks.get_events(digest)
474 }
475
476 fn get_full_checkpoint_contents(
477 &self,
478 sequence_number: Option<CheckpointSequenceNumber>,
479 digest: &CheckpointContentsDigest,
480 ) -> Option<VersionedFullCheckpointContents> {
481 self.rocks
482 .get_full_checkpoint_contents(sequence_number, digest)
483 }
484
485 fn get_unchanged_loaded_runtime_objects(
486 &self,
487 digest: &TransactionDigest,
488 ) -> Option<Vec<ObjectKey>> {
489 self.rocks.get_unchanged_loaded_runtime_objects(digest)
490 }
491
492 fn get_transaction_checkpoint(
493 &self,
494 digest: &TransactionDigest,
495 ) -> Option<CheckpointSequenceNumber> {
496 self.rocks.get_transaction_checkpoint(digest)
497 }
498}
499
500impl ChildObjectResolver for RestReadStore {
501 fn read_child_object(
502 &self,
503 parent: &ObjectID,
504 child: &ObjectID,
505 child_version_upper_bound: SequenceNumber,
506 ) -> SuiResult<Option<Object>> {
507 Ok(self.get_object(child).and_then(|o| {
508 if o.version() <= child_version_upper_bound
509 && o.owner == Owner::ObjectOwner((*parent).into())
510 {
511 Some(o)
512 } else {
513 None
514 }
515 }))
516 }
517
518 fn get_object_received_at_version(
519 &self,
520 _owner: &ObjectID,
521 _receiving_object_id: &ObjectID,
522 _receive_object_at_version: SequenceNumber,
523 _epoch_id: EpochId,
524 ) -> SuiResult<Option<Object>> {
525 Err(SuiErrorKind::UnsupportedFeatureError {
526 error: "RestReadStore does not support receiving objects".to_string(),
527 }
528 .into())
529 }
530}
531
532impl RpcStateReader for RestReadStore {
533 fn get_lowest_available_checkpoint_objects(
534 &self,
535 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
536 Ok(self
537 .state
538 .get_object_cache_reader()
539 .get_highest_pruned_checkpoint()
540 .map(|cp| cp + 1)
541 .unwrap_or(0))
542 }
543
544 fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
545 Ok(self.state.get_chain_identifier())
546 }
547
548 fn indexes(&self) -> Option<&dyn RpcIndexes> {
549 Some(self)
550 }
551
552 fn get_struct_layout_with_overlay(
553 &self,
554 struct_tag: &move_core_types::language_storage::StructTag,
555 overlay: &ObjectSet,
556 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
557 let backing_store = self.state.get_backing_package_store();
558 let overlay_store = OverlayBackingPackageStore::new(overlay, backing_store.as_ref());
559 self.state
560 .load_epoch_store_one_call_per_task()
561 .executor()
562 .type_layout_resolver(Box::new(overlay_store))
564 .get_annotated_layout(struct_tag)
565 .map(|layout| layout.into_layout())
566 .map(Some)
567 .map_err(StorageError::custom)
568 }
569}
570
571struct BatchedEventIterator<'a, I>
572where
573 I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
574{
575 key_iter: I,
576 rocks: &'a RocksDbStore,
577 current_checkpoint: Option<u64>,
578 current_checkpoint_contents: Option<sui_types::messages_checkpoint::CheckpointContents>,
579 cached_tx_events: Option<TransactionEvents>,
580 cached_tx_digest: Option<TransactionDigest>,
581}
582
583impl<I> Iterator for BatchedEventIterator<'_, I>
584where
585 I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
586{
587 type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
588
589 fn next(&mut self) -> Option<Self::Item> {
590 let key = match self.key_iter.next()? {
591 Ok(k) => k,
592 Err(e) => return Some(Err(e)),
593 };
594
595 if self.current_checkpoint != Some(key.checkpoint_seq) {
596 self.current_checkpoint = Some(key.checkpoint_seq);
597 self.current_checkpoint_contents = self
598 .rocks
599 .get_checkpoint_contents_by_sequence_number(key.checkpoint_seq);
600 self.cached_tx_events = None;
601 self.cached_tx_digest = None;
602 }
603
604 let checkpoint_contents = self.current_checkpoint_contents.as_ref()?;
605
606 let exec_digest = checkpoint_contents
607 .iter()
608 .nth(key.transaction_idx as usize)?;
609 let tx_digest = exec_digest.transaction;
610
611 if self.cached_tx_digest != Some(tx_digest) {
612 self.cached_tx_digest = Some(tx_digest);
613 self.cached_tx_events = self.rocks.get_events(&tx_digest);
614 }
615
616 let tx_events = self.cached_tx_events.as_ref()?;
617 let event = tx_events.data.get(key.event_index as usize)?.clone();
618
619 Some(Ok((
620 key.checkpoint_seq,
621 key.accumulator_version,
622 key.transaction_idx,
623 key.event_index,
624 event,
625 )))
626 }
627}
628
629impl RpcIndexes for RestReadStore {
630 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
631 self.index()?
632 .get_epoch_info(epoch)
633 .map_err(StorageError::custom)
634 }
635
636 fn owned_objects_iter(
637 &self,
638 owner: SuiAddress,
639 object_type: Option<StructTag>,
640 cursor: Option<OwnedObjectInfo>,
641 ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
642 let cursor = cursor.map(|cursor| OwnerIndexKey {
643 owner: cursor.owner,
644 object_type: cursor.object_type,
645 inverted_balance: cursor.balance.map(std::ops::Not::not),
646 object_id: cursor.object_id,
647 });
648
649 let iter = self
650 .index()?
651 .owner_iter(owner, object_type, cursor)?
652 .map(|result| {
653 result.map(
654 |(
655 OwnerIndexKey {
656 owner,
657 object_id,
658 object_type,
659 inverted_balance,
660 },
661 OwnerIndexInfo { version },
662 )| {
663 OwnedObjectInfo {
664 owner,
665 object_type,
666 balance: inverted_balance.map(std::ops::Not::not),
667 object_id,
668 version,
669 }
670 },
671 )
672 });
673
674 Ok(Box::new(iter) as _)
675 }
676
677 fn dynamic_field_iter(
678 &self,
679 parent: ObjectID,
680 cursor: Option<ObjectID>,
681 ) -> sui_types::storage::error::Result<
682 Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
683 > {
684 let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
685 Ok(Box::new(iter) as _)
686 }
687
688 fn get_coin_info(
689 &self,
690 coin_type: &StructTag,
691 ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
692 self.index()?
693 .get_coin_info(coin_type)?
694 .map(
695 |CoinIndexInfo {
696 coin_metadata_object_id,
697 treasury_object_id,
698 regulated_coin_metadata_object_id,
699 }| CoinInfo {
700 coin_metadata_object_id,
701 treasury_object_id,
702 regulated_coin_metadata_object_id,
703 },
704 )
705 .pipe(Ok)
706 }
707
708 fn get_balance(
709 &self,
710 owner: &SuiAddress,
711 coin_type: &StructTag,
712 ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
713 self.index()?
714 .get_balance(owner, coin_type)?
715 .map(|info| info.into())
716 .pipe(Ok)
717 }
718
719 fn balance_iter(
720 &self,
721 owner: &SuiAddress,
722 cursor: Option<(SuiAddress, StructTag)>,
723 ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
724 let cursor_key =
725 cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
726
727 Ok(Box::new(
728 self.index()?
729 .balance_iter(*owner, cursor_key)?
730 .map(|result| {
731 result
732 .map(|(key, info)| (key.coin_type, info.into()))
733 .map_err(Into::into)
734 }),
735 ))
736 }
737
738 fn package_versions_iter(
739 &self,
740 original_id: ObjectID,
741 cursor: Option<u64>,
742 ) -> sui_types::storage::error::Result<
743 Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
744 > {
745 let iter = self.index()?.package_versions_iter(original_id, cursor)?;
746 Ok(
747 Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
748 as _,
749 )
750 }
751
752 fn get_highest_indexed_checkpoint_seq_number(
753 &self,
754 ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
755 self.index()?
756 .get_highest_indexed_checkpoint_seq_number()
757 .map_err(Into::into)
758 }
759
760 fn authenticated_event_iter(
761 &self,
762 stream_id: SuiAddress,
763 start_checkpoint: u64,
764 start_accumulator_version: Option<u64>,
765 start_transaction_idx: Option<u32>,
766 start_event_idx: Option<u32>,
767 end_checkpoint: u64,
768 limit: u32,
769 ) -> sui_types::storage::error::Result<
770 Box<
771 dyn Iterator<
772 Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
773 > + '_,
774 >,
775 > {
776 let index = self.index()?;
777 let key_iter = index.event_iter(
778 stream_id,
779 start_checkpoint,
780 start_accumulator_version.unwrap_or(0),
781 start_transaction_idx.unwrap_or(0),
782 start_event_idx.unwrap_or(0),
783 end_checkpoint,
784 limit,
785 )?;
786
787 let rocks = &self.rocks;
788 let iter = BatchedEventIterator {
789 key_iter,
790 rocks,
791 current_checkpoint: None,
792 current_checkpoint_contents: None,
793 cached_tx_events: None,
794 cached_tx_digest: None,
795 };
796
797 Ok(Box::new(iter))
798 }
799}