sui_core/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_rpc_store::RpcStoreReader;
16use sui_types::base_types::ObjectID;
17use sui_types::base_types::SequenceNumber;
18use sui_types::base_types::SuiAddress;
19use sui_types::base_types::TransactionDigest;
20use sui_types::committee::Committee;
21use sui_types::committee::EpochId;
22use sui_types::effects::{TransactionEffects, TransactionEvents};
23use sui_types::error::{SuiErrorKind, SuiResult};
24use sui_types::full_checkpoint_content::ObjectSet;
25use sui_types::messages_checkpoint::CheckpointContentsDigest;
26use sui_types::messages_checkpoint::CheckpointDigest;
27use sui_types::messages_checkpoint::CheckpointSequenceNumber;
28use sui_types::messages_checkpoint::EndOfEpochData;
29use sui_types::messages_checkpoint::VerifiedCheckpoint;
30use sui_types::messages_checkpoint::VerifiedCheckpointContents;
31use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
32use sui_types::object::Object;
33use sui_types::object::Owner;
34use sui_types::storage::BalanceInfo;
35use sui_types::storage::BalanceIterator;
36use sui_types::storage::ChildObjectResolver;
37use sui_types::storage::CoinInfo;
38use sui_types::storage::DynamicFieldKey;
39use sui_types::storage::LedgerBitmapBucketIterator;
40use sui_types::storage::LedgerTxSeqDigest;
41use sui_types::storage::LedgerTxSeqDigestIterator;
42use sui_types::storage::ObjectStore;
43use sui_types::storage::OwnedObjectInfo;
44use sui_types::storage::RpcIndexes;
45use sui_types::storage::RpcStateReader;
46use sui_types::storage::WriteStore;
47use sui_types::storage::error::Error as StorageError;
48use sui_types::storage::error::Result;
49use sui_types::storage::{ObjectKey, OverlayBackingPackageStore, ReadStore};
50use sui_types::transaction::VerifiedTransaction;
51use tap::Pipe;
52use tap::TapFallible;
53use tracing::error;
54use typed_store::TypedStoreError;
55
/// Store handle that bundles the execution-cache trait pointers with the committee and
/// checkpoint stores.
///
/// The two watermark slots are behind `Arc`, so clones of this handle share them.
#[derive(Clone)]
pub struct RocksDbStore {
    // Cache trait objects used for object, transaction, effects and event lookups.
    cache_traits: ExecutionCacheTraitPointers,

    // Committee lookups and inserts go through this store.
    committee_store: Arc<CommitteeStore>,
    // Checkpoint summaries, contents and persisted watermarks live in this store.
    checkpoint_store: Arc<CheckpointStore>,
    // In-memory checkpoint watermark sequence numbers. Each starts as `None` and is set by
    // the matching `update_highest_*_checkpoint` call once the checkpoint store write
    // succeeds; later calls that would not advance the value return early.
    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
}
66
67impl RocksDbStore {
68    pub fn new(
69        cache_traits: ExecutionCacheTraitPointers,
70        committee_store: Arc<CommitteeStore>,
71        checkpoint_store: Arc<CheckpointStore>,
72    ) -> Self {
73        Self {
74            cache_traits,
75            committee_store,
76            checkpoint_store,
77            highest_verified_checkpoint: Arc::new(Mutex::new(None)),
78            highest_synced_checkpoint: Arc::new(Mutex::new(None)),
79        }
80    }
81
82    pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
83        self.cache_traits
84            .object_cache_reader
85            .multi_get_objects_by_key(object_keys)
86    }
87
88    pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
89        self.checkpoint_store
90            .get_highest_executed_checkpoint()
91            .expect("db error")
92    }
93}
94
95impl ReadStore for RocksDbStore {
96    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
97        self.checkpoint_store
98            .get_checkpoint_by_digest(digest)
99            .expect("db error")
100    }
101
102    fn get_checkpoint_by_sequence_number(
103        &self,
104        sequence_number: CheckpointSequenceNumber,
105    ) -> Option<VerifiedCheckpoint> {
106        self.checkpoint_store
107            .get_checkpoint_by_sequence_number(sequence_number)
108            .expect("db error")
109    }
110
111    fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
112        self.checkpoint_store
113            .get_highest_verified_checkpoint()
114            .map(|maybe_checkpoint| {
115                maybe_checkpoint
116                    .expect("storage should have been initialized with genesis checkpoint")
117            })
118            .map_err(Into::into)
119    }
120
121    fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
122        self.checkpoint_store
123            .get_highest_synced_checkpoint()
124            .map(|maybe_checkpoint| {
125                maybe_checkpoint
126                    .expect("storage should have been initialized with genesis checkpoint")
127            })
128            .map_err(Into::into)
129    }
130
131    fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
132        if let Some(highest_pruned_cp) = self
133            .checkpoint_store
134            .get_highest_pruned_checkpoint_seq_number()
135            .map_err(Into::<StorageError>::into)?
136        {
137            Ok(highest_pruned_cp + 1)
138        } else {
139            Ok(0)
140        }
141    }
142
143    fn get_full_checkpoint_contents(
144        &self,
145        sequence_number: Option<CheckpointSequenceNumber>,
146        digest: &CheckpointContentsDigest,
147    ) -> Option<VersionedFullCheckpointContents> {
148        #[cfg(debug_assertions)]
149        if let Some(sequence_number) = sequence_number {
150            // When sequence_number is provided as an optimization, we want to ensure that
151            // the sequence number we get from the db matches the one we provided.
152            // Only check this in debug mode though.
153            if let Some(loaded_sequence_number) = self
154                .checkpoint_store
155                .get_sequence_number_by_contents_digest(digest)
156                .expect("db error")
157            {
158                assert_eq!(loaded_sequence_number, sequence_number);
159            }
160        }
161
162        let sequence_number = sequence_number.or_else(|| {
163            self.checkpoint_store
164                .get_sequence_number_by_contents_digest(digest)
165                .expect("db error")
166        });
167        if let Some(sequence_number) = sequence_number {
168            // Note: We don't use `?` here because we want to tolerate
169            // potential db errors due to data corruption.
170            // In that case, we will fallback and construct the contents
171            // from the individual components as if we could not find the
172            // cached full contents.
173            if let Ok(Some(contents)) = self
174                .checkpoint_store
175                .get_full_checkpoint_contents_by_sequence_number(sequence_number)
176                .tap_err(|e| {
177                    error!(
178                        "error getting full checkpoint contents for checkpoint {:?}: {:?}",
179                        sequence_number, e
180                    )
181                })
182            {
183                return Some(contents);
184            }
185        }
186
187        // Otherwise gather it from the individual components.
188        // Note we can't insert the constructed contents into `full_checkpoint_content`,
189        // because it needs to be inserted along with `checkpoint_sequence_by_contents_digest`
190        // and `checkpoint_content`. However at this point it's likely we don't know the
191        // corresponding sequence number yet.
192        self.checkpoint_store
193            .get_checkpoint_contents(digest)
194            .expect("db error")
195            .and_then(|contents| {
196                let mut transactions = Vec::with_capacity(contents.size());
197                for tx in contents.iter() {
198                    if let (Some(t), Some(e)) = (
199                        self.get_transaction(&tx.transaction),
200                        self.cache_traits
201                            .transaction_cache_reader
202                            .get_effects(&tx.effects),
203                    ) {
204                        transactions.push(sui_types::base_types::ExecutionData::new(
205                            (*t).clone().into_inner(),
206                            e,
207                        ))
208                    } else {
209                        return None;
210                    }
211                }
212                Some(
213                    VersionedFullCheckpointContents::from_contents_and_execution_data(
214                        contents,
215                        transactions.into_iter(),
216                    ),
217                )
218            })
219    }
220
221    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
222        self.committee_store.get_committee(&epoch).unwrap()
223    }
224
225    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
226        self.cache_traits
227            .transaction_cache_reader
228            .get_transaction_block(digest)
229    }
230
231    fn multi_get_transactions(
232        &self,
233        digests: &[TransactionDigest],
234    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
235        self.cache_traits
236            .transaction_cache_reader
237            .multi_get_transaction_blocks(digests)
238    }
239
240    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
241        self.cache_traits
242            .transaction_cache_reader
243            .get_executed_effects(digest)
244    }
245
246    fn multi_get_transaction_effects(
247        &self,
248        digests: &[TransactionDigest],
249    ) -> Vec<Option<TransactionEffects>> {
250        self.cache_traits
251            .transaction_cache_reader
252            .multi_get_executed_effects(digests)
253    }
254
255    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
256        self.cache_traits
257            .transaction_cache_reader
258            .get_events(digest)
259    }
260
261    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
262        self.cache_traits
263            .transaction_cache_reader
264            .multi_get_events(digests)
265    }
266
267    fn get_unchanged_loaded_runtime_objects(
268        &self,
269        digest: &TransactionDigest,
270    ) -> Option<Vec<ObjectKey>> {
271        self.cache_traits
272            .transaction_cache_reader
273            .get_unchanged_loaded_runtime_objects(digest)
274    }
275
276    fn get_transaction_checkpoint(
277        &self,
278        digest: &TransactionDigest,
279    ) -> Option<CheckpointSequenceNumber> {
280        self.cache_traits
281            .checkpoint_cache
282            .deprecated_get_transaction_checkpoint(digest)
283            .map(|(_epoch, checkpoint)| checkpoint)
284    }
285
286    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
287        self.checkpoint_store
288            .get_highest_executed_checkpoint()
289            .expect("db error")
290            .ok_or_else(|| {
291                sui_types::storage::error::Error::missing("unable to get latest checkpoint")
292            })
293    }
294
295    fn get_checkpoint_contents_by_digest(
296        &self,
297        digest: &CheckpointContentsDigest,
298    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
299        self.checkpoint_store
300            .get_checkpoint_contents(digest)
301            .expect("db error")
302    }
303
304    fn get_checkpoint_contents_by_sequence_number(
305        &self,
306        sequence_number: CheckpointSequenceNumber,
307    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
308        match self.get_checkpoint_by_sequence_number(sequence_number) {
309            Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
310            None => None,
311        }
312    }
313}
314
315impl ObjectStore for RocksDbStore {
316    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
317        self.cache_traits.object_store.get_object(object_id)
318    }
319
320    fn get_object_by_key(
321        &self,
322        object_id: &sui_types::base_types::ObjectID,
323        version: sui_types::base_types::VersionNumber,
324    ) -> Option<Object> {
325        self.cache_traits
326            .object_store
327            .get_object_by_key(object_id, version)
328    }
329}
330
331impl WriteStore for RocksDbStore {
332    fn insert_checkpoint(
333        &self,
334        checkpoint: &VerifiedCheckpoint,
335    ) -> Result<(), sui_types::storage::error::Error> {
336        if let Some(EndOfEpochData {
337            next_epoch_committee,
338            ..
339        }) = checkpoint.end_of_epoch_data.as_ref()
340        {
341            let next_committee = next_epoch_committee.iter().cloned().collect();
342            let committee =
343                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
344            self.insert_committee(committee)?;
345        }
346
347        self.checkpoint_store
348            .insert_verified_checkpoint(checkpoint)
349            .map_err(Into::into)
350    }
351
352    fn update_highest_synced_checkpoint(
353        &self,
354        checkpoint: &VerifiedCheckpoint,
355    ) -> Result<(), sui_types::storage::error::Error> {
356        let mut locked = self.highest_synced_checkpoint.lock();
357        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
358            return Ok(());
359        }
360        self.checkpoint_store
361            .update_highest_synced_checkpoint(checkpoint)
362            .map_err(sui_types::storage::error::Error::custom)?;
363        *locked = Some(checkpoint.sequence_number);
364        Ok(())
365    }
366
367    fn update_highest_verified_checkpoint(
368        &self,
369        checkpoint: &VerifiedCheckpoint,
370    ) -> Result<(), sui_types::storage::error::Error> {
371        let mut locked = self.highest_verified_checkpoint.lock();
372        if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
373            return Ok(());
374        }
375        self.checkpoint_store
376            .update_highest_verified_checkpoint(checkpoint)
377            .map_err(sui_types::storage::error::Error::custom)?;
378        *locked = Some(checkpoint.sequence_number);
379        Ok(())
380    }
381
382    fn insert_checkpoint_contents(
383        &self,
384        checkpoint: &VerifiedCheckpoint,
385        contents: VerifiedCheckpointContents,
386    ) -> Result<(), sui_types::storage::error::Error> {
387        self.cache_traits
388            .state_sync_store
389            .multi_insert_transaction_and_effects(contents.transactions());
390        self.checkpoint_store
391            .insert_verified_checkpoint_contents(checkpoint, contents)
392            .map_err(Into::into)
393    }
394
395    fn insert_committee(
396        &self,
397        new_committee: Committee,
398    ) -> Result<(), sui_types::storage::error::Error> {
399        self.committee_store
400            .insert_new_committee(&new_committee)
401            .unwrap();
402        Ok(())
403    }
404}
405
/// Read store that pairs an [`AuthorityState`] with a [`RocksDbStore`].
///
/// Object and ledger reads are forwarded to `rocks`; index queries go through the optional
/// `rpc_index` held by `state` and fail with a storage error when it is absent.
pub struct RestReadStore {
    // Source of the optional rpc index, the chain identifier, and the package / epoch stores.
    state: Arc<AuthorityState>,
    // Backing store for objects, checkpoints, transactions, and committees.
    rocks: RocksDbStore,
}
410
411impl RestReadStore {
412    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
413        Self { state, rocks }
414    }
415
416    fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
417        self.state
418            .rpc_index
419            .as_deref()
420            .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
421    }
422}
423
424impl ObjectStore for RestReadStore {
425    fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
426        self.rocks.get_object(object_id)
427    }
428
429    fn get_object_by_key(
430        &self,
431        object_id: &sui_types::base_types::ObjectID,
432        version: sui_types::base_types::VersionNumber,
433    ) -> Option<Object> {
434        self.rocks.get_object_by_key(object_id, version)
435    }
436}
437
438impl ReadStore for RestReadStore {
439    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
440        self.rocks.get_committee(epoch)
441    }
442
443    fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
444        self.rocks.get_latest_checkpoint()
445    }
446
447    fn get_highest_verified_checkpoint(
448        &self,
449    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
450        self.rocks.get_highest_verified_checkpoint()
451    }
452
453    fn get_highest_synced_checkpoint(
454        &self,
455    ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
456        self.rocks.get_highest_synced_checkpoint()
457    }
458
459    fn get_lowest_available_checkpoint(
460        &self,
461    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
462        self.rocks.get_lowest_available_checkpoint()
463    }
464
465    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
466        self.rocks.get_checkpoint_by_digest(digest)
467    }
468
469    fn get_checkpoint_by_sequence_number(
470        &self,
471        sequence_number: CheckpointSequenceNumber,
472    ) -> Option<VerifiedCheckpoint> {
473        self.rocks
474            .get_checkpoint_by_sequence_number(sequence_number)
475    }
476
477    fn get_checkpoint_contents_by_digest(
478        &self,
479        digest: &CheckpointContentsDigest,
480    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
481        self.rocks.get_checkpoint_contents_by_digest(digest)
482    }
483
484    fn get_checkpoint_contents_by_sequence_number(
485        &self,
486        sequence_number: CheckpointSequenceNumber,
487    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
488        self.rocks
489            .get_checkpoint_contents_by_sequence_number(sequence_number)
490    }
491
492    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
493        self.rocks.get_transaction(digest)
494    }
495
496    fn multi_get_transactions(
497        &self,
498        digests: &[TransactionDigest],
499    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
500        self.rocks.multi_get_transactions(digests)
501    }
502
503    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
504        self.rocks.get_transaction_effects(digest)
505    }
506
507    fn multi_get_transaction_effects(
508        &self,
509        digests: &[TransactionDigest],
510    ) -> Vec<Option<TransactionEffects>> {
511        self.rocks.multi_get_transaction_effects(digests)
512    }
513
514    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
515        self.rocks.get_events(digest)
516    }
517
518    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
519        self.rocks.multi_get_events(digests)
520    }
521
522    fn get_full_checkpoint_contents(
523        &self,
524        sequence_number: Option<CheckpointSequenceNumber>,
525        digest: &CheckpointContentsDigest,
526    ) -> Option<VersionedFullCheckpointContents> {
527        self.rocks
528            .get_full_checkpoint_contents(sequence_number, digest)
529    }
530
531    fn get_unchanged_loaded_runtime_objects(
532        &self,
533        digest: &TransactionDigest,
534    ) -> Option<Vec<ObjectKey>> {
535        self.rocks.get_unchanged_loaded_runtime_objects(digest)
536    }
537
538    fn get_transaction_checkpoint(
539        &self,
540        digest: &TransactionDigest,
541    ) -> Option<CheckpointSequenceNumber> {
542        self.rocks.get_transaction_checkpoint(digest)
543    }
544}
545
546impl ChildObjectResolver for RestReadStore {
547    fn read_child_object(
548        &self,
549        parent: &ObjectID,
550        child: &ObjectID,
551        child_version_upper_bound: SequenceNumber,
552    ) -> SuiResult<Option<Object>> {
553        Ok(self.get_object(child).and_then(|o| {
554            if o.version() <= child_version_upper_bound
555                && o.owner == Owner::ObjectOwner((*parent).into())
556            {
557                Some(o)
558            } else {
559                None
560            }
561        }))
562    }
563
564    fn get_object_received_at_version(
565        &self,
566        _owner: &ObjectID,
567        _receiving_object_id: &ObjectID,
568        _receive_object_at_version: SequenceNumber,
569        _epoch_id: EpochId,
570    ) -> SuiResult<Option<Object>> {
571        Err(SuiErrorKind::UnsupportedFeatureError {
572            error: "RestReadStore does not support receiving objects".to_string(),
573        }
574        .into())
575    }
576}
577
578impl RpcStateReader for RestReadStore {
579    fn get_lowest_available_checkpoint_objects(
580        &self,
581    ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
582        Ok(self
583            .state
584            .get_object_cache_reader()
585            .get_highest_pruned_checkpoint()
586            .map(|cp| cp + 1)
587            .unwrap_or(0))
588    }
589
590    fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
591        Ok(self.state.get_chain_identifier())
592    }
593
594    fn indexes(&self) -> Option<&dyn RpcIndexes> {
595        Some(self)
596    }
597
598    fn get_struct_layout_with_overlay(
599        &self,
600        struct_tag: &move_core_types::language_storage::StructTag,
601        overlay: &ObjectSet,
602    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
603        let backing_store = self.state.get_backing_package_store();
604        let overlay_store = OverlayBackingPackageStore::new(overlay, backing_store.as_ref());
605        let epoch_store = self.state.load_epoch_store_one_call_per_task();
606        epoch_store
607            .executor()
608            // TODO(cache) - must read through cache
609            .type_layout_resolver(epoch_store.protocol_config(), Box::new(overlay_store))
610            .get_annotated_layout(struct_tag)
611            .map(|layout| layout.into_layout())
612            .map(Some)
613            .map_err(StorageError::custom)
614    }
615}
616
617impl RpcIndexes for RestReadStore {
618    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
619        self.index()?
620            .get_epoch_info(epoch)
621            .map_err(StorageError::custom)
622    }
623
624    fn owned_objects_iter(
625        &self,
626        owner: SuiAddress,
627        object_type: Option<StructTag>,
628        cursor: Option<OwnedObjectInfo>,
629    ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
630        let cursor = cursor.map(|cursor| OwnerIndexKey {
631            owner: cursor.owner,
632            object_type: cursor.object_type,
633            inverted_balance: cursor.balance.map(std::ops::Not::not),
634            object_id: cursor.object_id,
635        });
636
637        let iter = self
638            .index()?
639            .owner_iter(owner, object_type, cursor)?
640            .map(|result| {
641                result.map(
642                    |(
643                        OwnerIndexKey {
644                            owner,
645                            object_id,
646                            object_type,
647                            inverted_balance,
648                        },
649                        OwnerIndexInfo { version },
650                    )| {
651                        OwnedObjectInfo {
652                            owner,
653                            object_type,
654                            balance: inverted_balance.map(std::ops::Not::not),
655                            object_id,
656                            version,
657                        }
658                    },
659                )
660            });
661
662        Ok(Box::new(iter) as _)
663    }
664
665    fn dynamic_field_iter(
666        &self,
667        parent: ObjectID,
668        cursor: Option<ObjectID>,
669    ) -> sui_types::storage::error::Result<
670        Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
671    > {
672        let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
673        Ok(Box::new(iter) as _)
674    }
675
676    fn get_coin_info(
677        &self,
678        coin_type: &StructTag,
679    ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
680        self.index()?
681            .get_coin_info(coin_type)?
682            .map(
683                |CoinIndexInfo {
684                     coin_metadata_object_id,
685                     treasury_object_id,
686                     regulated_coin_metadata_object_id,
687                 }| CoinInfo {
688                    coin_metadata_object_id,
689                    treasury_object_id,
690                    regulated_coin_metadata_object_id,
691                },
692            )
693            .pipe(Ok)
694    }
695
696    fn get_balance(
697        &self,
698        owner: &SuiAddress,
699        coin_type: &StructTag,
700    ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
701        self.index()?
702            .get_balance(owner, coin_type)?
703            .map(|info| info.into())
704            .pipe(Ok)
705    }
706
707    fn balance_iter(
708        &self,
709        owner: &SuiAddress,
710        cursor: Option<(SuiAddress, StructTag)>,
711    ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
712        let cursor_key =
713            cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
714
715        Ok(Box::new(
716            self.index()?
717                .balance_iter(*owner, cursor_key)?
718                .map(|result| {
719                    result
720                        .map(|(key, info)| (key.coin_type, info.into()))
721                        .map_err(Into::into)
722                }),
723        ))
724    }
725
726    fn package_versions_iter(
727        &self,
728        original_id: ObjectID,
729        cursor: Option<u64>,
730    ) -> sui_types::storage::error::Result<
731        Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
732    > {
733        let iter = self.index()?.package_versions_iter(original_id, cursor)?;
734        Ok(
735            Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
736                as _,
737        )
738    }
739
740    fn get_highest_indexed_checkpoint_seq_number(
741        &self,
742    ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
743        self.index()?
744            .get_highest_indexed_checkpoint_seq_number()
745            .map_err(Into::into)
746    }
747
748    fn ledger_tx_seq_digest(&self, tx_seq: u64) -> Result<Option<LedgerTxSeqDigest>> {
749        self.index()?
750            .ledger_tx_seq_digest(tx_seq)
751            .map_err(Into::into)
752    }
753
754    fn ledger_tx_seq_digest_multi_get(
755        &self,
756        tx_seqs: &[u64],
757    ) -> Result<Vec<Option<LedgerTxSeqDigest>>> {
758        self.index()?
759            .ledger_tx_seq_digest_multi_get(tx_seqs)
760            .map_err(Into::into)
761    }
762
763    fn ledger_tx_seq_digest_iter(
764        &self,
765        start: u64,
766        end_exclusive: u64,
767        descending: bool,
768    ) -> Result<LedgerTxSeqDigestIterator<'_>> {
769        self.index()?
770            .ledger_tx_seq_digest_iter(start, end_exclusive, descending)
771            .map_err(Into::into)
772    }
773
774    fn transaction_bitmap_bucket_iter(
775        &self,
776        dimension_key: Vec<u8>,
777        start_bucket: u64,
778        end_bucket_exclusive: u64,
779        descending: bool,
780    ) -> Result<LedgerBitmapBucketIterator<'_>> {
781        self.index()?
782            .transaction_bitmap_bucket_iter(
783                dimension_key,
784                start_bucket,
785                end_bucket_exclusive,
786                descending,
787            )
788            .map_err(Into::into)
789    }
790
791    fn event_bitmap_bucket_iter(
792        &self,
793        dimension_key: Vec<u8>,
794        start_bucket: u64,
795        end_bucket_exclusive: u64,
796        descending: bool,
797    ) -> Result<LedgerBitmapBucketIterator<'_>> {
798        self.index()?
799            .event_bitmap_bucket_iter(
800                dimension_key,
801                start_bucket,
802                end_bucket_exclusive,
803                descending,
804            )
805            .map_err(Into::into)
806    }
807}
808
/// Read store backed by the embedded [`sui_rpc_store`] indexer instead
/// of the built-in [`RpcIndexStore`].
///
/// The two read stores serve the same `sui-rpc-api` trait stack; the
/// difference is where the *index* surface comes from. This wrapper
/// composes two backends:
///
/// - **Raw chain data** — objects, transactions, effects, events,
///   checkpoints, committees, and child-object resolution — is served
///   from the validator's perpetual / checkpoint stores
///   ([`RocksDbStore`]), exactly like [`RestReadStore`]. The embedded
///   rpc-store does not duplicate this data.
/// - **The index surface** ([`RpcIndexes`]) — owner / type / balance /
///   coin / package-version listings, epoch info, and the
///   ledger-history bitmaps — is served from the
///   [`RpcStoreReader`].
///
/// The object/state available range is the intersection of the two
/// backends' ranges (`max` of their lower bounds): a consistent read
/// at checkpoint `C` needs both the object bytes (perpetual store) and
/// the index rows (rpc-store) at `C`. Ledger-history-specific
/// availability (bounded by the history backfill watermark) is exposed
/// separately.
pub struct RpcStoreReadStore {
    // Handle to the node's `AuthorityState`.
    state: Arc<AuthorityState>,
    // Raw chain data backend (first bullet above).
    rocks: RocksDbStore,
    // Embedded rpc-store reader; index surface backend (second bullet above).
    reader: RpcStoreReader,
}
837
838impl RpcStoreReadStore {
839    pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore, reader: RpcStoreReader) -> Self {
840        Self {
841            state,
842            rocks,
843            reader,
844        }
845    }
846}
847
848impl ObjectStore for RpcStoreReadStore {
849    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
850        self.rocks.get_object(object_id)
851    }
852
853    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
854        self.rocks.get_object_by_key(object_id, version)
855    }
856}
857
858impl ReadStore for RpcStoreReadStore {
859    fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
860        self.rocks.get_committee(epoch)
861    }
862
863    fn get_latest_checkpoint(&self) -> Result<VerifiedCheckpoint> {
864        self.rocks.get_latest_checkpoint()
865    }
866
867    fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint> {
868        self.rocks.get_highest_verified_checkpoint()
869    }
870
871    fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint> {
872        self.rocks.get_highest_synced_checkpoint()
873    }
874
875    fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
876        // A consistent read needs both the raw chain data (perpetual
877        // store) and the index rows (rpc-store), so the available range
878        // starts at the higher of the two lower bounds.
879        let perpetual = self.rocks.get_lowest_available_checkpoint()?;
880        let rpc_store = self.reader.get_lowest_available_checkpoint()?;
881        Ok(perpetual.max(rpc_store))
882    }
883
884    fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
885        self.rocks.get_checkpoint_by_digest(digest)
886    }
887
888    fn get_checkpoint_by_sequence_number(
889        &self,
890        sequence_number: CheckpointSequenceNumber,
891    ) -> Option<VerifiedCheckpoint> {
892        self.rocks
893            .get_checkpoint_by_sequence_number(sequence_number)
894    }
895
896    fn get_checkpoint_contents_by_digest(
897        &self,
898        digest: &CheckpointContentsDigest,
899    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
900        self.rocks.get_checkpoint_contents_by_digest(digest)
901    }
902
903    fn get_checkpoint_contents_by_sequence_number(
904        &self,
905        sequence_number: CheckpointSequenceNumber,
906    ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
907        self.rocks
908            .get_checkpoint_contents_by_sequence_number(sequence_number)
909    }
910
911    fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
912        self.rocks.get_transaction(digest)
913    }
914
915    fn multi_get_transactions(
916        &self,
917        digests: &[TransactionDigest],
918    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
919        self.rocks.multi_get_transactions(digests)
920    }
921
922    fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
923        self.rocks.get_transaction_effects(digest)
924    }
925
926    fn multi_get_transaction_effects(
927        &self,
928        digests: &[TransactionDigest],
929    ) -> Vec<Option<TransactionEffects>> {
930        self.rocks.multi_get_transaction_effects(digests)
931    }
932
933    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
934        self.rocks.get_events(digest)
935    }
936
937    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
938        self.rocks.multi_get_events(digests)
939    }
940
941    fn get_full_checkpoint_contents(
942        &self,
943        sequence_number: Option<CheckpointSequenceNumber>,
944        digest: &CheckpointContentsDigest,
945    ) -> Option<VersionedFullCheckpointContents> {
946        self.rocks
947            .get_full_checkpoint_contents(sequence_number, digest)
948    }
949
950    fn get_unchanged_loaded_runtime_objects(
951        &self,
952        digest: &TransactionDigest,
953    ) -> Option<Vec<ObjectKey>> {
954        self.rocks.get_unchanged_loaded_runtime_objects(digest)
955    }
956
957    fn get_transaction_checkpoint(
958        &self,
959        digest: &TransactionDigest,
960    ) -> Option<CheckpointSequenceNumber> {
961        self.rocks.get_transaction_checkpoint(digest)
962    }
963}
964
965impl ChildObjectResolver for RpcStoreReadStore {
966    fn read_child_object(
967        &self,
968        parent: &ObjectID,
969        child: &ObjectID,
970        child_version_upper_bound: SequenceNumber,
971    ) -> SuiResult<Option<Object>> {
972        Ok(self.get_object(child).and_then(|o| {
973            if o.version() <= child_version_upper_bound
974                && o.owner == Owner::ObjectOwner((*parent).into())
975            {
976                Some(o)
977            } else {
978                None
979            }
980        }))
981    }
982
983    fn get_object_received_at_version(
984        &self,
985        _owner: &ObjectID,
986        _receiving_object_id: &ObjectID,
987        _receive_object_at_version: SequenceNumber,
988        _epoch_id: EpochId,
989    ) -> SuiResult<Option<Object>> {
990        Err(SuiErrorKind::UnsupportedFeatureError {
991            error: "RpcStoreReadStore does not support receiving objects".to_string(),
992        }
993        .into())
994    }
995}
996
997impl RpcStateReader for RpcStoreReadStore {
998    fn get_lowest_available_checkpoint_objects(&self) -> Result<CheckpointSequenceNumber> {
999        let perpetual = self
1000            .state
1001            .get_object_cache_reader()
1002            .get_highest_pruned_checkpoint()
1003            .map(|cp| cp + 1)
1004            .unwrap_or(0);
1005        let rpc_store = self.reader.get_lowest_available_checkpoint_objects()?;
1006        Ok(perpetual.max(rpc_store))
1007    }
1008
1009    fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
1010        Ok(self.state.get_chain_identifier())
1011    }
1012
1013    fn indexes(&self) -> Option<&dyn RpcIndexes> {
1014        Some(self)
1015    }
1016
1017    fn get_struct_layout_with_overlay(
1018        &self,
1019        struct_tag: &move_core_types::language_storage::StructTag,
1020        overlay: &ObjectSet,
1021    ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
1022        // Resolve through the authority's live executor and backing
1023        // package store, matching `RestReadStore`: the perpetual store
1024        // backs the package reads and the loaded epoch store carries
1025        // the current protocol config.
1026        let backing_store = self.state.get_backing_package_store();
1027        let overlay_store = OverlayBackingPackageStore::new(overlay, backing_store.as_ref());
1028        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1029        epoch_store
1030            .executor()
1031            .type_layout_resolver(epoch_store.protocol_config(), Box::new(overlay_store))
1032            .get_annotated_layout(struct_tag)
1033            .map(|layout| layout.into_layout())
1034            .map(Some)
1035            .map_err(StorageError::custom)
1036    }
1037}
1038
1039impl RpcIndexes for RpcStoreReadStore {
1040    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
1041        self.reader.get_epoch_info(epoch)
1042    }
1043
1044    fn owned_objects_iter(
1045        &self,
1046        owner: SuiAddress,
1047        object_type: Option<StructTag>,
1048        cursor: Option<OwnedObjectInfo>,
1049    ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
1050        self.reader.owned_objects_iter(owner, object_type, cursor)
1051    }
1052
1053    fn dynamic_field_iter(
1054        &self,
1055        parent: ObjectID,
1056        cursor: Option<ObjectID>,
1057    ) -> Result<Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>> {
1058        self.reader.dynamic_field_iter(parent, cursor)
1059    }
1060
1061    fn get_coin_info(&self, coin_type: &StructTag) -> Result<Option<CoinInfo>> {
1062        self.reader.get_coin_info(coin_type)
1063    }
1064
1065    fn get_balance(
1066        &self,
1067        owner: &SuiAddress,
1068        coin_type: &StructTag,
1069    ) -> Result<Option<BalanceInfo>> {
1070        self.reader.get_balance(owner, coin_type)
1071    }
1072
1073    fn balance_iter(
1074        &self,
1075        owner: &SuiAddress,
1076        cursor: Option<(SuiAddress, StructTag)>,
1077    ) -> Result<BalanceIterator<'_>> {
1078        self.reader.balance_iter(owner, cursor)
1079    }
1080
1081    fn package_versions_iter(
1082        &self,
1083        original_id: ObjectID,
1084        cursor: Option<u64>,
1085    ) -> Result<Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>> {
1086        self.reader.package_versions_iter(original_id, cursor)
1087    }
1088
1089    fn get_highest_indexed_checkpoint_seq_number(
1090        &self,
1091    ) -> Result<Option<CheckpointSequenceNumber>> {
1092        self.reader.get_highest_indexed_checkpoint_seq_number()
1093    }
1094
1095    fn ledger_tx_seq_digest(&self, tx_seq: u64) -> Result<Option<LedgerTxSeqDigest>> {
1096        self.reader.ledger_tx_seq_digest(tx_seq)
1097    }
1098
1099    fn ledger_tx_seq_digest_multi_get(
1100        &self,
1101        tx_seqs: &[u64],
1102    ) -> Result<Vec<Option<LedgerTxSeqDigest>>> {
1103        self.reader.ledger_tx_seq_digest_multi_get(tx_seqs)
1104    }
1105
1106    fn ledger_tx_seq_digest_iter(
1107        &self,
1108        start: u64,
1109        end_exclusive: u64,
1110        descending: bool,
1111    ) -> Result<LedgerTxSeqDigestIterator<'_>> {
1112        self.reader
1113            .ledger_tx_seq_digest_iter(start, end_exclusive, descending)
1114    }
1115
1116    fn transaction_bitmap_bucket_iter(
1117        &self,
1118        dimension_key: Vec<u8>,
1119        start_bucket: u64,
1120        end_bucket_exclusive: u64,
1121        descending: bool,
1122    ) -> Result<LedgerBitmapBucketIterator<'_>> {
1123        self.reader.transaction_bitmap_bucket_iter(
1124            dimension_key,
1125            start_bucket,
1126            end_bucket_exclusive,
1127            descending,
1128        )
1129    }
1130
1131    fn event_bitmap_bucket_iter(
1132        &self,
1133        dimension_key: Vec<u8>,
1134        start_bucket: u64,
1135        end_bucket_exclusive: u64,
1136        descending: bool,
1137    ) -> Result<LedgerBitmapBucketIterator<'_>> {
1138        self.reader.event_bitmap_bucket_iter(
1139            dimension_key,
1140            start_bucket,
1141            end_bucket_exclusive,
1142            descending,
1143        )
1144    }
1145}