use std::ops::Not;
use std::sync::Arc;
use std::{iter, mem, thread};
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store_pruner::{
AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
};
use crate::authority::authority_store_types::{get_store_object, StoreObject, StoreObjectWrapper};
use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
use crate::rpc_index::RpcIndexStore;
use crate::state_accumulator::AccumulatorStore;
use crate::transaction_outputs::TransactionOutputs;
use either::Either;
use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
use futures::stream::FuturesUnordered;
use itertools::izip;
use move_core_types::resolver::ModuleResolver;
use serde::{Deserialize, Serialize};
use sui_config::node::AuthorityStorePruningConfig;
use sui_macros::fail_point_arg;
use sui_storage::mutex_table::{MutexGuard, MutexTable};
use sui_types::accumulator::Accumulator;
use sui_types::digests::TransactionEventsDigest;
use sui_types::error::UserInputError;
use sui_types::execution::TypeLayoutStore;
use sui_types::message_envelope::Message;
use sui_types::storage::{
get_module, BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone,
ObjectStore,
};
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
use tokio::time::Instant;
use tracing::{debug, info, trace};
use typed_store::traits::Map;
use typed_store::{
rocks::{DBBatch, DBMap},
TypedStoreError,
};
use super::authority_store_tables::LiveObject;
use super::{authority_store_tables::AuthorityPerpetualTables, *};
use mysten_common::sync::notify_read::NotifyRead;
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
const NUM_SHARDS: usize = 4096;
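/// Prometheus gauges for the SUI conservation checks and the flags of the
/// currently running epoch.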
struct AuthorityStoreMetrics {
sui_conservation_check_latency: IntGauge,
sui_conservation_live_object_count: IntGauge,
sui_conservation_live_object_size: IntGauge,
sui_conservation_imbalance: IntGauge,
sui_conservation_storage_fund: IntGauge,
sui_conservation_storage_fund_imbalance: IntGauge,
epoch_flags: IntGaugeVec,
}
impl AuthorityStoreMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
sui_conservation_check_latency: register_int_gauge_with_registry!(
"sui_conservation_check_latency",
"Number of seconds took to scan all live objects in the store for SUI conservation check",
registry,
).unwrap(),
sui_conservation_live_object_count: register_int_gauge_with_registry!(
"sui_conservation_live_object_count",
"Number of live objects in the store",
registry,
).unwrap(),
sui_conservation_live_object_size: register_int_gauge_with_registry!(
"sui_conservation_live_object_size",
"Size in bytes of live objects in the store",
registry,
).unwrap(),
sui_conservation_imbalance: register_int_gauge_with_registry!(
"sui_conservation_imbalance",
"Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
registry,
).unwrap(),
sui_conservation_storage_fund: register_int_gauge_with_registry!(
"sui_conservation_storage_fund",
"Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
registry,
).unwrap(),
sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
"sui_conservation_storage_fund_imbalance",
"Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
registry,
).unwrap(),
epoch_flags: register_int_gauge_vec_with_registry!(
"epoch_flags",
"Local flags of the currently running epoch",
&["flag"],
registry,
).unwrap(),
}
}
}
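/// Persistent, cross-epoch authority state: wraps the perpetual tables and
/// serializes owned-object lock acquisition through a sharded mutex table
/// keyed by object digest.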
pub struct AuthorityStore {
mutex_table: MutexTable<ObjectDigest>,
pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
pub(crate) root_state_notify_read: NotifyRead<EpochId, (CheckpointSequenceNumber, Accumulator)>,
enable_epoch_sui_conservation_check: bool,
metrics: AuthorityStoreMetrics,
}
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
impl AuthorityStore {
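    /// Opens the store. On a fresh (empty) database this writes the genesis
    /// objects, the genesis transaction/effects/events, and the initial epoch
    /// start configuration.
    ///
    /// A minimal usage sketch (not a doctest; argument construction omitted):
    /// ```ignore
    /// let store = AuthorityStore::open(perpetual_tables, &genesis, &config, &registry).await?;
    /// ```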
pub async fn open(
perpetual_tables: Arc<AuthorityPerpetualTables>,
genesis: &Genesis,
config: &NodeConfig,
registry: &Registry,
) -> SuiResult<Arc<Self>> {
let enable_epoch_sui_conservation_check = config
.expensive_safety_check_config
.enable_epoch_sui_conservation_check();
let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
info!("Creating new epoch start config from genesis");
#[allow(unused_mut)]
let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
info!("Setting initial epoch flags to {:?}", flags);
initial_epoch_flags = flags;
});
let epoch_start_configuration = EpochStartConfiguration::new(
genesis.sui_system_object().into_epoch_start_state(),
*genesis.checkpoint().digest(),
&genesis.objects(),
initial_epoch_flags,
)?;
perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
epoch_start_configuration
} else {
info!("Loading epoch start config from DB");
perpetual_tables
.epoch_start_configuration
.get(&())?
.expect("Epoch start configuration must be set in non-empty DB")
};
let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
info!("Epoch start config: {:?}", epoch_start_configuration);
info!("Cur epoch: {:?}", cur_epoch);
let this = Self::open_inner(
genesis,
perpetual_tables,
enable_epoch_sui_conservation_check,
registry,
)
.await?;
this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
Ok(this)
}
pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
for flag in old {
self.metrics
.epoch_flags
.with_label_values(&[&flag.to_string()])
.set(0);
}
for flag in new {
self.metrics
.epoch_flags
.with_label_values(&[&flag.to_string()])
.set(1);
}
}
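    /// Clears both per-epoch marker tables. Taking the execution lock write
    /// guard as an argument ensures no transaction can be executing while the
    /// tables are cleared (i.e. this should only run at reconfiguration).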
pub fn clear_object_per_epoch_marker_table(
&self,
_execution_guard: &ExecutionLockWriteGuard<'_>,
) -> SuiResult<()> {
self.perpetual_tables
.object_per_epoch_marker_table
.schedule_delete_all()?;
Ok(self
.perpetual_tables
.object_per_epoch_marker_table_v2
.schedule_delete_all()?)
}
pub async fn open_with_committee_for_testing(
perpetual_tables: Arc<AuthorityPerpetualTables>,
committee: &Committee,
genesis: &Genesis,
) -> SuiResult<Arc<Self>> {
assert_eq!(committee.epoch, 0);
Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
}
async fn open_inner(
genesis: &Genesis,
perpetual_tables: Arc<AuthorityPerpetualTables>,
enable_epoch_sui_conservation_check: bool,
registry: &Registry,
) -> SuiResult<Arc<Self>> {
let store = Arc::new(Self {
mutex_table: MutexTable::new(NUM_SHARDS),
perpetual_tables,
root_state_notify_read:
NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
enable_epoch_sui_conservation_check,
metrics: AuthorityStoreMetrics::new(registry),
});
if store
.database_is_empty()
.expect("Database read should not fail at init.")
{
store
.bulk_insert_genesis_objects(genesis.objects())
.expect("Cannot bulk insert genesis objects");
let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
store
.perpetual_tables
.transactions
.insert(transaction.digest(), transaction.serializable_ref())
.unwrap();
store
.perpetual_tables
.effects
.insert(&genesis.effects().digest(), genesis.effects())
.unwrap();
if genesis.effects().events_digest().is_some() {
store
.perpetual_tables
.events_2
.insert(transaction.digest(), genesis.events())
.unwrap();
}
let event_digest = genesis.events().digest();
let events = genesis
    .events()
    .data
    .iter()
    .enumerate()
    .map(|(i, e)| ((event_digest, i), e));
store.perpetual_tables.events.multi_insert(events).unwrap();
}
Ok(store)
}
pub fn open_no_genesis(
perpetual_tables: Arc<AuthorityPerpetualTables>,
enable_epoch_sui_conservation_check: bool,
registry: &Registry,
) -> SuiResult<Arc<Self>> {
let store = Arc::new(Self {
mutex_table: MutexTable::new(NUM_SHARDS),
perpetual_tables,
root_state_notify_read:
NotifyRead::<EpochId, (CheckpointSequenceNumber, Accumulator)>::new(),
enable_epoch_sui_conservation_check,
metrics: AuthorityStoreMetrics::new(registry),
});
Ok(store)
}
pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
self.perpetual_tables.get_recovery_epoch_at_restart()
}
pub fn get_effects(
&self,
effects_digest: &TransactionEffectsDigest,
) -> SuiResult<Option<TransactionEffects>> {
Ok(self.perpetual_tables.effects.get(effects_digest)?)
}
pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
self.perpetual_tables
.effects
.contains_key(effects_digest)
.map_err(|e| e.into())
}
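    /// Returns the events emitted by a transaction, trying the newer
    /// transaction-digest-keyed table first and falling back to resolving the
    /// events digest through the executed effects.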
pub fn get_events(
&self,
digest: &TransactionDigest,
) -> Result<Option<TransactionEvents>, TypedStoreError> {
if let Some(events) = self.perpetual_tables.events_2.get(digest)? {
return Ok(Some(events));
}
self.get_executed_effects(digest)?
.and_then(|effects| effects.events_digest().copied())
.and_then(|events_digest| self.get_events_by_events_digest(&events_digest).transpose())
.transpose()
}
pub fn get_events_by_events_digest(
&self,
event_digest: &TransactionEventsDigest,
) -> Result<Option<TransactionEvents>, TypedStoreError> {
let data = self
.perpetual_tables
.events
.safe_range_iter((*event_digest, 0)..=(*event_digest, usize::MAX))
.map_ok(|(_, event)| event)
.collect::<Result<Vec<_>, TypedStoreError>>()?;
Ok(data.is_empty().not().then_some(TransactionEvents { data }))
}
pub fn multi_get_events(
    &self,
    tx_digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
    Ok(tx_digests
        .iter()
        .map(|digest| self.get_events(digest))
        .collect::<Result<Vec<_>, _>>()?)
}
pub fn multi_get_effects<'a>(
&self,
effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
) -> SuiResult<Vec<Option<TransactionEffects>>> {
Ok(self.perpetual_tables.effects.multi_get(effects_digests)?)
}
pub fn get_executed_effects(
&self,
tx_digest: &TransactionDigest,
) -> Result<Option<TransactionEffects>, TypedStoreError> {
let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
match effects_digest {
Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
None => Ok(None),
}
}
pub fn multi_get_executed_effects_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEffectsDigest>>> {
Ok(self.perpetual_tables.executed_effects.multi_get(digests)?)
}
pub fn multi_get_executed_effects(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEffects>>> {
let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
let mut tx_to_effects_map = effects
.into_iter()
.flatten()
.map(|effects| (*effects.transaction_digest(), effects))
.collect::<HashMap<_, _>>();
Ok(digests
.iter()
.map(|digest| tx_to_effects_map.remove(digest))
.collect())
}
pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
Ok(self
.perpetual_tables
.executed_effects
.contains_key(digest)?)
}
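    /// Returns the marker recorded for exactly this object key in the given
    /// epoch, if any.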
pub fn get_marker_value(
&self,
object_key: FullObjectKey,
epoch_id: EpochId,
) -> SuiResult<Option<MarkerValue>> {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table_v2
.get(&(epoch_id, object_key))?)
}
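    /// Returns the highest-versioned marker entry for the object in the given
    /// epoch, along with the version it was recorded at.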
pub fn get_latest_marker(
&self,
object_id: FullObjectID,
epoch_id: EpochId,
) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table_v2
.reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
assert_eq!(epoch, epoch_id);
assert_eq!(key.id(), object_id);
Ok(Some((key.version(), marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
}
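    /// Waits until the root state hash for `epoch` is available and returns it
    /// along with the last checkpoint sequence number of that epoch.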
pub async fn notify_read_root_state_hash(
&self,
epoch: EpochId,
) -> SuiResult<(CheckpointSequenceNumber, Accumulator)> {
let registration = self.root_state_notify_read.register_one(&epoch);
let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
let result = match hash {
Some(ready) => Either::Left(futures::future::ready(ready)),
None => Either::Right(registration),
}
.await;
Ok(result)
}
pub fn deprecated_insert_finalized_transactions(
&self,
digests: &[TransactionDigest],
epoch: EpochId,
sequence: CheckpointSequenceNumber,
) -> SuiResult {
let mut batch = self
.perpetual_tables
.executed_transactions_to_checkpoint
.batch();
batch.insert_batch(
&self.perpetual_tables.executed_transactions_to_checkpoint,
digests.iter().map(|d| (*d, (epoch, sequence))),
)?;
batch.write()?;
trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
Ok(())
}
pub fn deprecated_get_transaction_checkpoint(
&self,
digest: &TransactionDigest,
) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
Ok(self
.perpetual_tables
.executed_transactions_to_checkpoint
.get(digest)?)
}
pub fn deprecated_multi_get_transaction_checkpoint(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
Ok(self
.perpetual_tables
.executed_transactions_to_checkpoint
.multi_get(digests)?
.into_iter()
.collect())
}
pub fn database_is_empty(&self) -> SuiResult<bool> {
self.perpetual_tables.database_is_empty()
}
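    /// Acquires the per-digest mutexes guarding the given input objects; the
    /// mutex table is expected to order acquisition internally so that
    /// concurrent callers cannot deadlock.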
fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
self.mutex_table
.acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
}
pub fn object_exists_by_key(
&self,
object_id: &ObjectID,
version: VersionNumber,
) -> SuiResult<bool> {
Ok(self
.perpetual_tables
.objects
.contains_key(&ObjectKey(*object_id, version))?)
}
pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
Ok(self
.perpetual_tables
.objects
.multi_contains_keys(object_keys.to_vec())?
.into_iter()
.collect())
}
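    /// Returns the object reference of the newest version of the object that
    /// is strictly older than `version`, if one exists.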
fn get_object_ref_prior_to_key(
&self,
object_id: &ObjectID,
version: VersionNumber,
) -> Result<Option<ObjectRef>, SuiError> {
let Some(prior_version) = version.one_before() else {
return Ok(None);
};
let mut iterator = self
.perpetual_tables
.objects
.reversed_safe_iter_with_bounds(
Some(ObjectKey::min_for_id(object_id)),
Some(ObjectKey(*object_id, prior_version)),
)?;
if let Some((object_key, value)) = iterator.next().transpose()? {
if object_key.0 == *object_id {
return Ok(Some(
self.perpetual_tables.object_reference(&object_key, value)?,
));
}
}
Ok(None)
}
pub fn multi_get_objects_by_key(
&self,
object_keys: &[ObjectKey],
) -> Result<Vec<Option<Object>>, SuiError> {
let wrappers = self
.perpetual_tables
.objects
.multi_get(object_keys.to_vec())?;
let mut ret = vec![];
for (idx, w) in wrappers.into_iter().enumerate() {
ret.push(
w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
.transpose()?
.flatten(),
);
}
Ok(ret)
}
pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
let mut result = Vec::new();
for id in objects {
result.push(self.get_object(id));
}
Ok(result)
}
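    /// Returns true if the most recent marker entry for this object in the
    /// given epoch is an `OwnedDeleted` marker at `version` or later.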
pub fn have_deleted_owned_object_at_version_or_after(
&self,
object_id: &ObjectID,
version: VersionNumber,
epoch_id: EpochId,
) -> Result<bool, SuiError> {
let object_key = ObjectKey::max_for_id(object_id);
let marker_key = (epoch_id, object_key);
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.reversed_safe_iter_with_bounds(None, Some(marker_key))?
.next();
match marker_entry.transpose()? {
Some(((epoch, key), marker)) => {
let object_data_ok = key.0 == *object_id && key.1 >= version;
let epoch_data_ok = epoch == epoch_id;
let mark_data_ok = marker == MarkerValue::OwnedDeleted;
Ok(object_data_ok && epoch_data_ok && mark_data_ok)
}
None => Ok(false),
}
}
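    /// Inserts a single genesis object, initializing its live object marker
    /// where applicable.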
pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
let object_ref = object.compute_object_reference();
self.insert_object_direct(object_ref, &object)
}
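    /// Writes an object directly into the objects table, bypassing the normal
    /// transaction output path; intended for genesis/bootstrap data only.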
fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
let mut write_batch = self.perpetual_tables.objects.batch();
let store_object = get_store_object(object.clone());
write_batch.insert_batch(
&self.perpetual_tables.objects,
std::iter::once((ObjectKey::from(object_ref), store_object)),
)?;
// Only address-owned (non-child) objects get a live object marker.
if object.get_single_owner().is_some() && !object.is_child_object() {
    self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
}
write_batch.write()?;
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
let mut batch = self.perpetual_tables.objects.batch();
let ref_and_objects: Vec<_> = objects
.iter()
.map(|o| (o.compute_object_reference(), o))
.collect();
batch.insert_batch(
&self.perpetual_tables.objects,
ref_and_objects
.iter()
.map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
)?;
let non_child_object_refs: Vec<_> = ref_and_objects
.iter()
.filter(|(_, object)| !object.is_child_object())
.map(|(oref, _)| *oref)
.collect();
self.initialize_live_object_markers_impl(
&mut batch,
&non_child_object_refs,
false, // is_force_reset
)?;
batch.write()?;
Ok(())
}
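    /// Bulk-inserts a live object set (e.g. when restoring from a snapshot),
    /// verifying that the SHA3-256 digest over the object digests matches
    /// `expected_sha3_digest` before the batch is committed.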
pub fn bulk_insert_live_objects(
perpetual_db: &AuthorityPerpetualTables,
live_objects: impl Iterator<Item = LiveObject>,
expected_sha3_digest: &[u8; 32],
) -> SuiResult<()> {
let mut hasher = Sha3_256::default();
let mut batch = perpetual_db.objects.batch();
for object in live_objects {
hasher.update(object.object_reference().2.inner());
match object {
LiveObject::Normal(object) => {
let store_object_wrapper = get_store_object(object.clone());
batch.insert_batch(
&perpetual_db.objects,
std::iter::once((
ObjectKey::from(object.compute_object_reference()),
store_object_wrapper,
)),
)?;
if !object.is_child_object() {
Self::initialize_live_object_markers(
&perpetual_db.live_owned_object_markers,
&mut batch,
&[object.compute_object_reference()],
false, // is_force_reset
)?;
}
}
LiveObject::Wrapped(object_key) => {
batch.insert_batch(
&perpetual_db.objects,
std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
object_key,
StoreObject::Wrapped.into(),
)),
)?;
}
}
}
let sha3_digest = hasher.finalize().digest;
if *expected_sha3_digest != sha3_digest {
error!(
"Sha does not match! expected: {:?}, actual: {:?}",
expected_sha3_digest, sha3_digest
);
return Err(SuiError::from("Sha does not match"));
}
batch.write()?;
Ok(())
}
pub fn set_epoch_start_configuration(
&self,
epoch_start_configuration: &EpochStartConfiguration,
) -> SuiResult {
self.perpetual_tables
.set_epoch_start_configuration(epoch_start_configuration)?;
Ok(())
}
pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
}
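    /// Builds (but does not write) a single DB batch containing all output
    /// tables for the given committed transactions.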
#[instrument(level = "debug", skip_all)]
pub fn build_db_batch(
&self,
epoch_id: EpochId,
tx_outputs: &[Arc<TransactionOutputs>],
) -> SuiResult<DBBatch> {
let mut written = Vec::with_capacity(tx_outputs.len());
for outputs in tx_outputs {
written.extend(outputs.written.values().cloned());
}
let mut write_batch = self.perpetual_tables.transactions.batch();
for outputs in tx_outputs {
self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
}
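        // Failpoint so tests can exercise crash recovery before the batch is
        // handed back.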
fail_point!("crash");
trace!(
"built batch for committed transactions: {:?}",
tx_outputs
.iter()
.map(|tx| tx.transaction.digest())
.collect::<Vec<_>>()
);
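        // Second failpoint so tests can crash at a distinct recovery point.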
fail_point!("crash");
Ok(write_batch)
}
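    /// Appends one transaction's outputs (markers, deleted/wrapped/new
    /// objects, events, effects, and live object marker updates) to
    /// `write_batch`.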
fn write_one_transaction_outputs(
&self,
write_batch: &mut DBBatch,
epoch_id: EpochId,
tx_outputs: &TransactionOutputs,
) -> SuiResult {
let TransactionOutputs {
transaction,
effects,
markers,
wrapped,
deleted,
written,
events,
locks_to_delete,
new_locks_to_init,
..
} = tx_outputs;
let transaction_digest = transaction.digest();
write_batch.insert_batch(
&self.perpetual_tables.transactions,
iter::once((transaction_digest, transaction.serializable_ref())),
)?;
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table_v2,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
write_batch.insert_batch(
&self.perpetual_tables.objects,
deleted
.iter()
.map(|key| (key, StoreObject::Deleted))
.chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
.map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
)?;
let new_objects = written.iter().map(|(id, new_object)| {
let version = new_object.version();
trace!(?id, ?version, "writing object");
let store_object = get_store_object(new_object.clone());
(ObjectKey(*id, version), store_object)
});
write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
if effects.events_digest().is_some() {
write_batch.insert_batch(
&self.perpetual_tables.events_2,
[(transaction_digest, events)],
)?;
}
let event_digest = events.digest();
let events = events
.data
.iter()
.enumerate()
.map(|(i, e)| ((event_digest, i), e));
write_batch.insert_batch(&self.perpetual_tables.events, events)?;
self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
self.delete_live_object_markers(write_batch, locks_to_delete)?;
let effects_digest = effects.digest();
write_batch
.insert_batch(
&self.perpetual_tables.effects,
[(effects_digest, effects.clone())],
)?
.insert_batch(
&self.perpetual_tables.executed_effects,
[(transaction_digest, effects_digest)],
)?;
debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
Ok(())
}
pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
let mut batch = self.perpetual_tables.transactions.batch();
batch.insert_batch(
&self.perpetual_tables.transactions,
[(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
)?;
batch.write()?;
Ok(())
}
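    /// Attempts to lock all owned input objects to `tx_digest`. Fails with
    /// `ObjectVersionUnavailableForConsumption` if an object version is no
    /// longer live, or `ObjectLockConflict` if another transaction already
    /// holds the lock this epoch.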
pub fn acquire_transaction_locks(
&self,
epoch_store: &AuthorityPerEpochStore,
owned_input_objects: &[ObjectRef],
tx_digest: TransactionDigest,
signed_transaction: Option<VerifiedSignedTransaction>,
) -> SuiResult {
let epoch = epoch_store.epoch();
let _mutexes = self.acquire_locks(owned_input_objects);
trace!(?owned_input_objects, "acquire_locks");
let mut locks_to_write = Vec::new();
let live_object_markers = self
.perpetual_tables
.live_owned_object_markers
.multi_get(owned_input_objects)?;
let epoch_tables = epoch_store.tables()?;
let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
assert_eq!(locks.len(), live_object_markers.len());
for (live_marker, lock, obj_ref) in izip!(
live_object_markers.into_iter(),
locks.into_iter(),
owned_input_objects
) {
let Some(live_marker) = live_marker else {
let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
fp_bail!(UserInputError::ObjectVersionUnavailableForConsumption {
provided_obj_ref: *obj_ref,
current_version: latest_lock.1
}
.into());
};
let live_marker = live_marker.map(|l| l.migrate().into_inner());
if let Some(LockDetailsDeprecated {
epoch: previous_epoch,
..
}) = &live_marker
{
assert!(
previous_epoch < &epoch,
"lock for {:?} should be from a prior epoch",
obj_ref
);
}
if let Some(previous_tx_digest) = &lock {
if previous_tx_digest == &tx_digest {
continue;
} else {
info!(prev_tx_digest = ?previous_tx_digest,
cur_tx_digest = ?tx_digest,
"Cannot acquire lock: conflicting transaction!");
return Err(SuiError::ObjectLockConflict {
obj_ref: *obj_ref,
pending_transaction: *previous_tx_digest,
});
}
}
locks_to_write.push((*obj_ref, tx_digest));
}
if !locks_to_write.is_empty() {
trace!(?locks_to_write, "Writing locks");
epoch_tables.write_transaction_locks(signed_transaction, locks_to_write.into_iter())?;
}
Ok(())
}
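    /// Reports the lock status of an owned object: initialized but unlocked,
    /// locked to a transaction this epoch, or only live at a different
    /// version.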
pub(crate) fn get_lock(
&self,
obj_ref: ObjectRef,
epoch_store: &AuthorityPerEpochStore,
) -> SuiLockResult {
if self
.perpetual_tables
.live_owned_object_markers
.get(&obj_ref)?
.is_none()
{
return Ok(ObjectLockStatus::LockedAtDifferentVersion {
locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
});
}
let tables = epoch_store.tables()?;
let epoch_id = epoch_store.epoch();
if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
Ok(ObjectLockStatus::LockedToTx {
locked_by_tx: LockDetailsDeprecated {
epoch: epoch_id,
tx_digest,
},
})
} else {
Ok(ObjectLockStatus::Initialized)
}
}
pub(crate) fn get_latest_live_version_for_object_id(
&self,
object_id: ObjectID,
) -> SuiResult<ObjectRef> {
let mut iterator = self
.perpetual_tables
.live_owned_object_markers
.reversed_safe_iter_with_bounds(
None,
Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
)?;
Ok(iterator
.next()
.transpose()?
.and_then(|value| {
if value.0 .0 == object_id {
Some(value)
} else {
None
}
})
.ok_or_else(|| {
SuiError::from(UserInputError::ObjectNotFound {
object_id,
version: None,
})
})?
.0)
}
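    /// Verifies that each object ref is the live version; otherwise fails
    /// with `ObjectVersionUnavailableForConsumption` carrying the latest live
    /// version.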
pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
let locks = self
.perpetual_tables
.live_owned_object_markers
.multi_get(objects)?;
for (lock, obj_ref) in locks.into_iter().zip(objects) {
if lock.is_none() {
let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
fp_bail!(UserInputError::ObjectVersionUnavailableForConsumption {
provided_obj_ref: *obj_ref,
current_version: latest_lock.1
}
.into());
}
}
Ok(())
}
fn initialize_live_object_markers_impl(
&self,
write_batch: &mut DBBatch,
objects: &[ObjectRef],
is_force_reset: bool,
) -> SuiResult {
AuthorityStore::initialize_live_object_markers(
&self.perpetual_tables.live_owned_object_markers,
write_batch,
objects,
is_force_reset,
)
}
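    /// Initializes live object markers (lock placeholders set to `None`) for
    /// the given refs. Unless `is_force_reset` is set, fails with
    /// `ObjectLockAlreadyInitialized` if any marker already carries lock
    /// details.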
pub fn initialize_live_object_markers(
live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
write_batch: &mut DBBatch,
objects: &[ObjectRef],
is_force_reset: bool,
) -> SuiResult {
trace!(?objects, "initialize_locks");
let live_object_markers = live_object_marker_table.multi_get(objects)?;
if !is_force_reset {
let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
.iter()
.zip(objects)
.filter_map(|(lock_opt, objref)| {
lock_opt.clone().flatten().map(|_tx_digest| *objref)
})
.collect();
if !existing_live_object_markers.is_empty() {
info!(
?existing_live_object_markers,
"Cannot initialize live_object_markers because some exist already"
);
return Err(SuiError::ObjectLockAlreadyInitialized {
refs: existing_live_object_markers,
});
}
}
write_batch.insert_batch(
live_object_marker_table,
objects.iter().map(|obj_ref| (obj_ref, None)),
)?;
Ok(())
}
fn delete_live_object_markers(
&self,
write_batch: &mut DBBatch,
objects: &[ObjectRef],
) -> SuiResult {
trace!(?objects, "delete_locks");
write_batch.delete_batch(
&self.perpetual_tables.live_owned_object_markers,
objects.iter(),
)?;
Ok(())
}
#[cfg(test)]
pub(crate) fn reset_locks_for_test(
&self,
transactions: &[TransactionDigest],
objects: &[ObjectRef],
epoch_store: &AuthorityPerEpochStore,
) {
for tx in transactions {
    epoch_store.delete_signed_transaction_for_test(tx);
}
// The lock deletion does not depend on `tx`, so do it once outside the loop.
epoch_store.delete_object_locks_for_test(objects);
let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
batch
.delete_batch(
&self.perpetual_tables.live_owned_object_markers,
objects.iter(),
)
.unwrap();
batch.write().unwrap();
let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
self.initialize_live_object_markers_impl(&mut batch, objects, false)
.unwrap();
batch.write().unwrap();
}
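    /// Reverts the perpetual-table writes of a previously executed
    /// transaction: executed effects, events, created/mutated objects,
    /// tombstones, and live object markers.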
pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult {
let Some(effects) = self.get_executed_effects(tx_digest)? else {
info!("Not reverting {:?} as it was not executed", tx_digest);
return Ok(());
};
info!(?tx_digest, ?effects, "reverting transaction");
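// Reverting shared-object transactions is not supported.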
assert!(effects.input_shared_objects().is_empty());
let mut write_batch = self.perpetual_tables.transactions.batch();
write_batch.delete_batch(
&self.perpetual_tables.executed_effects,
iter::once(tx_digest),
)?;
if let Some(events_digest) = effects.events_digest() {
write_batch.delete_batch(&self.perpetual_tables.events_2, [tx_digest])?;
write_batch.schedule_delete_range(
&self.perpetual_tables.events,
&(*events_digest, usize::MIN),
&(*events_digest, usize::MAX),
)?;
}
let tombstones = effects
.all_tombstones()
.into_iter()
.map(|(id, version)| ObjectKey(id, version));
write_batch.delete_batch(&self.perpetual_tables.objects, tombstones)?;
let all_new_object_keys = effects
.all_changed_objects()
.into_iter()
.map(|((id, version, _), _, _)| ObjectKey(id, version));
write_batch.delete_batch(&self.perpetual_tables.objects, all_new_object_keys.clone())?;
let modified_object_keys = effects
.modified_at_versions()
.into_iter()
.map(|(id, version)| ObjectKey(id, version));
macro_rules! get_objects_and_locks {
($object_keys: expr) => {
self.perpetual_tables
.objects
.multi_get($object_keys.clone())?
.into_iter()
.zip($object_keys)
.filter_map(|(obj_opt, key)| {
let obj = self
.perpetual_tables
.object(
&key,
obj_opt.unwrap_or_else(|| {
panic!("Older object version not found: {:?}", key)
}),
)
.expect("Matching indirect object not found")?;
if obj.is_immutable() {
return None;
}
let obj_ref = obj.compute_object_reference();
Some(obj.is_address_owned().then_some(obj_ref))
})
};
}
let old_locks = get_objects_and_locks!(modified_object_keys);
let new_locks = get_objects_and_locks!(all_new_object_keys);
let old_locks: Vec<_> = old_locks.flatten().collect();
self.initialize_live_object_markers_impl(&mut write_batch, &old_locks, true)?;
write_batch.delete_batch(
&self.perpetual_tables.live_owned_object_markers,
new_locks.flatten(),
)?;
write_batch.write()?;
Ok(())
}
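    /// Returns the newest version of the object that is less than or equal to
    /// `version`, if any.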
pub fn find_object_lt_or_eq_version(
&self,
object_id: ObjectID,
version: SequenceNumber,
) -> SuiResult<Option<Object>> {
self.perpetual_tables
.find_object_lt_or_eq_version(object_id, version)
}
pub fn get_latest_object_ref_or_tombstone(
&self,
object_id: ObjectID,
) -> Result<Option<ObjectRef>, SuiError> {
self.perpetual_tables
.get_latest_object_ref_or_tombstone(object_id)
}
pub fn get_latest_object_ref_if_alive(
&self,
object_id: ObjectID,
) -> Result<Option<ObjectRef>, SuiError> {
match self.get_latest_object_ref_or_tombstone(object_id)? {
Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
_ => Ok(None),
}
}
pub fn get_latest_object_or_tombstone(
&self,
object_id: ObjectID,
) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
let Some((object_key, store_object)) = self
.perpetual_tables
.get_latest_object_or_tombstone(object_id)?
else {
return Ok(None);
};
if let Some(object_ref) = self
.perpetual_tables
.tombstone_reference(&object_key, &store_object)?
{
return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
}
let object = self
.perpetual_tables
.object(&object_key, store_object)?
.expect("Non tombstone store object could not be converted to object");
Ok(Some((object_key, ObjectOrTombstone::Object(object))))
}
pub fn insert_transaction_and_effects(
&self,
transaction: &VerifiedTransaction,
transaction_effects: &TransactionEffects,
) -> Result<(), TypedStoreError> {
let mut write_batch = self.perpetual_tables.transactions.batch();
write_batch
.insert_batch(
&self.perpetual_tables.transactions,
[(transaction.digest(), transaction.serializable_ref())],
)?
.insert_batch(
&self.perpetual_tables.effects,
[(transaction_effects.digest(), transaction_effects)],
)?;
write_batch.write()?;
Ok(())
}
pub fn multi_insert_transaction_and_effects<'a>(
&self,
transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
) -> Result<(), TypedStoreError> {
let mut write_batch = self.perpetual_tables.transactions.batch();
for tx in transactions {
write_batch
.insert_batch(
&self.perpetual_tables.transactions,
[(tx.transaction.digest(), tx.transaction.serializable_ref())],
)?
.insert_batch(
&self.perpetual_tables.effects,
[(tx.effects.digest(), &tx.effects)],
)?;
}
write_batch.write()?;
Ok(())
}
pub fn multi_get_transaction_blocks(
&self,
tx_digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<VerifiedTransaction>>> {
Ok(self
.perpetual_tables
.transactions
.multi_get(tx_digests)
.map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())?)
}
pub fn get_transaction_block(
&self,
tx_digest: &TransactionDigest,
) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
self.perpetual_tables
.transactions
.get(tx_digest)
.map(|v| v.map(|v| v.into()))
}
pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
get_sui_system_state(self.perpetual_tables.as_ref())
}
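    /// Scans the entire live object set to verify SUI conservation: total SUI
    /// must equal the expected supply and total storage rebates must match
    /// the storage fund. Work is spread across scoped threads in batches of
    /// one million objects; expect this to take a while on large stores.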
pub fn expensive_check_sui_conservation<T>(
self: &Arc<Self>,
type_layout_store: T,
old_epoch_store: &AuthorityPerEpochStore,
) -> SuiResult
where
T: TypeLayoutStore + Send + Copy,
{
if !self.enable_epoch_sui_conservation_check {
return Ok(());
}
let executor = old_epoch_store.executor();
info!("Starting SUI conservation check. This may take a while..");
let cur_time = Instant::now();
let mut pending_objects = vec![];
let mut count = 0;
let mut size = 0;
let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
let pending_tasks = FuturesUnordered::new();
for o in self.iter_live_object_set(false) {
match o {
LiveObject::Normal(object) => {
size += object.object_size_for_gas_metering();
count += 1;
pending_objects.push(object);
if count % 1_000_000 == 0 {
let mut task_objects = vec![];
mem::swap(&mut pending_objects, &mut task_objects);
pending_tasks.push(s.spawn(move || {
let mut layout_resolver =
executor.type_layout_resolver(Box::new(type_layout_store));
let mut total_storage_rebate = 0;
let mut total_sui = 0;
for object in task_objects {
total_storage_rebate += object.storage_rebate;
total_sui +=
object.get_total_sui(layout_resolver.as_mut()).unwrap()
- object.storage_rebate;
}
if count % 50_000_000 == 0 {
info!("Processed {} objects", count);
}
(total_sui, total_storage_rebate)
}));
}
}
LiveObject::Wrapped(_) => {
unreachable!("Explicitly asked to not include wrapped tombstones")
}
}
}
pending_tasks.into_iter().fold((0, 0), |init, result| {
let result = result.join().unwrap();
(init.0 + result.0, init.1 + result.1)
})
});
let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
for object in pending_objects {
total_storage_rebate += object.storage_rebate;
total_sui +=
object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
}
info!(
"Scanned {} live objects, took {:?}",
count,
cur_time.elapsed()
);
self.metrics
.sui_conservation_live_object_count
.set(count as i64);
self.metrics
.sui_conservation_live_object_size
.set(size as i64);
self.metrics
.sui_conservation_check_latency
.set(cur_time.elapsed().as_secs() as i64);
let system_state = self
.get_sui_system_state_object_unsafe()
.expect("Reading sui system state object cannot fail")
.into_sui_system_state_summary();
let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
info!(
"Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
);
let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
self.metrics
.sui_conservation_storage_fund
.set(storage_fund_balance as i64);
self.metrics
.sui_conservation_storage_fund_imbalance
.set(imbalance);
self.metrics
.sui_conservation_imbalance
.set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
if let Some(expected_imbalance) = self
.perpetual_tables
.expected_storage_fund_imbalance
.get(&())
.expect("DB read cannot fail")
{
fp_ensure!(
imbalance == expected_imbalance,
SuiError::from(
format!(
"Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
).as_str()
)
);
} else {
self.perpetual_tables
.expected_storage_fund_imbalance
.insert(&(), &imbalance)
.expect("DB write cannot fail");
}
if let Some(expected_sui) = self
.perpetual_tables
.expected_network_sui_amount
.get(&())
.expect("DB read cannot fail")
{
fp_ensure!(
total_sui == expected_sui,
SuiError::from(
format!(
"Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
system_state.epoch, total_sui, expected_sui
)
.as_str()
)
);
} else {
self.perpetual_tables
.expected_network_sui_amount
.insert(&(), &total_sui)
.expect("DB write cannot fail");
}
Ok(())
}
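    /// If the upgrade to `new_protocol_version` turns on
    /// `simplified_unwrap_then_delete`, re-computes the root state hash by
    /// removing wrapped-object tombstones from the accumulator; otherwise a
    /// no-op.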
#[instrument(level = "error", skip_all)]
pub fn maybe_reaccumulate_state_hash(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
new_protocol_version: ProtocolVersion,
) {
let old_simplified_unwrap_then_delete = cur_epoch_store
.protocol_config()
.simplified_unwrap_then_delete();
let new_simplified_unwrap_then_delete =
ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
.simplified_unwrap_then_delete();
let should_reaccumulate =
!old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
if !should_reaccumulate {
return;
}
info!("[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash");
let cur_time = Instant::now();
std::thread::scope(|s| {
let pending_tasks = FuturesUnordered::new();
const BITS: u8 = 5;
for index in 0u8..(1 << BITS) {
pending_tasks.push(s.spawn(move || {
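// Each task scans one of 2^BITS contiguous shards of the ObjectID
// keyspace, selected by the top BITS of the leading byte.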
let mut id_bytes = [0; ObjectID::LENGTH];
id_bytes[0] = index << (8 - BITS);
let start_id = ObjectID::new(id_bytes);
id_bytes[0] |= (1 << (8 - BITS)) - 1;
for element in id_bytes.iter_mut().skip(1) {
*element = u8::MAX;
}
let end_id = ObjectID::new(id_bytes);
info!(
"[Re-accumulate] Scanning object ID range {:?}..{:?}",
start_id, end_id
);
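// Track the previous entry: when the iterator crosses to a new
// object ID, `prev` holds the latest version of the previous object,
// telling us whether it was left in the Wrapped state.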
let mut prev = (
ObjectKey::min_for_id(&ObjectID::ZERO),
StoreObjectWrapper::V1(StoreObject::Deleted),
);
let mut object_scanned: u64 = 0;
let mut wrapped_objects_to_remove = vec![];
for db_result in self.perpetual_tables.objects.safe_range_iter(
ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
) {
match db_result {
Ok((object_key, object)) => {
object_scanned += 1;
if object_scanned % 100_000 == 0 {
info!(
"[Re-accumulate] Task {}: object scanned: {}",
index, object_scanned,
);
}
if matches!(prev.1.inner(), StoreObject::Wrapped)
&& object_key.0 != prev.0 .0
{
wrapped_objects_to_remove
.push(WrappedObject::new(prev.0 .0, prev.0 .1));
}
prev = (object_key, object);
}
Err(err) => {
warn!("Object iterator encounter RocksDB error {:?}", err);
return Err(err);
}
}
}
if matches!(prev.1.inner(), StoreObject::Wrapped) {
wrapped_objects_to_remove.push(WrappedObject::new(prev.0 .0, prev.0 .1));
}
info!(
"[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
index,
object_scanned,
wrapped_objects_to_remove.len(),
);
Ok((wrapped_objects_to_remove, object_scanned))
}));
}
let (last_checkpoint_of_epoch, cur_accumulator) = self
.get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
.expect("read cannot fail")
.expect("accumulator must exist");
let (accumulator, total_objects_scanned, total_wrapped_objects) =
pending_tasks.into_iter().fold(
(cur_accumulator, 0u64, 0usize),
|(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
let (wrapped_objects_to_remove, object_scanned) =
task.join().unwrap().unwrap();
accumulator.remove_all(
wrapped_objects_to_remove
.iter()
.map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
.collect::<Vec<Vec<u8>>>(),
);
(
accumulator,
total_objects_scanned + object_scanned,
total_wrapped_objects + wrapped_objects_to_remove.len(),
)
},
);
info!(
"[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
total_objects_scanned, total_wrapped_objects,
);
info!(
"[Re-accumulate] New accumulator value: {:?}",
accumulator.digest()
);
self.insert_state_accumulator_for_epoch(
cur_epoch_store.epoch(),
&last_checkpoint_of_epoch,
&accumulator,
)
.unwrap();
});
info!(
"[Re-accumulate] Re-accumulating took {}seconds",
cur_time.elapsed().as_secs()
);
}
pub async fn prune_objects_and_compact_for_testing(
&self,
checkpoint_store: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
) {
let pruning_config = AuthorityStorePruningConfig {
num_epochs_to_retain: 0,
..Default::default()
};
let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
&self.perpetual_tables,
checkpoint_store,
rpc_index,
None,
pruning_config,
AuthorityStorePruningMetrics::new_for_test(),
EPOCH_DURATION_MS_FOR_TESTING,
)
.await;
let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
}
#[cfg(test)]
pub async fn prune_objects_immediately_for_testing(
&self,
transaction_effects: Vec<TransactionEffects>,
) -> anyhow::Result<()> {
let mut wb = self.perpetual_tables.objects.batch();
let mut object_keys_to_prune = vec![];
for effects in &transaction_effects {
for (object_id, seq_number) in effects.modified_at_versions() {
info!("Pruning object {:?} version {:?}", object_id, seq_number);
object_keys_to_prune.push(ObjectKey(object_id, seq_number));
}
}
wb.delete_batch(
&self.perpetual_tables.objects,
object_keys_to_prune.into_iter(),
)?;
wb.write()?;
Ok(())
}
#[cfg(msim)]
pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
self.perpetual_tables
.objects
.safe_iter_with_bounds(
Some(ObjectKey(object_id, VersionNumber::MIN)),
Some(ObjectKey(object_id, VersionNumber::MAX)),
)
.collect::<Result<Vec<_>, _>>()
.unwrap()
.len()
}
}
impl AccumulatorStore for AuthorityStore {
fn get_object_ref_prior_to_key_deprecated(
&self,
object_id: &ObjectID,
version: VersionNumber,
) -> SuiResult<Option<ObjectRef>> {
self.get_object_ref_prior_to_key(object_id, version)
}
fn get_root_state_accumulator_for_epoch(
&self,
epoch: EpochId,
) -> SuiResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
self.perpetual_tables
.root_state_hash_by_epoch
.get(&epoch)
.map_err(Into::into)
}
fn get_root_state_accumulator_for_highest_epoch(
&self,
) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
Ok(self
.perpetual_tables
.root_state_hash_by_epoch
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?)
}
fn insert_state_accumulator_for_epoch(
&self,
epoch: EpochId,
last_checkpoint_of_epoch: &CheckpointSequenceNumber,
acc: &Accumulator,
) -> SuiResult {
self.perpetual_tables
.root_state_hash_by_epoch
.insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
self.root_state_notify_read
.notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
Ok(())
}
fn iter_live_object_set(
&self,
include_wrapped_object: bool,
) -> Box<dyn Iterator<Item = LiveObject> + '_> {
Box::new(
self.perpetual_tables
.iter_live_object_set(include_wrapped_object),
)
}
}
impl ObjectStore for AuthorityStore {
fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
self.perpetual_tables.as_ref().get_object(object_id)
}
fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
self.perpetual_tables.get_object_by_key(object_id, version)
}
}
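/// `ModuleResolver` adapter over a `BackingPackageStore` that counts module
/// fetches in the `module_cache_size` gauge.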
pub struct ResolverWrapper {
pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
pub metrics: Arc<ResolverMetrics>,
}
impl ResolverWrapper {
pub fn new(
resolver: Arc<dyn BackingPackageStore + Send + Sync>,
metrics: Arc<ResolverMetrics>,
) -> Self {
metrics.module_cache_size.set(0);
ResolverWrapper { resolver, metrics }
}
fn inc_cache_size_gauge(&self) {
    // A single atomic increment, replacing the racy get-then-set.
    self.metrics.module_cache_size.inc();
}
}
impl ModuleResolver for ResolverWrapper {
type Error = SuiError;
fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
self.inc_cache_size_gauge();
get_module(&*self.resolver, module_id)
}
}
pub enum UpdateType {
Transaction(TransactionEffectsDigest),
Genesis,
}
pub type SuiLockResult = SuiResult<ObjectLockStatus>;
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
Initialized,
LockedToTx { locked_by_tx: LockDetailsDeprecated },
LockedAtDifferentVersion { locked_ref: ObjectRef },
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapperDeprecated {
V1(LockDetailsV1Deprecated),
}
impl LockDetailsWrapperDeprecated {
pub fn migrate(self) -> Self {
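        // Only one version exists today; when more are added, migrate
        // iteratively from version N to N+1 here until the latest.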
self
}
pub fn inner(&self) -> &LockDetailsDeprecated {
match self {
Self::V1(v1) => v1,
#[allow(unreachable_patterns)]
_ => panic!("lock details should have been migrated to latest version at read time"),
}
}
pub fn into_inner(self) -> LockDetailsDeprecated {
match self {
Self::V1(v1) => v1,
#[allow(unreachable_patterns)]
_ => panic!("lock details should have been migrated to latest version at read time"),
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockDetailsV1Deprecated {
pub epoch: EpochId,
pub tx_digest: TransactionDigest,
}
pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
fn from(details: LockDetailsDeprecated) -> Self {
LockDetailsWrapperDeprecated::V1(details)
}
}