1use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_rpc_store::RpcStoreReader;
16use sui_types::base_types::ObjectID;
17use sui_types::base_types::SequenceNumber;
18use sui_types::base_types::SuiAddress;
19use sui_types::base_types::TransactionDigest;
20use sui_types::committee::Committee;
21use sui_types::committee::EpochId;
22use sui_types::effects::{TransactionEffects, TransactionEvents};
23use sui_types::error::{SuiErrorKind, SuiResult};
24use sui_types::full_checkpoint_content::ObjectSet;
25use sui_types::messages_checkpoint::CheckpointContentsDigest;
26use sui_types::messages_checkpoint::CheckpointDigest;
27use sui_types::messages_checkpoint::CheckpointSequenceNumber;
28use sui_types::messages_checkpoint::EndOfEpochData;
29use sui_types::messages_checkpoint::VerifiedCheckpoint;
30use sui_types::messages_checkpoint::VerifiedCheckpointContents;
31use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
32use sui_types::object::Object;
33use sui_types::object::Owner;
34use sui_types::storage::BalanceInfo;
35use sui_types::storage::BalanceIterator;
36use sui_types::storage::ChildObjectResolver;
37use sui_types::storage::CoinInfo;
38use sui_types::storage::DynamicFieldKey;
39use sui_types::storage::LedgerBitmapBucketIterator;
40use sui_types::storage::LedgerTxSeqDigest;
41use sui_types::storage::LedgerTxSeqDigestIterator;
42use sui_types::storage::ObjectStore;
43use sui_types::storage::OwnedObjectInfo;
44use sui_types::storage::RpcIndexes;
45use sui_types::storage::RpcStateReader;
46use sui_types::storage::WriteStore;
47use sui_types::storage::error::Error as StorageError;
48use sui_types::storage::error::Result;
49use sui_types::storage::{ObjectKey, OverlayBackingPackageStore, ReadStore};
50use sui_types::transaction::VerifiedTransaction;
51use tap::Pipe;
52use tap::TapFallible;
53use tracing::error;
54use typed_store::TypedStoreError;
55
56#[derive(Clone)]
57pub struct RocksDbStore {
58 cache_traits: ExecutionCacheTraitPointers,
59
60 committee_store: Arc<CommitteeStore>,
61 checkpoint_store: Arc<CheckpointStore>,
62 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
64 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
65}
66
67impl RocksDbStore {
68 pub fn new(
69 cache_traits: ExecutionCacheTraitPointers,
70 committee_store: Arc<CommitteeStore>,
71 checkpoint_store: Arc<CheckpointStore>,
72 ) -> Self {
73 Self {
74 cache_traits,
75 committee_store,
76 checkpoint_store,
77 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
78 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
79 }
80 }
81
82 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
83 self.cache_traits
84 .object_cache_reader
85 .multi_get_objects_by_key(object_keys)
86 }
87
88 pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
89 self.checkpoint_store
90 .get_highest_executed_checkpoint()
91 .expect("db error")
92 }
93}
94
95impl ReadStore for RocksDbStore {
96 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
97 self.checkpoint_store
98 .get_checkpoint_by_digest(digest)
99 .expect("db error")
100 }
101
102 fn get_checkpoint_by_sequence_number(
103 &self,
104 sequence_number: CheckpointSequenceNumber,
105 ) -> Option<VerifiedCheckpoint> {
106 self.checkpoint_store
107 .get_checkpoint_by_sequence_number(sequence_number)
108 .expect("db error")
109 }
110
111 fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
112 self.checkpoint_store
113 .get_highest_verified_checkpoint()
114 .map(|maybe_checkpoint| {
115 maybe_checkpoint
116 .expect("storage should have been initialized with genesis checkpoint")
117 })
118 .map_err(Into::into)
119 }
120
121 fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
122 self.checkpoint_store
123 .get_highest_synced_checkpoint()
124 .map(|maybe_checkpoint| {
125 maybe_checkpoint
126 .expect("storage should have been initialized with genesis checkpoint")
127 })
128 .map_err(Into::into)
129 }
130
131 fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
132 if let Some(highest_pruned_cp) = self
133 .checkpoint_store
134 .get_highest_pruned_checkpoint_seq_number()
135 .map_err(Into::<StorageError>::into)?
136 {
137 Ok(highest_pruned_cp + 1)
138 } else {
139 Ok(0)
140 }
141 }
142
143 fn get_full_checkpoint_contents(
144 &self,
145 sequence_number: Option<CheckpointSequenceNumber>,
146 digest: &CheckpointContentsDigest,
147 ) -> Option<VersionedFullCheckpointContents> {
148 #[cfg(debug_assertions)]
149 if let Some(sequence_number) = sequence_number {
150 if let Some(loaded_sequence_number) = self
154 .checkpoint_store
155 .get_sequence_number_by_contents_digest(digest)
156 .expect("db error")
157 {
158 assert_eq!(loaded_sequence_number, sequence_number);
159 }
160 }
161
162 let sequence_number = sequence_number.or_else(|| {
163 self.checkpoint_store
164 .get_sequence_number_by_contents_digest(digest)
165 .expect("db error")
166 });
167 if let Some(sequence_number) = sequence_number {
168 if let Ok(Some(contents)) = self
174 .checkpoint_store
175 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
176 .tap_err(|e| {
177 error!(
178 "error getting full checkpoint contents for checkpoint {:?}: {:?}",
179 sequence_number, e
180 )
181 })
182 {
183 return Some(contents);
184 }
185 }
186
187 self.checkpoint_store
193 .get_checkpoint_contents(digest)
194 .expect("db error")
195 .and_then(|contents| {
196 let mut transactions = Vec::with_capacity(contents.size());
197 for tx in contents.iter() {
198 if let (Some(t), Some(e)) = (
199 self.get_transaction(&tx.transaction),
200 self.cache_traits
201 .transaction_cache_reader
202 .get_effects(&tx.effects),
203 ) {
204 transactions.push(sui_types::base_types::ExecutionData::new(
205 (*t).clone().into_inner(),
206 e,
207 ))
208 } else {
209 return None;
210 }
211 }
212 Some(
213 VersionedFullCheckpointContents::from_contents_and_execution_data(
214 contents,
215 transactions.into_iter(),
216 ),
217 )
218 })
219 }
220
221 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
222 self.committee_store.get_committee(&epoch).unwrap()
223 }
224
225 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
226 self.cache_traits
227 .transaction_cache_reader
228 .get_transaction_block(digest)
229 }
230
231 fn multi_get_transactions(
232 &self,
233 digests: &[TransactionDigest],
234 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
235 self.cache_traits
236 .transaction_cache_reader
237 .multi_get_transaction_blocks(digests)
238 }
239
240 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
241 self.cache_traits
242 .transaction_cache_reader
243 .get_executed_effects(digest)
244 }
245
246 fn multi_get_transaction_effects(
247 &self,
248 digests: &[TransactionDigest],
249 ) -> Vec<Option<TransactionEffects>> {
250 self.cache_traits
251 .transaction_cache_reader
252 .multi_get_executed_effects(digests)
253 }
254
255 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
256 self.cache_traits
257 .transaction_cache_reader
258 .get_events(digest)
259 }
260
261 fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
262 self.cache_traits
263 .transaction_cache_reader
264 .multi_get_events(digests)
265 }
266
267 fn get_unchanged_loaded_runtime_objects(
268 &self,
269 digest: &TransactionDigest,
270 ) -> Option<Vec<ObjectKey>> {
271 self.cache_traits
272 .transaction_cache_reader
273 .get_unchanged_loaded_runtime_objects(digest)
274 }
275
276 fn get_transaction_checkpoint(
277 &self,
278 digest: &TransactionDigest,
279 ) -> Option<CheckpointSequenceNumber> {
280 self.cache_traits
281 .checkpoint_cache
282 .deprecated_get_transaction_checkpoint(digest)
283 .map(|(_epoch, checkpoint)| checkpoint)
284 }
285
286 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
287 self.checkpoint_store
288 .get_highest_executed_checkpoint()
289 .expect("db error")
290 .ok_or_else(|| {
291 sui_types::storage::error::Error::missing("unable to get latest checkpoint")
292 })
293 }
294
295 fn get_checkpoint_contents_by_digest(
296 &self,
297 digest: &CheckpointContentsDigest,
298 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
299 self.checkpoint_store
300 .get_checkpoint_contents(digest)
301 .expect("db error")
302 }
303
304 fn get_checkpoint_contents_by_sequence_number(
305 &self,
306 sequence_number: CheckpointSequenceNumber,
307 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
308 match self.get_checkpoint_by_sequence_number(sequence_number) {
309 Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
310 None => None,
311 }
312 }
313}
314
315impl ObjectStore for RocksDbStore {
316 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
317 self.cache_traits.object_store.get_object(object_id)
318 }
319
320 fn get_object_by_key(
321 &self,
322 object_id: &sui_types::base_types::ObjectID,
323 version: sui_types::base_types::VersionNumber,
324 ) -> Option<Object> {
325 self.cache_traits
326 .object_store
327 .get_object_by_key(object_id, version)
328 }
329}
330
331impl WriteStore for RocksDbStore {
332 fn insert_checkpoint(
333 &self,
334 checkpoint: &VerifiedCheckpoint,
335 ) -> Result<(), sui_types::storage::error::Error> {
336 if let Some(EndOfEpochData {
337 next_epoch_committee,
338 ..
339 }) = checkpoint.end_of_epoch_data.as_ref()
340 {
341 let next_committee = next_epoch_committee.iter().cloned().collect();
342 let committee =
343 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
344 self.insert_committee(committee)?;
345 }
346
347 self.checkpoint_store
348 .insert_verified_checkpoint(checkpoint)
349 .map_err(Into::into)
350 }
351
352 fn update_highest_synced_checkpoint(
353 &self,
354 checkpoint: &VerifiedCheckpoint,
355 ) -> Result<(), sui_types::storage::error::Error> {
356 let mut locked = self.highest_synced_checkpoint.lock();
357 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
358 return Ok(());
359 }
360 self.checkpoint_store
361 .update_highest_synced_checkpoint(checkpoint)
362 .map_err(sui_types::storage::error::Error::custom)?;
363 *locked = Some(checkpoint.sequence_number);
364 Ok(())
365 }
366
367 fn update_highest_verified_checkpoint(
368 &self,
369 checkpoint: &VerifiedCheckpoint,
370 ) -> Result<(), sui_types::storage::error::Error> {
371 let mut locked = self.highest_verified_checkpoint.lock();
372 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
373 return Ok(());
374 }
375 self.checkpoint_store
376 .update_highest_verified_checkpoint(checkpoint)
377 .map_err(sui_types::storage::error::Error::custom)?;
378 *locked = Some(checkpoint.sequence_number);
379 Ok(())
380 }
381
382 fn insert_checkpoint_contents(
383 &self,
384 checkpoint: &VerifiedCheckpoint,
385 contents: VerifiedCheckpointContents,
386 ) -> Result<(), sui_types::storage::error::Error> {
387 self.cache_traits
388 .state_sync_store
389 .multi_insert_transaction_and_effects(contents.transactions());
390 self.checkpoint_store
391 .insert_verified_checkpoint_contents(checkpoint, contents)
392 .map_err(Into::into)
393 }
394
395 fn insert_committee(
396 &self,
397 new_committee: Committee,
398 ) -> Result<(), sui_types::storage::error::Error> {
399 self.committee_store
400 .insert_new_committee(&new_committee)
401 .unwrap();
402 Ok(())
403 }
404}
405
406pub struct RestReadStore {
407 state: Arc<AuthorityState>,
408 rocks: RocksDbStore,
409}
410
411impl RestReadStore {
412 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
413 Self { state, rocks }
414 }
415
416 fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
417 self.state
418 .rpc_index
419 .as_deref()
420 .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
421 }
422}
423
424impl ObjectStore for RestReadStore {
425 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
426 self.rocks.get_object(object_id)
427 }
428
429 fn get_object_by_key(
430 &self,
431 object_id: &sui_types::base_types::ObjectID,
432 version: sui_types::base_types::VersionNumber,
433 ) -> Option<Object> {
434 self.rocks.get_object_by_key(object_id, version)
435 }
436}
437
438impl ReadStore for RestReadStore {
439 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
440 self.rocks.get_committee(epoch)
441 }
442
443 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
444 self.rocks.get_latest_checkpoint()
445 }
446
447 fn get_highest_verified_checkpoint(
448 &self,
449 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
450 self.rocks.get_highest_verified_checkpoint()
451 }
452
453 fn get_highest_synced_checkpoint(
454 &self,
455 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
456 self.rocks.get_highest_synced_checkpoint()
457 }
458
459 fn get_lowest_available_checkpoint(
460 &self,
461 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
462 self.rocks.get_lowest_available_checkpoint()
463 }
464
465 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
466 self.rocks.get_checkpoint_by_digest(digest)
467 }
468
469 fn get_checkpoint_by_sequence_number(
470 &self,
471 sequence_number: CheckpointSequenceNumber,
472 ) -> Option<VerifiedCheckpoint> {
473 self.rocks
474 .get_checkpoint_by_sequence_number(sequence_number)
475 }
476
477 fn get_checkpoint_contents_by_digest(
478 &self,
479 digest: &CheckpointContentsDigest,
480 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
481 self.rocks.get_checkpoint_contents_by_digest(digest)
482 }
483
484 fn get_checkpoint_contents_by_sequence_number(
485 &self,
486 sequence_number: CheckpointSequenceNumber,
487 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
488 self.rocks
489 .get_checkpoint_contents_by_sequence_number(sequence_number)
490 }
491
492 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
493 self.rocks.get_transaction(digest)
494 }
495
496 fn multi_get_transactions(
497 &self,
498 digests: &[TransactionDigest],
499 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
500 self.rocks.multi_get_transactions(digests)
501 }
502
503 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
504 self.rocks.get_transaction_effects(digest)
505 }
506
507 fn multi_get_transaction_effects(
508 &self,
509 digests: &[TransactionDigest],
510 ) -> Vec<Option<TransactionEffects>> {
511 self.rocks.multi_get_transaction_effects(digests)
512 }
513
514 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
515 self.rocks.get_events(digest)
516 }
517
518 fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
519 self.rocks.multi_get_events(digests)
520 }
521
522 fn get_full_checkpoint_contents(
523 &self,
524 sequence_number: Option<CheckpointSequenceNumber>,
525 digest: &CheckpointContentsDigest,
526 ) -> Option<VersionedFullCheckpointContents> {
527 self.rocks
528 .get_full_checkpoint_contents(sequence_number, digest)
529 }
530
531 fn get_unchanged_loaded_runtime_objects(
532 &self,
533 digest: &TransactionDigest,
534 ) -> Option<Vec<ObjectKey>> {
535 self.rocks.get_unchanged_loaded_runtime_objects(digest)
536 }
537
538 fn get_transaction_checkpoint(
539 &self,
540 digest: &TransactionDigest,
541 ) -> Option<CheckpointSequenceNumber> {
542 self.rocks.get_transaction_checkpoint(digest)
543 }
544}
545
546impl ChildObjectResolver for RestReadStore {
547 fn read_child_object(
548 &self,
549 parent: &ObjectID,
550 child: &ObjectID,
551 child_version_upper_bound: SequenceNumber,
552 ) -> SuiResult<Option<Object>> {
553 Ok(self.get_object(child).and_then(|o| {
554 if o.version() <= child_version_upper_bound
555 && o.owner == Owner::ObjectOwner((*parent).into())
556 {
557 Some(o)
558 } else {
559 None
560 }
561 }))
562 }
563
564 fn get_object_received_at_version(
565 &self,
566 _owner: &ObjectID,
567 _receiving_object_id: &ObjectID,
568 _receive_object_at_version: SequenceNumber,
569 _epoch_id: EpochId,
570 ) -> SuiResult<Option<Object>> {
571 Err(SuiErrorKind::UnsupportedFeatureError {
572 error: "RestReadStore does not support receiving objects".to_string(),
573 }
574 .into())
575 }
576}
577
578impl RpcStateReader for RestReadStore {
579 fn get_lowest_available_checkpoint_objects(
580 &self,
581 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
582 Ok(self
583 .state
584 .get_object_cache_reader()
585 .get_highest_pruned_checkpoint()
586 .map(|cp| cp + 1)
587 .unwrap_or(0))
588 }
589
590 fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
591 Ok(self.state.get_chain_identifier())
592 }
593
594 fn indexes(&self) -> Option<&dyn RpcIndexes> {
595 Some(self)
596 }
597
598 fn get_struct_layout_with_overlay(
599 &self,
600 struct_tag: &move_core_types::language_storage::StructTag,
601 overlay: &ObjectSet,
602 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
603 let backing_store = self.state.get_backing_package_store();
604 let overlay_store = OverlayBackingPackageStore::new(overlay, backing_store.as_ref());
605 let epoch_store = self.state.load_epoch_store_one_call_per_task();
606 epoch_store
607 .executor()
608 .type_layout_resolver(epoch_store.protocol_config(), Box::new(overlay_store))
610 .get_annotated_layout(struct_tag)
611 .map(|layout| layout.into_layout())
612 .map(Some)
613 .map_err(StorageError::custom)
614 }
615}
616
617impl RpcIndexes for RestReadStore {
618 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
619 self.index()?
620 .get_epoch_info(epoch)
621 .map_err(StorageError::custom)
622 }
623
624 fn owned_objects_iter(
625 &self,
626 owner: SuiAddress,
627 object_type: Option<StructTag>,
628 cursor: Option<OwnedObjectInfo>,
629 ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
630 let cursor = cursor.map(|cursor| OwnerIndexKey {
631 owner: cursor.owner,
632 object_type: cursor.object_type,
633 inverted_balance: cursor.balance.map(std::ops::Not::not),
634 object_id: cursor.object_id,
635 });
636
637 let iter = self
638 .index()?
639 .owner_iter(owner, object_type, cursor)?
640 .map(|result| {
641 result.map(
642 |(
643 OwnerIndexKey {
644 owner,
645 object_id,
646 object_type,
647 inverted_balance,
648 },
649 OwnerIndexInfo { version },
650 )| {
651 OwnedObjectInfo {
652 owner,
653 object_type,
654 balance: inverted_balance.map(std::ops::Not::not),
655 object_id,
656 version,
657 }
658 },
659 )
660 });
661
662 Ok(Box::new(iter) as _)
663 }
664
665 fn dynamic_field_iter(
666 &self,
667 parent: ObjectID,
668 cursor: Option<ObjectID>,
669 ) -> sui_types::storage::error::Result<
670 Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
671 > {
672 let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
673 Ok(Box::new(iter) as _)
674 }
675
676 fn get_coin_info(
677 &self,
678 coin_type: &StructTag,
679 ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
680 self.index()?
681 .get_coin_info(coin_type)?
682 .map(
683 |CoinIndexInfo {
684 coin_metadata_object_id,
685 treasury_object_id,
686 regulated_coin_metadata_object_id,
687 }| CoinInfo {
688 coin_metadata_object_id,
689 treasury_object_id,
690 regulated_coin_metadata_object_id,
691 },
692 )
693 .pipe(Ok)
694 }
695
696 fn get_balance(
697 &self,
698 owner: &SuiAddress,
699 coin_type: &StructTag,
700 ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
701 self.index()?
702 .get_balance(owner, coin_type)?
703 .map(|info| info.into())
704 .pipe(Ok)
705 }
706
707 fn balance_iter(
708 &self,
709 owner: &SuiAddress,
710 cursor: Option<(SuiAddress, StructTag)>,
711 ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
712 let cursor_key =
713 cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
714
715 Ok(Box::new(
716 self.index()?
717 .balance_iter(*owner, cursor_key)?
718 .map(|result| {
719 result
720 .map(|(key, info)| (key.coin_type, info.into()))
721 .map_err(Into::into)
722 }),
723 ))
724 }
725
726 fn package_versions_iter(
727 &self,
728 original_id: ObjectID,
729 cursor: Option<u64>,
730 ) -> sui_types::storage::error::Result<
731 Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
732 > {
733 let iter = self.index()?.package_versions_iter(original_id, cursor)?;
734 Ok(
735 Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
736 as _,
737 )
738 }
739
740 fn get_highest_indexed_checkpoint_seq_number(
741 &self,
742 ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
743 self.index()?
744 .get_highest_indexed_checkpoint_seq_number()
745 .map_err(Into::into)
746 }
747
748 fn ledger_tx_seq_digest(&self, tx_seq: u64) -> Result<Option<LedgerTxSeqDigest>> {
749 self.index()?
750 .ledger_tx_seq_digest(tx_seq)
751 .map_err(Into::into)
752 }
753
754 fn ledger_tx_seq_digest_multi_get(
755 &self,
756 tx_seqs: &[u64],
757 ) -> Result<Vec<Option<LedgerTxSeqDigest>>> {
758 self.index()?
759 .ledger_tx_seq_digest_multi_get(tx_seqs)
760 .map_err(Into::into)
761 }
762
763 fn ledger_tx_seq_digest_iter(
764 &self,
765 start: u64,
766 end_exclusive: u64,
767 descending: bool,
768 ) -> Result<LedgerTxSeqDigestIterator<'_>> {
769 self.index()?
770 .ledger_tx_seq_digest_iter(start, end_exclusive, descending)
771 .map_err(Into::into)
772 }
773
774 fn transaction_bitmap_bucket_iter(
775 &self,
776 dimension_key: Vec<u8>,
777 start_bucket: u64,
778 end_bucket_exclusive: u64,
779 descending: bool,
780 ) -> Result<LedgerBitmapBucketIterator<'_>> {
781 self.index()?
782 .transaction_bitmap_bucket_iter(
783 dimension_key,
784 start_bucket,
785 end_bucket_exclusive,
786 descending,
787 )
788 .map_err(Into::into)
789 }
790
791 fn event_bitmap_bucket_iter(
792 &self,
793 dimension_key: Vec<u8>,
794 start_bucket: u64,
795 end_bucket_exclusive: u64,
796 descending: bool,
797 ) -> Result<LedgerBitmapBucketIterator<'_>> {
798 self.index()?
799 .event_bitmap_bucket_iter(
800 dimension_key,
801 start_bucket,
802 end_bucket_exclusive,
803 descending,
804 )
805 .map_err(Into::into)
806 }
807}
808
809pub struct RpcStoreReadStore {
833 state: Arc<AuthorityState>,
834 rocks: RocksDbStore,
835 reader: RpcStoreReader,
836}
837
838impl RpcStoreReadStore {
839 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore, reader: RpcStoreReader) -> Self {
840 Self {
841 state,
842 rocks,
843 reader,
844 }
845 }
846}
847
848impl ObjectStore for RpcStoreReadStore {
849 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
850 self.rocks.get_object(object_id)
851 }
852
853 fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
854 self.rocks.get_object_by_key(object_id, version)
855 }
856}
857
858impl ReadStore for RpcStoreReadStore {
859 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
860 self.rocks.get_committee(epoch)
861 }
862
863 fn get_latest_checkpoint(&self) -> Result<VerifiedCheckpoint> {
864 self.rocks.get_latest_checkpoint()
865 }
866
867 fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint> {
868 self.rocks.get_highest_verified_checkpoint()
869 }
870
871 fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint> {
872 self.rocks.get_highest_synced_checkpoint()
873 }
874
875 fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
876 let perpetual = self.rocks.get_lowest_available_checkpoint()?;
880 let rpc_store = self.reader.get_lowest_available_checkpoint()?;
881 Ok(perpetual.max(rpc_store))
882 }
883
884 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
885 self.rocks.get_checkpoint_by_digest(digest)
886 }
887
888 fn get_checkpoint_by_sequence_number(
889 &self,
890 sequence_number: CheckpointSequenceNumber,
891 ) -> Option<VerifiedCheckpoint> {
892 self.rocks
893 .get_checkpoint_by_sequence_number(sequence_number)
894 }
895
896 fn get_checkpoint_contents_by_digest(
897 &self,
898 digest: &CheckpointContentsDigest,
899 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
900 self.rocks.get_checkpoint_contents_by_digest(digest)
901 }
902
903 fn get_checkpoint_contents_by_sequence_number(
904 &self,
905 sequence_number: CheckpointSequenceNumber,
906 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
907 self.rocks
908 .get_checkpoint_contents_by_sequence_number(sequence_number)
909 }
910
911 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
912 self.rocks.get_transaction(digest)
913 }
914
915 fn multi_get_transactions(
916 &self,
917 digests: &[TransactionDigest],
918 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
919 self.rocks.multi_get_transactions(digests)
920 }
921
922 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
923 self.rocks.get_transaction_effects(digest)
924 }
925
926 fn multi_get_transaction_effects(
927 &self,
928 digests: &[TransactionDigest],
929 ) -> Vec<Option<TransactionEffects>> {
930 self.rocks.multi_get_transaction_effects(digests)
931 }
932
933 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
934 self.rocks.get_events(digest)
935 }
936
937 fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
938 self.rocks.multi_get_events(digests)
939 }
940
941 fn get_full_checkpoint_contents(
942 &self,
943 sequence_number: Option<CheckpointSequenceNumber>,
944 digest: &CheckpointContentsDigest,
945 ) -> Option<VersionedFullCheckpointContents> {
946 self.rocks
947 .get_full_checkpoint_contents(sequence_number, digest)
948 }
949
950 fn get_unchanged_loaded_runtime_objects(
951 &self,
952 digest: &TransactionDigest,
953 ) -> Option<Vec<ObjectKey>> {
954 self.rocks.get_unchanged_loaded_runtime_objects(digest)
955 }
956
957 fn get_transaction_checkpoint(
958 &self,
959 digest: &TransactionDigest,
960 ) -> Option<CheckpointSequenceNumber> {
961 self.rocks.get_transaction_checkpoint(digest)
962 }
963}
964
965impl ChildObjectResolver for RpcStoreReadStore {
966 fn read_child_object(
967 &self,
968 parent: &ObjectID,
969 child: &ObjectID,
970 child_version_upper_bound: SequenceNumber,
971 ) -> SuiResult<Option<Object>> {
972 Ok(self.get_object(child).and_then(|o| {
973 if o.version() <= child_version_upper_bound
974 && o.owner == Owner::ObjectOwner((*parent).into())
975 {
976 Some(o)
977 } else {
978 None
979 }
980 }))
981 }
982
983 fn get_object_received_at_version(
984 &self,
985 _owner: &ObjectID,
986 _receiving_object_id: &ObjectID,
987 _receive_object_at_version: SequenceNumber,
988 _epoch_id: EpochId,
989 ) -> SuiResult<Option<Object>> {
990 Err(SuiErrorKind::UnsupportedFeatureError {
991 error: "RpcStoreReadStore does not support receiving objects".to_string(),
992 }
993 .into())
994 }
995}
996
997impl RpcStateReader for RpcStoreReadStore {
998 fn get_lowest_available_checkpoint_objects(&self) -> Result<CheckpointSequenceNumber> {
999 let perpetual = self
1000 .state
1001 .get_object_cache_reader()
1002 .get_highest_pruned_checkpoint()
1003 .map(|cp| cp + 1)
1004 .unwrap_or(0);
1005 let rpc_store = self.reader.get_lowest_available_checkpoint_objects()?;
1006 Ok(perpetual.max(rpc_store))
1007 }
1008
1009 fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
1010 Ok(self.state.get_chain_identifier())
1011 }
1012
1013 fn indexes(&self) -> Option<&dyn RpcIndexes> {
1014 Some(self)
1015 }
1016
1017 fn get_struct_layout_with_overlay(
1018 &self,
1019 struct_tag: &move_core_types::language_storage::StructTag,
1020 overlay: &ObjectSet,
1021 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
1022 let backing_store = self.state.get_backing_package_store();
1027 let overlay_store = OverlayBackingPackageStore::new(overlay, backing_store.as_ref());
1028 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1029 epoch_store
1030 .executor()
1031 .type_layout_resolver(epoch_store.protocol_config(), Box::new(overlay_store))
1032 .get_annotated_layout(struct_tag)
1033 .map(|layout| layout.into_layout())
1034 .map(Some)
1035 .map_err(StorageError::custom)
1036 }
1037}
1038
1039impl RpcIndexes for RpcStoreReadStore {
1040 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
1041 self.reader.get_epoch_info(epoch)
1042 }
1043
1044 fn owned_objects_iter(
1045 &self,
1046 owner: SuiAddress,
1047 object_type: Option<StructTag>,
1048 cursor: Option<OwnedObjectInfo>,
1049 ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
1050 self.reader.owned_objects_iter(owner, object_type, cursor)
1051 }
1052
1053 fn dynamic_field_iter(
1054 &self,
1055 parent: ObjectID,
1056 cursor: Option<ObjectID>,
1057 ) -> Result<Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>> {
1058 self.reader.dynamic_field_iter(parent, cursor)
1059 }
1060
1061 fn get_coin_info(&self, coin_type: &StructTag) -> Result<Option<CoinInfo>> {
1062 self.reader.get_coin_info(coin_type)
1063 }
1064
1065 fn get_balance(
1066 &self,
1067 owner: &SuiAddress,
1068 coin_type: &StructTag,
1069 ) -> Result<Option<BalanceInfo>> {
1070 self.reader.get_balance(owner, coin_type)
1071 }
1072
1073 fn balance_iter(
1074 &self,
1075 owner: &SuiAddress,
1076 cursor: Option<(SuiAddress, StructTag)>,
1077 ) -> Result<BalanceIterator<'_>> {
1078 self.reader.balance_iter(owner, cursor)
1079 }
1080
1081 fn package_versions_iter(
1082 &self,
1083 original_id: ObjectID,
1084 cursor: Option<u64>,
1085 ) -> Result<Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>> {
1086 self.reader.package_versions_iter(original_id, cursor)
1087 }
1088
1089 fn get_highest_indexed_checkpoint_seq_number(
1090 &self,
1091 ) -> Result<Option<CheckpointSequenceNumber>> {
1092 self.reader.get_highest_indexed_checkpoint_seq_number()
1093 }
1094
1095 fn ledger_tx_seq_digest(&self, tx_seq: u64) -> Result<Option<LedgerTxSeqDigest>> {
1096 self.reader.ledger_tx_seq_digest(tx_seq)
1097 }
1098
1099 fn ledger_tx_seq_digest_multi_get(
1100 &self,
1101 tx_seqs: &[u64],
1102 ) -> Result<Vec<Option<LedgerTxSeqDigest>>> {
1103 self.reader.ledger_tx_seq_digest_multi_get(tx_seqs)
1104 }
1105
1106 fn ledger_tx_seq_digest_iter(
1107 &self,
1108 start: u64,
1109 end_exclusive: u64,
1110 descending: bool,
1111 ) -> Result<LedgerTxSeqDigestIterator<'_>> {
1112 self.reader
1113 .ledger_tx_seq_digest_iter(start, end_exclusive, descending)
1114 }
1115
1116 fn transaction_bitmap_bucket_iter(
1117 &self,
1118 dimension_key: Vec<u8>,
1119 start_bucket: u64,
1120 end_bucket_exclusive: u64,
1121 descending: bool,
1122 ) -> Result<LedgerBitmapBucketIterator<'_>> {
1123 self.reader.transaction_bitmap_bucket_iter(
1124 dimension_key,
1125 start_bucket,
1126 end_bucket_exclusive,
1127 descending,
1128 )
1129 }
1130
1131 fn event_bitmap_bucket_iter(
1132 &self,
1133 dimension_key: Vec<u8>,
1134 start_bucket: u64,
1135 end_bucket_exclusive: u64,
1136 descending: bool,
1137 ) -> Result<LedgerBitmapBucketIterator<'_>> {
1138 self.reader.event_bitmap_bucket_iter(
1139 dimension_key,
1140 start_bucket,
1141 end_bucket_exclusive,
1142 descending,
1143 )
1144 }
1145}