1use arc_swap::Guard;
5use async_trait::async_trait;
6use move_core_types::language_storage::TypeTag;
7use std::collections::{BTreeMap, HashMap};
8use std::sync::Arc;
9use sui_core::authority::AuthorityState;
10use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
11use sui_core::execution_cache::ObjectCacheRead;
12use sui_core::jsonrpc_index::TotalBalance;
13use sui_core::subscription_handler::SubscriptionHandler;
14use sui_json_rpc_types::{
15 Coin as SuiCoin, DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent,
16 SuiObjectDataFilter, TransactionFilter,
17};
18use sui_storage::key_value_store::{
19 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
20};
21use sui_types::base_types::{
22 MoveObjectType, ObjectID, ObjectInfo, ObjectRef, SequenceNumber, SuiAddress,
23};
24use sui_types::bridge::Bridge;
25use sui_types::committee::{Committee, EpochId};
26use sui_types::digests::{ChainIdentifier, TransactionDigest};
27use sui_types::dynamic_field::DynamicFieldInfo;
28use sui_types::effects::TransactionEffects;
29use sui_types::error::{SuiError, SuiErrorKind, UserInputError};
30use sui_types::event::EventID;
31use sui_types::governance::StakedSui;
32use sui_types::messages_checkpoint::{
33 CheckpointContents, CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber,
34 VerifiedCheckpoint,
35};
36use sui_types::object::{Object, ObjectRead, PastObjectRead};
37use sui_types::storage::{BackingPackageStore, ObjectStore, WriteKind};
38use sui_types::sui_serde::BigInt;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::transaction::{Transaction, TransactionData, TransactionKind};
41use thiserror::Error;
42use tokio::task::JoinError;
43
44use crate::ObjectProvider;
45#[cfg(test)]
46use mockall::automock;
47use typed_store_error::TypedStoreError;
48
/// Result alias used by the [`StateRead`] API: the error type is always
/// [`StateReadError`], and the success type defaults to `()`.
pub type StateReadResult<T = ()> = Result<T, StateReadError>;
50
#[cfg_attr(test, automock)]
/// Abstraction over the node-state operations exposed in this module.
///
/// The trait is implemented for [`AuthorityState`] in this file, and under
/// `cfg(test)` `mockall`'s `automock` generates a mock implementation from it.
/// All fallible methods report failures as [`StateReadError`] through
/// [`StateReadResult`].
#[async_trait]
pub trait StateRead: Send + Sync {
    /// Looks up data for two digest lists in a single call: `transactions` and
    /// `effects`.
    ///
    /// The `AuthorityState` implementation forwards to
    /// `TransactionKeyValueStoreTrait::multi_get`.
    async fn multi_get(
        &self,
        transactions: &[TransactionDigest],
        effects: &[TransactionDigest],
    ) -> StateReadResult<KVStoreTransactionData>;

    /// Returns the [`ObjectRead`] for `object_id`.
    fn get_object_read(&self, object_id: &ObjectID) -> StateReadResult<ObjectRead>;

    /// Returns the [`PastObjectRead`] for `object_id` at the given `version`.
    fn get_past_object_read(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> StateReadResult<PastObjectRead>;

    /// Returns the object identified by `object_id`, or `None`
    /// (presumably when no such object is known; confirm against the
    /// implementor).
    async fn get_object(&self, object_id: &ObjectID) -> StateReadResult<Option<Object>>;

    /// Returns a guard over the current [`AuthorityPerEpochStore`].
    ///
    /// NOTE(review): the name suggests this should be called at most once per
    /// task; the reason is not visible in this file, so confirm against
    /// `AuthorityState`.
    fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>>;

    /// Lists dynamic fields of `owner` as `(ObjectID, DynamicFieldInfo)` pairs.
    ///
    /// `cursor` and `limit` are forwarded unchanged by the `AuthorityState`
    /// implementation; whether the cursor is inclusive is not visible here.
    fn get_dynamic_fields(
        &self,
        owner: ObjectID,
        cursor: Option<ObjectID>,
        limit: usize,
    ) -> StateReadResult<Vec<(ObjectID, DynamicFieldInfo)>>;

    /// Returns a shared handle to the object cache reader.
    fn get_cache_reader(&self) -> &Arc<dyn ObjectCacheRead>;

    /// Returns a shared handle to the object store.
    fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync>;

    /// Returns a shared handle to the backing package store.
    fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync>;

    /// Returns the objects owned by `owner`, optionally restricted by `filter`.
    ///
    /// Unlike [`StateRead::get_owner_objects_with_limit`] this takes no limit;
    /// the `AuthorityState` implementation collects the whole owner-objects
    /// iterator.
    fn get_owner_objects(
        &self,
        owner: SuiAddress,
        cursor: Option<ObjectID>,
        filter: Option<SuiObjectDataFilter>,
    ) -> StateReadResult<Vec<ObjectInfo>>;

    /// Queries events matching `query`.
    ///
    /// `kv_store`, `cursor`, `limit` and `descending` are passed through to the
    /// implementor's event query; cursor inclusivity and ordering semantics are
    /// not visible here (TODO confirm).
    async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> StateReadResult<Vec<SuiEvent>>;

    /// Dry-runs `transaction` (identified by `transaction_digest`).
    ///
    /// On success returns the dry-run response, a map from object ID to
    /// `(ObjectRef, Object, WriteKind)`, the transaction effects, and an
    /// optional object ID whose meaning is defined by the implementor
    /// (not visible here).
    #[allow(clippy::type_complexity)]
    async fn dry_exec_transaction(
        &self,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> StateReadResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )>;

    /// Runs a dev-inspect of `transaction_kind` on behalf of `sender`.
    ///
    /// All optional parameters are forwarded as-is by the `AuthorityState`
    /// implementation; their defaults when `None` are decided there.
    async fn dev_inspect_transaction_block(
        &self,
        sender: SuiAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<SuiAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> StateReadResult<DevInspectResults>;

    /// Returns a shared handle to the [`SubscriptionHandler`].
    fn get_subscription_handler(&self) -> Arc<SubscriptionHandler>;

    /// Returns objects owned by `owner`, optionally restricted by `filter`,
    /// with `cursor` and `limit` forwarded to the implementor for pagination.
    fn get_owner_objects_with_limit(
        &self,
        owner: SuiAddress,
        cursor: Option<ObjectID>,
        limit: usize,
        filter: Option<SuiObjectDataFilter>,
    ) -> StateReadResult<Vec<ObjectInfo>>;

    /// Returns transaction digests matching `filter`.
    ///
    /// `cursor`, `limit` and `reverse` are forwarded to the implementor; their
    /// exact pagination semantics are not visible here.
    async fn get_transactions(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        filter: Option<TransactionFilter>,
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> StateReadResult<Vec<TransactionDigest>>;

    /// Resolves the object ID of the dynamic field of `owner` whose name has
    /// type `name_type` and BCS encoding `name_bcs_bytes`; `None` is returned
    /// by the implementor when it finds no match (presumably; TODO confirm).
    fn get_dynamic_field_object_id(
        &self,
        owner: ObjectID,
        name_type: TypeTag,
        name_bcs_bytes: &[u8],
    ) -> StateReadResult<Option<ObjectID>>;

    /// Returns the [`StakedSui`] objects owned by `owner`.
    ///
    /// The `AuthorityState` implementation fetches the owner's Move objects of
    /// type `MoveObjectType::staked_sui()`.
    async fn get_staked_sui(&self, owner: SuiAddress) -> StateReadResult<Vec<StakedSui>>;
    /// Returns the current [`SuiSystemState`].
    fn get_system_state(&self) -> StateReadResult<SuiSystemState>;
    /// Returns the committee for `epoch`; judging by the name, `None` selects
    /// the latest committee (confirm against the committee store).
    fn get_or_latest_committee(&self, epoch: Option<BigInt<u64>>) -> StateReadResult<Committee>;

    /// Returns the [`Bridge`] object.
    fn get_bridge(&self) -> StateReadResult<Bridge>;

    /// Returns the digest of the transaction that published `package_id`.
    fn find_publish_txn_digest(&self, package_id: ObjectID) -> StateReadResult<TransactionDigest>;
    /// Returns coins owned by `owner` as [`SuiCoin`] values.
    ///
    /// `cursor`, `limit` and `one_coin_type_only` are forwarded to the
    /// implementor's coin iterator; the meaning of the cursor tuple's fields is
    /// not visible here (presumably coin type, a numeric key, and object ID —
    /// TODO confirm).
    fn get_owned_coins(
        &self,
        owner: SuiAddress,
        cursor: (String, u64, ObjectID),
        limit: usize,
        one_coin_type_only: bool,
    ) -> StateReadResult<Vec<SuiCoin>>;
    /// Returns the executed transaction and its effects for `digest`, with
    /// `kv_store` handed to the implementor for the lookup.
    async fn get_executed_transaction_and_effects(
        &self,
        digest: TransactionDigest,
        kv_store: Arc<TransactionKeyValueStore>,
    ) -> StateReadResult<(Transaction, TransactionEffects)>;
    /// Returns the total balance of `coin_type` held by `owner`.
    ///
    /// The `AuthorityState` implementation requires the index store and fails
    /// with `SuiErrorKind::IndexStoreNotAvailable` when it is absent.
    async fn get_balance(
        &self,
        owner: SuiAddress,
        coin_type: TypeTag,
    ) -> StateReadResult<TotalBalance>;
    /// Returns the total balance per coin type held by `owner`.
    ///
    /// The `AuthorityState` implementation requires the index store and fails
    /// with `SuiErrorKind::IndexStoreNotAvailable` when it is absent.
    async fn get_all_balance(
        &self,
        owner: SuiAddress,
    ) -> StateReadResult<Arc<HashMap<TypeTag, TotalBalance>>>;

    /// Returns the verified checkpoint with the given sequence number; a
    /// missing checkpoint is reported as an error rather than `None`
    /// (compare [`StateRead::get_checkpoint_by_sequence_number`]).
    fn get_verified_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> StateReadResult<VerifiedCheckpoint>;

    /// Returns the checkpoint contents identified by `digest`.
    fn get_checkpoint_contents(
        &self,
        digest: CheckpointContentsDigest,
    ) -> StateReadResult<CheckpointContents>;

    /// Returns the verified checkpoint whose summary has the given `digest`.
    fn get_verified_checkpoint_summary_by_digest(
        &self,
        digest: CheckpointDigest,
    ) -> StateReadResult<VerifiedCheckpoint>;

    /// For each digest, returns the `(epoch, checkpoint sequence number)` pair
    /// recorded for that transaction, or `None` when the implementor has none.
    ///
    /// Marked deprecated by name; the replacement API is not visible here.
    fn deprecated_multi_get_transaction_checkpoint(
        &self,
        digests: &[TransactionDigest],
    ) -> StateReadResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>>;

    /// Single-digest variant of
    /// [`StateRead::deprecated_multi_get_transaction_checkpoint`].
    fn deprecated_get_transaction_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> StateReadResult<Option<(EpochId, CheckpointSequenceNumber)>>;

    /// Looks up several checkpoints by sequence number; each entry is `None`
    /// when the implementor has no checkpoint for that number.
    fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> StateReadResult<Vec<Option<VerifiedCheckpoint>>>;

    /// Returns the total number of transaction blocks known to the implementor.
    fn get_total_transaction_blocks(&self) -> StateReadResult<u64>;

    /// Returns the checkpoint with the given sequence number, or `None` when
    /// the implementor has none.
    fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> StateReadResult<Option<VerifiedCheckpoint>>;

    /// Returns the sequence number of the latest checkpoint.
    fn get_latest_checkpoint_sequence_number(&self) -> StateReadResult<CheckpointSequenceNumber>;

    /// Returns the chain identifier.
    fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier>;
}
229
230#[async_trait]
231impl StateRead for AuthorityState {
232 async fn multi_get(
233 &self,
234 transactions: &[TransactionDigest],
235 effects: &[TransactionDigest],
236 ) -> StateReadResult<KVStoreTransactionData> {
237 Ok(
238 <AuthorityState as TransactionKeyValueStoreTrait>::multi_get(
239 self,
240 transactions,
241 effects,
242 )
243 .await?,
244 )
245 }
246
247 fn get_object_read(&self, object_id: &ObjectID) -> StateReadResult<ObjectRead> {
248 Ok(self.get_object_read(object_id)?)
249 }
250
251 async fn get_object(&self, object_id: &ObjectID) -> StateReadResult<Option<Object>> {
252 Ok(self.get_object(object_id).await)
253 }
254
255 fn get_past_object_read(
256 &self,
257 object_id: &ObjectID,
258 version: SequenceNumber,
259 ) -> StateReadResult<PastObjectRead> {
260 Ok(self.get_past_object_read(object_id, version)?)
261 }
262
263 fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
264 self.load_epoch_store_one_call_per_task()
265 }
266
267 fn get_dynamic_fields(
268 &self,
269 owner: ObjectID,
270 cursor: Option<ObjectID>,
271 limit: usize,
272 ) -> StateReadResult<Vec<(ObjectID, DynamicFieldInfo)>> {
273 Ok(self.get_dynamic_fields(owner, cursor, limit)?)
274 }
275
276 fn get_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
277 self.get_object_cache_reader()
278 }
279
280 fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
281 self.get_object_store()
282 }
283
284 fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
285 self.get_backing_package_store()
286 }
287
288 fn get_owner_objects(
289 &self,
290 owner: SuiAddress,
291 cursor: Option<ObjectID>,
292 filter: Option<SuiObjectDataFilter>,
293 ) -> StateReadResult<Vec<ObjectInfo>> {
294 Ok(self
295 .get_owner_objects_iterator(owner, cursor, filter)?
296 .collect())
297 }
298
299 async fn query_events(
300 &self,
301 kv_store: &Arc<TransactionKeyValueStore>,
302 query: EventFilter,
303 cursor: Option<EventID>,
305 limit: usize,
306 descending: bool,
307 ) -> StateReadResult<Vec<SuiEvent>> {
308 Ok(self
309 .query_events(kv_store, query, cursor, limit, descending)
310 .await?)
311 }
312
313 #[allow(clippy::type_complexity)]
314 async fn dry_exec_transaction(
315 &self,
316 transaction: TransactionData,
317 transaction_digest: TransactionDigest,
318 ) -> StateReadResult<(
319 DryRunTransactionBlockResponse,
320 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
321 TransactionEffects,
322 Option<ObjectID>,
323 )> {
324 Ok(self
325 .dry_exec_transaction(transaction, transaction_digest)
326 .await?)
327 }
328
329 async fn dev_inspect_transaction_block(
330 &self,
331 sender: SuiAddress,
332 transaction_kind: TransactionKind,
333 gas_price: Option<u64>,
334 gas_budget: Option<u64>,
335 gas_sponsor: Option<SuiAddress>,
336 gas_objects: Option<Vec<ObjectRef>>,
337 show_raw_txn_data_and_effects: Option<bool>,
338 skip_checks: Option<bool>,
339 ) -> StateReadResult<DevInspectResults> {
340 Ok(self
341 .dev_inspect_transaction_block(
342 sender,
343 transaction_kind,
344 gas_price,
345 gas_budget,
346 gas_sponsor,
347 gas_objects,
348 show_raw_txn_data_and_effects,
349 skip_checks,
350 )
351 .await?)
352 }
353
354 fn get_subscription_handler(&self) -> Arc<SubscriptionHandler> {
355 self.subscription_handler.clone()
356 }
357
358 fn get_owner_objects_with_limit(
359 &self,
360 owner: SuiAddress,
361 cursor: Option<ObjectID>,
362 limit: usize,
363 filter: Option<SuiObjectDataFilter>,
364 ) -> StateReadResult<Vec<ObjectInfo>> {
365 Ok(self.get_owner_objects(owner, cursor, limit, filter)?)
366 }
367
368 async fn get_transactions(
369 &self,
370 kv_store: &Arc<TransactionKeyValueStore>,
371 filter: Option<TransactionFilter>,
372 cursor: Option<TransactionDigest>,
373 limit: Option<usize>,
374 reverse: bool,
375 ) -> StateReadResult<Vec<TransactionDigest>> {
376 Ok(self
377 .get_transactions(kv_store, filter, cursor, limit, reverse)
378 .await?)
379 }
380
381 fn get_dynamic_field_object_id(
382 &self,
384 owner: ObjectID,
385 name_type: TypeTag,
386 name_bcs_bytes: &[u8],
387 ) -> StateReadResult<Option<ObjectID>> {
388 Ok(self.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)?)
389 }
390
391 async fn get_staked_sui(&self, owner: SuiAddress) -> StateReadResult<Vec<StakedSui>> {
392 Ok(self
393 .get_move_objects(owner, MoveObjectType::staked_sui())
394 .await?)
395 }
396 fn get_system_state(&self) -> StateReadResult<SuiSystemState> {
397 Ok(self
398 .get_object_cache_reader()
399 .get_sui_system_state_object_unsafe()?)
400 }
401 fn get_or_latest_committee(&self, epoch: Option<BigInt<u64>>) -> StateReadResult<Committee> {
402 Ok(self
403 .committee_store()
404 .get_or_latest_committee(epoch.map(|e| *e))?)
405 }
406
407 fn get_bridge(&self) -> StateReadResult<Bridge> {
408 self.get_cache_reader()
409 .get_bridge_object_unsafe()
410 .map_err(|err| err.into())
411 }
412
413 fn find_publish_txn_digest(&self, package_id: ObjectID) -> StateReadResult<TransactionDigest> {
414 Ok(self.find_publish_txn_digest(package_id)?)
415 }
416 fn get_owned_coins(
417 &self,
418 owner: SuiAddress,
419 cursor: (String, u64, ObjectID),
420 limit: usize,
421 one_coin_type_only: bool,
422 ) -> StateReadResult<Vec<SuiCoin>> {
423 Ok(self
424 .get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)?
425 .map(|(key, coin)| SuiCoin {
426 coin_type: key.coin_type,
427 coin_object_id: key.object_id,
428 version: coin.version,
429 digest: coin.digest,
430 balance: coin.balance,
431 previous_transaction: coin.previous_transaction,
432 })
433 .collect())
434 }
435
436 async fn get_executed_transaction_and_effects(
437 &self,
438 digest: TransactionDigest,
439 kv_store: Arc<TransactionKeyValueStore>,
440 ) -> StateReadResult<(Transaction, TransactionEffects)> {
441 Ok(self
442 .get_executed_transaction_and_effects(digest, kv_store)
443 .await?)
444 }
445
446 async fn get_balance(
447 &self,
448 owner: SuiAddress,
449 coin_type: TypeTag,
450 ) -> StateReadResult<TotalBalance> {
451 let indexes = self.indexes.clone();
452 Ok(tokio::task::spawn_blocking(move || {
453 indexes
454 .as_ref()
455 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?
456 .get_balance(owner, coin_type)
457 })
458 .await
459 .map_err(|e: JoinError| {
460 SuiError(Box::new(SuiErrorKind::ExecutionError(e.to_string())))
461 })??)
462 }
463
464 async fn get_all_balance(
465 &self,
466 owner: SuiAddress,
467 ) -> StateReadResult<Arc<HashMap<TypeTag, TotalBalance>>> {
468 let indexes = self.indexes.clone();
469 Ok(tokio::task::spawn_blocking(move || {
470 indexes
471 .as_ref()
472 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?
473 .get_all_balance(owner)
474 })
475 .await
476 .map_err(|e: JoinError| {
477 SuiError(Box::new(SuiErrorKind::ExecutionError(e.to_string())))
478 })??)
479 }
480
481 fn get_verified_checkpoint_by_sequence_number(
482 &self,
483 sequence_number: CheckpointSequenceNumber,
484 ) -> StateReadResult<VerifiedCheckpoint> {
485 Ok(self.get_verified_checkpoint_by_sequence_number(sequence_number)?)
486 }
487
488 fn get_checkpoint_contents(
489 &self,
490 digest: CheckpointContentsDigest,
491 ) -> StateReadResult<CheckpointContents> {
492 Ok(self.get_checkpoint_contents(digest)?)
493 }
494
495 fn get_verified_checkpoint_summary_by_digest(
496 &self,
497 digest: CheckpointDigest,
498 ) -> StateReadResult<VerifiedCheckpoint> {
499 Ok(self.get_verified_checkpoint_summary_by_digest(digest)?)
500 }
501
502 fn deprecated_multi_get_transaction_checkpoint(
503 &self,
504 digests: &[TransactionDigest],
505 ) -> StateReadResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
506 Ok(self
507 .get_checkpoint_cache()
508 .deprecated_multi_get_transaction_checkpoint(digests))
509 }
510
511 fn deprecated_get_transaction_checkpoint(
512 &self,
513 digest: &TransactionDigest,
514 ) -> StateReadResult<Option<(EpochId, CheckpointSequenceNumber)>> {
515 Ok(self
516 .get_checkpoint_cache()
517 .deprecated_get_transaction_checkpoint(digest))
518 }
519
520 fn multi_get_checkpoint_by_sequence_number(
521 &self,
522 sequence_numbers: &[CheckpointSequenceNumber],
523 ) -> StateReadResult<Vec<Option<VerifiedCheckpoint>>> {
524 Ok(self.multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
525 }
526
527 fn get_total_transaction_blocks(&self) -> StateReadResult<u64> {
528 Ok(self.get_total_transaction_blocks()?)
529 }
530
531 fn get_checkpoint_by_sequence_number(
532 &self,
533 sequence_number: CheckpointSequenceNumber,
534 ) -> StateReadResult<Option<VerifiedCheckpoint>> {
535 Ok(self.get_checkpoint_by_sequence_number(sequence_number)?)
536 }
537
538 fn get_latest_checkpoint_sequence_number(&self) -> StateReadResult<CheckpointSequenceNumber> {
539 Ok(self.get_latest_checkpoint_sequence_number()?)
540 }
541
542 fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier> {
543 Ok(self.get_chain_identifier())
544 }
545}
546
547#[async_trait]
550impl<S: ?Sized + StateRead> ObjectProvider for Arc<S> {
551 type Error = StateReadError;
552
553 async fn get_object(
554 &self,
555 id: &ObjectID,
556 version: &SequenceNumber,
557 ) -> Result<Object, Self::Error> {
558 Ok(self.get_past_object_read(id, *version)?.into_object()?)
559 }
560
561 async fn find_object_lt_or_eq_version(
562 &self,
563 id: &ObjectID,
564 version: &SequenceNumber,
565 ) -> Result<Option<Object>, Self::Error> {
566 Ok(self
567 .get_cache_reader()
568 .find_object_lt_or_eq_version(*id, *version))
569 }
570}
571
572#[async_trait]
573impl<S: ?Sized + StateRead> ObjectProvider for (Arc<S>, Arc<TransactionKeyValueStore>) {
574 type Error = StateReadError;
575
576 async fn get_object(
577 &self,
578 id: &ObjectID,
579 version: &SequenceNumber,
580 ) -> Result<Object, Self::Error> {
581 let object_read = self.0.get_past_object_read(id, *version)?;
582 match object_read {
583 PastObjectRead::ObjectNotExists(_) | PastObjectRead::VersionNotFound(..) => {
584 match self.1.get_object(*id, *version).await? {
585 Some(object) => Ok(object),
586 None => Ok(PastObjectRead::VersionNotFound(*id, *version).into_object()?),
587 }
588 }
589 _ => Ok(object_read.into_object()?),
590 }
591 }
592
593 async fn find_object_lt_or_eq_version(
594 &self,
595 id: &ObjectID,
596 version: &SequenceNumber,
597 ) -> Result<Option<Object>, Self::Error> {
598 Ok(self
599 .0
600 .get_cache_reader()
601 .find_object_lt_or_eq_version(*id, *version))
602 }
603}
604
/// Errors carried by [`StateReadError::Internal`].
///
/// Every variant is `#[error(transparent)]`, so `Display` and `source` are
/// delegated to the wrapped error, and `#[from]` provides the matching `From`
/// conversion.
#[derive(Debug, Error)]
pub enum StateReadInternalError {
    /// A [`SuiError`] that was not classified as client-facing.
    #[error(transparent)]
    SuiError(#[from] SuiError),
    /// A failure to join a spawned Tokio task.
    #[error(transparent)]
    JoinError(#[from] JoinError),
    /// Any other error, carried as [`anyhow::Error`].
    #[error(transparent)]
    Anyhow(#[from] anyhow::Error),
}
614
615impl From<SuiErrorKind> for StateReadInternalError {
616 fn from(e: SuiErrorKind) -> Self {
617 StateReadInternalError::SuiError(SuiError::from(e))
618 }
619}
620
/// Errors carried by [`StateReadError::Client`].
///
/// Every variant is `#[error(transparent)]`, so `Display` and `source` are
/// delegated to the wrapped error, and `#[from]` provides the matching `From`
/// conversion.
#[derive(Debug, Error)]
pub enum StateReadClientError {
    /// A [`SuiError`] whose kind was classified as client-facing.
    #[error(transparent)]
    SuiError(#[from] SuiError),
    /// A [`UserInputError`] reported directly.
    #[error(transparent)]
    UserInputError(#[from] UserInputError),
}
628
629impl From<SuiErrorKind> for StateReadClientError {
630 fn from(e: SuiErrorKind) -> Self {
631 StateReadClientError::SuiError(SuiError::from(e))
632 }
633}
634
/// Error type returned by every fallible [`StateRead`] method.
///
/// Errors are split into two classes. The `From` impls in this file decide the
/// class: a fixed set of [`SuiErrorKind`] variants and every
/// [`UserInputError`] become [`StateReadError::Client`]; everything else
/// becomes [`StateReadError::Internal`].
#[derive(Debug, Error)]
pub enum StateReadError {
    /// A failure not attributed to the caller's request.
    #[error(transparent)]
    Internal(#[from] StateReadInternalError),

    /// A failure attributed to the caller's request.
    #[error(transparent)]
    Client(#[from] StateReadClientError),
}
649
650impl From<SuiErrorKind> for StateReadError {
651 fn from(e: SuiErrorKind) -> Self {
652 match e {
653 SuiErrorKind::IndexStoreNotAvailable
654 | SuiErrorKind::TransactionNotFound { .. }
655 | SuiErrorKind::UnsupportedFeatureError { .. }
656 | SuiErrorKind::UserInputError { .. }
657 | SuiErrorKind::WrongMessageVersion { .. } => StateReadError::Client(e.into()),
658 _ => StateReadError::Internal(e.into()),
659 }
660 }
661}
662
663impl From<SuiError> for StateReadError {
664 fn from(e: SuiError) -> Self {
665 e.into_inner().into()
666 }
667}
668
669impl From<UserInputError> for StateReadError {
670 fn from(e: UserInputError) -> Self {
671 StateReadError::Client(e.into())
672 }
673}
674
675impl From<JoinError> for StateReadError {
676 fn from(e: JoinError) -> Self {
677 StateReadError::Internal(e.into())
678 }
679}
680
681impl From<anyhow::Error> for StateReadError {
682 fn from(e: anyhow::Error) -> Self {
683 StateReadError::Internal(e.into())
684 }
685}
686
687impl From<TypedStoreError> for StateReadError {
688 fn from(e: TypedStoreError) -> Self {
689 let error: SuiError = e.into();
690 StateReadError::Internal(error.into())
691 }
692}