use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store::{ExecutionLockWriteGuard, SuiLockResult};
use crate::authority::epoch_start_configuration::EpochFlag;
use crate::authority::{
authority_notify_read::EffectsNotifyRead, epoch_start_configuration::EpochStartConfiguration,
};
use crate::transaction_outputs::TransactionOutputs;
use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::VerifiedExecutionData;
use sui_types::digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest};
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiResult, UserInputError};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Object;
use sui_types::storage::{
error::{Error as StorageError, Result as StorageResult},
BackingPackageStore, ChildObjectResolver, MarkerValue, ObjectKey, ObjectOrTombstone,
ObjectStore, PackageObject, ParentSync,
};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::{VerifiedSignedTransaction, VerifiedTransaction};
use sui_types::{
base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber},
object::Owner,
storage::InputKey,
};
use tracing::instrument;
// Cache submodules: shared helper types, per-object lock management, and the
// two interchangeable cache implementations.
pub(crate) mod cache_types;
mod object_locks;
pub mod passthrough_cache;
pub mod writeback_cache;
use passthrough_cache::PassthroughCache;
use writeback_cache::WritebackCache;
/// Prometheus metrics for the execution cache.
pub struct ExecutionCacheMetrics {
    // Gauge tracking in-flight notify-read requests.
    pending_notify_read: IntGauge,
}
impl ExecutionCacheMetrics {
    /// Registers the execution-cache gauges against `registry` and returns
    /// the metrics handle.
    ///
    /// Panics if registration fails (e.g. a metric with the same name was
    /// already registered on this registry) — a programming error.
    pub fn new(registry: &Registry) -> Self {
        let pending_notify_read = register_int_gauge_with_registry!(
            "pending_notify_read",
            "Pending notify read requests",
            registry,
        )
        .unwrap();

        Self {
            pending_notify_read,
        }
    }
}
/// The cache implementation currently selected as the default.
pub type ExecutionCache = PassthroughCache;
/// Commit-side interface of the execution cache: durably persisting
/// transaction data from the cache to the backing store.
pub trait ExecutionCacheCommit: Send + Sync {
    /// Commits the outputs of the given executed transactions (identified by
    /// digest) for `epoch` to durable storage.
    fn commit_transaction_outputs<'a>(
        &'a self,
        epoch: EpochId,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, SuiResult>;
    /// Persists the given transactions themselves (not their outputs) to
    /// durable storage.
    fn persist_transactions<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, SuiResult>;
}
/// Read-side interface of the execution cache: object, transaction, effects,
/// event, and marker lookups, plus notify-read primitives. Default methods
/// are implemented in terms of the required multi-get / marker accessors.
pub trait ExecutionCacheRead: Send + Sync {
    /// Returns the package object with the given id, if cached/stored.
    fn get_package_object(&self, id: &ObjectID) -> SuiResult<Option<PackageObject>>;
    /// Forces a reload of the given system packages; exact semantics are
    /// implementation-defined (presumably cache invalidation — see impls).
    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]);
    /// Returns the latest version of the object, or `None` if it does not exist.
    fn get_object(&self, id: &ObjectID) -> SuiResult<Option<Object>>;
    /// Batched form of `get_object`; results preserve input order.
    fn get_objects(&self, objects: &[ObjectID]) -> SuiResult<Vec<Option<Object>>> {
        let mut ret = Vec::with_capacity(objects.len());
        for object_id in objects {
            ret.push(self.get_object(object_id)?);
        }
        Ok(ret)
    }
    /// Latest object reference for `object_id`, including tombstones.
    fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> SuiResult<Option<ObjectRef>>;
    /// Latest object (or tombstone) for `object_id`, with its key.
    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> SuiResult<Option<(ObjectKey, ObjectOrTombstone)>>;
    /// Returns the object at exactly (`object_id`, `version`), if present.
    fn get_object_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>>;
    /// Batched form of `get_object_by_key`; results preserve input order.
    fn multi_get_objects_by_key(
        &self,
        object_keys: &[ObjectKey],
    ) -> SuiResult<Vec<Option<Object>>>;
    /// Whether an object exists at exactly (`object_id`, `version`).
    fn object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<bool>;
    /// Batched form of `object_exists_by_key`; results preserve input order.
    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>>;
    /// Loads all objects named by `object_refs`, or returns a user-facing
    /// error describing the first missing one: either the version has been
    /// consumed (a newer live version exists) or the object is not found.
    fn multi_get_objects_with_more_accurate_error_return(
        &self,
        object_refs: &[ObjectRef],
    ) -> Result<Vec<Object>, SuiError> {
        let objects = self.multi_get_objects_by_key(
            &object_refs.iter().map(ObjectKey::from).collect::<Vec<_>>(),
        )?;
        let mut result = Vec::new();
        for (object_opt, object_ref) in objects.into_iter().zip(object_refs) {
            match object_opt {
                None => {
                    // Distinguish "too old a version" from "never existed"
                    // using the current live objref.
                    let live_objref = self._get_live_objref(object_ref.0)?;
                    let error = if live_objref.1 >= object_ref.1 {
                        UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *object_ref,
                            current_version: live_objref.1,
                        }
                    } else {
                        UserInputError::ObjectNotFound {
                            object_id: object_ref.0,
                            version: Some(object_ref.1),
                        }
                    };
                    return Err(SuiError::UserInputError { error });
                }
                Some(object) => {
                    result.push(object);
                }
            }
        }
        assert_eq!(result.len(), object_refs.len());
        Ok(result)
    }
    /// For each input key, reports whether it is available for execution.
    /// Versioned keys are checked at their exact version, with special
    /// handling for receiving objects and deleted shared objects; versionless
    /// keys are available iff their latest ref is a live (non-tombstone)
    /// object. Results are returned in the same order as `keys`.
    fn multi_input_objects_available(
        &self,
        keys: &[InputKey],
        receiving_objects: HashSet<InputKey>,
        epoch: EpochId,
    ) -> Result<Vec<bool>, SuiError> {
        // Split keys into those carrying an explicit version and those not,
        // remembering each key's original position for re-ordering at the end.
        let (keys_with_version, keys_without_version): (Vec<_>, Vec<_>) = keys
            .iter()
            .enumerate()
            .partition(|(_, key)| key.version().is_some());
        let mut versioned_results = vec![];
        for ((idx, input_key), has_key) in keys_with_version.iter().zip(
            self.multi_object_exists_by_key(
                &keys_with_version
                    .iter()
                    .map(|(_, k)| ObjectKey(k.id(), k.version().unwrap()))
                    .collect::<Vec<_>>(),
            )?
            .into_iter(),
        ) {
            if has_key {
                // Exact (id, version) exists: available.
                versioned_results.push((*idx, true))
            } else if receiving_objects.contains(input_key) {
                // A receiving object is available if a live version at or
                // above the requested one exists, or if the owned object was
                // deleted at/after that version in this epoch.
                let is_available = self
                    .get_object(&input_key.id())?
                    .map(|obj| obj.version() >= input_key.version().unwrap())
                    .unwrap_or(false)
                    || self.have_deleted_owned_object_at_version_or_after(
                        &input_key.id(),
                        input_key.version().unwrap(),
                        epoch,
                    )?;
                versioned_results.push((*idx, is_available));
            } else if self
                .get_deleted_shared_object_previous_tx_digest(
                    &input_key.id(),
                    input_key.version().unwrap(),
                    epoch,
                )?
                .is_some()
            {
                // A shared object deleted at this version this epoch counts
                // as available (the deleting tx digest is recorded).
                versioned_results.push((*idx, true));
            } else {
                versioned_results.push((*idx, false));
            }
        }
        // NOTE(review): a storage error inside this lazy closure panics
        // rather than propagating; the closure runs when the chain below is
        // collected.
        let unversioned_results = keys_without_version.into_iter().map(|(idx, key)| {
            (
                idx,
                match self
                    .get_latest_object_ref_or_tombstone(key.id())
                    .expect("read cannot fail")
                {
                    None => false,
                    Some(entry) => entry.2.is_alive(),
                },
            )
        });
        // Merge both partitions and restore the caller's input order.
        let mut results = versioned_results
            .into_iter()
            .chain(unversioned_results)
            .collect::<Vec<_>>();
        results.sort_by_key(|(idx, _)| *idx);
        Ok(results.into_iter().map(|(_, result)| result).collect())
    }
    /// Returns the newest version of `object_id` that is <= `version`, if any.
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>>;
    /// Returns the lock state for an owned object reference.
    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult;
    /// Returns the live object reference for `object_id` (errors if none).
    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef>;
    /// Verifies all given owned object refs are currently live.
    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult;
    /// Batched transaction lookup; results preserve input order.
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<Arc<VerifiedTransaction>>>>;
    /// Single-digest convenience wrapper over `multi_get_transaction_blocks`.
    fn get_transaction_block(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<Arc<VerifiedTransaction>>> {
        self.multi_get_transaction_blocks(&[*digest])
            .map(|mut blocks| {
                blocks
                    .pop()
                    .expect("multi-get must return correct number of items")
            })
    }
    /// Looks up transactions and pairs each with its serialized size.
    #[instrument(level = "trace", skip_all)]
    fn get_transactions_and_serialized_sizes(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<(VerifiedTransaction, usize)>>> {
        let txns = self.multi_get_transaction_blocks(digests)?;
        txns.into_iter()
            .map(|txn| {
                txn.map(|txn| {
                    match txn.serialized_size() {
                        Ok(size) => Ok(((*txn).clone(), size)),
                        Err(e) => Err(e),
                    }
                })
                .transpose()
            })
            .collect::<Result<Vec<_>, _>>()
    }
    /// For each digest, the effects digest of its execution, if executed.
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<TransactionEffectsDigest>>>;
    /// Whether the transaction has been executed (has an effects digest).
    fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
        self.multi_get_executed_effects_digests(&[*digest])
            .map(|mut digests| {
                digests
                    .pop()
                    .expect("multi-get must return correct number of items")
                    .is_some()
            })
    }
    /// For each digest, the executed effects, if the transaction was executed.
    /// Results preserve input order.
    fn multi_get_executed_effects(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<TransactionEffects>>> {
        let effects_digests = self.multi_get_executed_effects_digests(digests)?;
        assert_eq!(effects_digests.len(), digests.len());
        let mut results = vec![None; digests.len()];
        // Fetch effects only for the digests that resolved, remembering their
        // positions so results line up with the input.
        let mut fetch_digests = Vec::with_capacity(digests.len());
        let mut fetch_indices = Vec::with_capacity(digests.len());
        for (i, digest) in effects_digests.into_iter().enumerate() {
            if let Some(digest) = digest {
                fetch_digests.push(digest);
                fetch_indices.push(i);
            }
        }
        let effects = self.multi_get_effects(&fetch_digests)?;
        for (i, effects) in fetch_indices.into_iter().zip(effects.into_iter()) {
            results[i] = effects;
        }
        Ok(results)
    }
    /// Single-digest convenience wrapper over `multi_get_executed_effects`.
    fn get_executed_effects(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TransactionEffects>> {
        self.multi_get_executed_effects(&[*digest])
            .map(|mut effects| {
                effects
                    .pop()
                    .expect("multi-get must return correct number of items")
            })
    }
    /// Batched effects lookup by effects digest; results preserve input order.
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> SuiResult<Vec<Option<TransactionEffects>>>;
    /// Single-digest convenience wrapper over `multi_get_effects`.
    fn get_effects(
        &self,
        digest: &TransactionEffectsDigest,
    ) -> SuiResult<Option<TransactionEffects>> {
        self.multi_get_effects(&[*digest]).map(|mut effects| {
            effects
                .pop()
                .expect("multi-get must return correct number of items")
        })
    }
    /// Batched events lookup by events digest; results preserve input order.
    fn multi_get_events(
        &self,
        event_digests: &[TransactionEventsDigest],
    ) -> SuiResult<Vec<Option<TransactionEvents>>>;
    /// Single-digest convenience wrapper over `multi_get_events`.
    fn get_events(&self, digest: &TransactionEventsDigest) -> SuiResult<Option<TransactionEvents>> {
        self.multi_get_events(&[*digest]).map(|mut events| {
            events
                .pop()
                .expect("multi-get must return correct number of items")
        })
    }
    /// Resolves once all given transactions have executed, yielding their
    /// effects digests.
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, SuiResult<Vec<TransactionEffectsDigest>>>;
    /// Waits for execution of all given transactions, then loads their
    /// effects. Panics if an effects digest resolves but the effects record
    /// is missing (an invariant violation).
    fn notify_read_executed_effects<'a>(
        &'a self,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, SuiResult<Vec<TransactionEffects>>> {
        async move {
            let digests = self.notify_read_executed_effects_digests(digests).await?;
            self.multi_get_effects(&digests).map(|effects| {
                effects
                    .into_iter()
                    .map(|e| e.expect("digests must exist"))
                    .collect()
            })
        }
        .boxed()
    }
    /// Reads the system state object; "unsafe" per the underlying API naming
    /// (see implementations for the caveat).
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState>;
    /// Marker recorded for (`object_id`, `version`) in `epoch_id`, if any.
    fn get_marker_value(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> SuiResult<Option<MarkerValue>>;
    /// Latest marker (with its version) for `object_id` in `epoch_id`.
    fn get_latest_marker(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>>;
    /// If the shared object's latest marker is a `SharedDeleted`, returns the
    /// deletion version and the digest of the deleting transaction.
    fn get_last_shared_object_deletion_info(
        &self,
        object_id: &ObjectID,
        epoch_id: EpochId,
    ) -> SuiResult<Option<(SequenceNumber, TransactionDigest)>> {
        match self.get_latest_marker(object_id, epoch_id)? {
            Some((version, MarkerValue::SharedDeleted(digest))) => Ok(Some((version, digest))),
            _ => Ok(None),
        }
    }
    /// If a `SharedDeleted` marker exists at exactly (`object_id`, `version`),
    /// returns the digest of the deleting transaction.
    fn get_deleted_shared_object_previous_tx_digest(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> SuiResult<Option<TransactionDigest>> {
        match self.get_marker_value(object_id, version, epoch_id)? {
            Some(MarkerValue::SharedDeleted(digest)) => Ok(Some(digest)),
            _ => Ok(None),
        }
    }
    /// True iff a `Received` marker exists at (`object_id`, `version`) in
    /// `epoch_id`.
    fn have_received_object_at_version(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> SuiResult<bool> {
        match self.get_marker_value(object_id, version, epoch_id)? {
            Some(MarkerValue::Received) => Ok(true),
            _ => Ok(false),
        }
    }
    /// True iff the latest marker for `object_id` in `epoch_id` is
    /// `OwnedDeleted` at a version >= `version`.
    fn have_deleted_owned_object_at_version_or_after(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> SuiResult<bool> {
        match self.get_latest_marker(object_id, epoch_id)? {
            Some((marker_version, MarkerValue::OwnedDeleted)) if marker_version >= version => {
                Ok(true)
            }
            _ => Ok(false),
        }
    }
}
/// Write-side interface of the execution cache.
pub trait ExecutionCacheWrite: Send + Sync {
    /// Writes the outputs of an executed transaction for `epoch_id` into the
    /// cache.
    fn write_transaction_outputs(
        &self,
        epoch_id: EpochId,
        tx_outputs: Arc<TransactionOutputs>,
    ) -> BoxFuture<'_, SuiResult>;
    /// Attempts to acquire locks on `owned_input_objects` on behalf of
    /// `transaction`.
    fn acquire_transaction_locks<'a>(
        &'a self,
        epoch_store: &'a AuthorityPerEpochStore,
        owned_input_objects: &'a [ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> BoxFuture<'a, SuiResult>;
}
/// Lookup and insertion of transaction -> (epoch, checkpoint) mappings.
/// NOTE(review): the `deprecated_` prefix mirrors the underlying store API —
/// presumably kept for backward compatibility; confirm against implementations.
pub trait CheckpointCache: Send + Sync {
    /// Returns the (epoch, checkpoint sequence) the transaction was finalized
    /// in, if recorded.
    fn deprecated_get_transaction_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>>;
    /// Batched form of the lookup above; results preserve input order.
    fn deprecated_multi_get_transaction_checkpoint(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>>;
    /// Records that the given transactions were finalized in checkpoint
    /// `sequence` of `epoch`.
    fn deprecated_insert_finalized_transactions(
        &self,
        digests: &[TransactionDigest],
        epoch: EpochId,
        sequence: CheckpointSequenceNumber,
    ) -> SuiResult;
}
/// Operations used at genesis and during epoch change / reconfiguration.
pub trait ExecutionCacheReconfigAPI: Send + Sync {
    /// Inserts a single genesis object.
    fn insert_genesis_object(&self, object: Object) -> SuiResult;
    /// Inserts many genesis objects in one batch.
    fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult;
    /// Reverts the state changes made by the given transaction.
    fn revert_state_update(&self, digest: &TransactionDigest) -> SuiResult;
    /// Persists the configuration the new epoch starts with.
    fn set_epoch_start_configuration(
        &self,
        epoch_start_config: &EpochStartConfiguration,
    ) -> SuiResult;
    /// Updates metrics when epoch flags change between `old` and `new`.
    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]);
    /// Clears end-of-epoch state; requires the execution write lock to be held.
    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>);
    /// Expensive invariant check on SUI conservation (see store impl).
    fn expensive_check_sui_conservation(
        &self,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult;
    /// Writes a database checkpoint (backup) to `path`.
    fn checkpoint_db(&self, path: &Path) -> SuiResult;
    /// Re-accumulates the state hash if the protocol upgrade requires it
    /// (implementation-defined; see store impl).
    fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    );
}
/// Insertion of transactions and effects obtained externally (presumably via
/// state sync rather than local execution — confirm against callers).
pub trait StateSyncAPI: Send + Sync {
    /// Inserts one transaction together with its effects.
    fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> SuiResult;
    /// Batched form of `insert_transaction_and_effects`.
    fn multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) -> SuiResult;
}
/// Newtype adapter allowing an `Arc`-wrapped cache to implement
/// `EffectsNotifyRead` by delegating to `ExecutionCacheRead`.
pub struct NotifyReadWrapper<T>(Arc<T>);
impl<T> Clone for NotifyReadWrapper<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
#[async_trait]
impl<T: ExecutionCacheRead + 'static> EffectsNotifyRead for NotifyReadWrapper<T> {
    /// Delegates to `ExecutionCacheRead::notify_read_executed_effects`.
    async fn notify_read_executed_effects(
        &self,
        digests: Vec<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        self.0.notify_read_executed_effects(&digests).await
    }
    /// Delegates to `ExecutionCacheRead::notify_read_executed_effects_digests`.
    async fn notify_read_executed_effects_digests(
        &self,
        digests: Vec<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffectsDigest>> {
        self.0.notify_read_executed_effects_digests(&digests).await
    }
    /// Delegates to `ExecutionCacheRead::multi_get_executed_effects`.
    fn multi_get_executed_effects(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<TransactionEffects>>> {
        self.0.multi_get_executed_effects(digests)
    }
}
// Implements the sui-types storage traits (`ObjectStore`, `ChildObjectResolver`,
// `BackingPackageStore`, `ParentSync`) for a cache type by delegating to its
// `ExecutionCacheRead` methods.
macro_rules! implement_storage_traits {
    ($implementor: ident) => {
        impl ObjectStore for $implementor {
            /// Latest object lookup; storage errors are wrapped as `StorageError`.
            fn get_object(&self, object_id: &ObjectID) -> StorageResult<Option<Object>> {
                ExecutionCacheRead::get_object(self, object_id).map_err(StorageError::custom)
            }
            /// Exact-version object lookup; errors wrapped as `StorageError`.
            fn get_object_by_key(
                &self,
                object_id: &ObjectID,
                version: sui_types::base_types::VersionNumber,
            ) -> StorageResult<Option<Object>> {
                ExecutionCacheRead::get_object_by_key(self, object_id, version)
                    .map_err(StorageError::custom)
            }
        }
        impl ChildObjectResolver for $implementor {
            /// Reads a dynamic-field child at a version <= the upper bound,
            /// verifying the child is actually owned by `parent`.
            fn read_child_object(
                &self,
                parent: &ObjectID,
                child: &ObjectID,
                child_version_upper_bound: SequenceNumber,
            ) -> SuiResult<Option<Object>> {
                let Some(child_object) =
                    self.find_object_lt_or_eq_version(*child, child_version_upper_bound)?
                else {
                    return Ok(None);
                };
                let parent = *parent;
                // Ownership check: a child reachable by id but owned by a
                // different parent is an invalid access, not a miss.
                if child_object.owner != Owner::ObjectOwner(parent.into()) {
                    return Err(SuiError::InvalidChildObjectAccess {
                        object: *child,
                        given_parent: parent,
                        actual_owner: child_object.owner,
                    });
                }
                Ok(Some(child_object))
            }
            /// Loads an object being received at an exact version. Returns
            /// `None` (not an error) if it isn't address-owned by `owner` or
            /// was already received at this version in this epoch.
            fn get_object_received_at_version(
                &self,
                owner: &ObjectID,
                receiving_object_id: &ObjectID,
                receive_object_at_version: SequenceNumber,
                epoch_id: EpochId,
            ) -> SuiResult<Option<Object>> {
                let Some(recv_object) = ExecutionCacheRead::get_object_by_key(
                    self,
                    receiving_object_id,
                    receive_object_at_version,
                )?
                else {
                    return Ok(None);
                };
                if recv_object.owner != Owner::AddressOwner((*owner).into())
                    || self.have_received_object_at_version(
                        receiving_object_id,
                        receive_object_at_version,
                        epoch_id,
                    )?
                {
                    return Ok(None);
                }
                Ok(Some(recv_object))
            }
        }
        impl BackingPackageStore for $implementor {
            /// Delegates to `ExecutionCacheRead::get_package_object`.
            fn get_package_object(
                &self,
                package_id: &ObjectID,
            ) -> SuiResult<Option<PackageObject>> {
                ExecutionCacheRead::get_package_object(self, package_id)
            }
        }
        impl ParentSync for $implementor {
            /// Delegates to `get_latest_object_ref_or_tombstone`.
            fn get_latest_parent_entry_ref_deprecated(
                &self,
                object_id: ObjectID,
            ) -> SuiResult<Option<ObjectRef>> {
                ExecutionCacheRead::get_latest_object_ref_or_tombstone(self, object_id)
            }
        }
    };
}
// Implements `CheckpointCache`, `ExecutionCacheReconfigAPI` and `StateSyncAPI`
// for a cache type by delegating to its `store` field (and to the
// implementor's own `*_impl` methods where cache state must also change).
// The implementor must therefore have a `store` field exposing these methods.
macro_rules! implement_passthrough_traits {
    ($implementor: ident) => {
        impl CheckpointCache for $implementor {
            /// Delegates to the underlying store.
            fn deprecated_get_transaction_checkpoint(
                &self,
                digest: &TransactionDigest,
            ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
                self.store.deprecated_get_transaction_checkpoint(digest)
            }
            /// Delegates to the underlying store.
            fn deprecated_multi_get_transaction_checkpoint(
                &self,
                digests: &[TransactionDigest],
            ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
                self.store
                    .deprecated_multi_get_transaction_checkpoint(digests)
            }
            /// Delegates to the underlying store.
            fn deprecated_insert_finalized_transactions(
                &self,
                digests: &[TransactionDigest],
                epoch: EpochId,
                sequence: CheckpointSequenceNumber,
            ) -> SuiResult {
                self.store
                    .deprecated_insert_finalized_transactions(digests, epoch, sequence)
            }
        }
        impl ExecutionCacheReconfigAPI for $implementor {
            fn insert_genesis_object(&self, object: Object) -> SuiResult {
                self.store.insert_genesis_object(object)
            }
            fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult {
                self.store.bulk_insert_genesis_objects(objects)
            }
            // Goes through the implementor's own impl (not the store directly)
            // so cache state can be updated as well.
            fn revert_state_update(&self, digest: &TransactionDigest) -> SuiResult {
                self.revert_state_update_impl(digest)
            }
            fn set_epoch_start_configuration(
                &self,
                epoch_start_config: &EpochStartConfiguration,
            ) -> SuiResult {
                self.store.set_epoch_start_configuration(epoch_start_config)
            }
            fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
                self.store.update_epoch_flags_metrics(old, new)
            }
            // Implementor-specific: clears cache state at epoch boundary.
            fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
                self.clear_state_end_of_epoch_impl(execution_guard)
            }
            fn expensive_check_sui_conservation(
                &self,
                old_epoch_store: &AuthorityPerEpochStore,
            ) -> SuiResult {
                self.store
                    .expensive_check_sui_conservation(self, old_epoch_store)
            }
            fn checkpoint_db(&self, path: &std::path::Path) -> SuiResult {
                self.store.perpetual_tables.checkpoint_db(path)
            }
            fn maybe_reaccumulate_state_hash(
                &self,
                cur_epoch_store: &AuthorityPerEpochStore,
                new_protocol_version: ProtocolVersion,
            ) {
                self.store
                    .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version)
            }
        }
        impl StateSyncAPI for $implementor {
            fn insert_transaction_and_effects(
                &self,
                transaction: &VerifiedTransaction,
                transaction_effects: &TransactionEffects,
            ) -> SuiResult {
                Ok(self
                    .store
                    .insert_transaction_and_effects(transaction, transaction_effects)?)
            }
            fn multi_insert_transaction_and_effects(
                &self,
                transactions_and_effects: &[VerifiedExecutionData],
            ) -> SuiResult {
                Ok(self
                    .store
                    .multi_insert_transaction_and_effects(transactions_and_effects.iter())?)
            }
        }
    };
}
// Re-export so the submodules can invoke the macro for their cache types.
use implement_passthrough_traits;
// Both cache implementations get the generic storage traits here.
implement_storage_traits!(PassthroughCache);
implement_storage_traits!(WritebackCache);
/// Umbrella trait bundling the full execution-cache API surface; a type that
/// implements every constituent trait implements this automatically (via a
/// blanket or manual impl elsewhere).
pub trait ExecutionCacheAPI:
    ExecutionCacheRead
    + ExecutionCacheWrite
    + ExecutionCacheCommit
    + ExecutionCacheReconfigAPI
    + CheckpointCache
    + StateSyncAPI
{
}