use super::*;
use crate::authority::authority_store::LockDetailsWrapperDeprecated;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::path::Path;
use sui_types::accumulator::Accumulator;
use sui_types::base_types::SequenceNumber;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::TransactionEffects;
use sui_types::storage::MarkerValue;
use typed_store::metrics::SamplingInterval;
use typed_store::rocks::util::{empty_compaction_filter, reference_count_merge_operator};
use typed_store::rocks::{
default_db_options, read_size_from_env, DBBatch, DBMap, DBOptions, MetricConf, ReadWriteOptions,
};
use typed_store::traits::{Map, TableSummary, TypedStoreDebug};
use crate::authority::authority_store_types::{
get_store_object_pair, try_construct_object, ObjectContentDigest, StoreData,
StoreMoveObjectWrapper, StoreObject, StoreObjectPair, StoreObjectValue, StoreObjectWrapper,
};
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use typed_store_derive::DBMapUtils;
// Environment variables that size the per-table RocksDB block caches, in MiB.
// Read via `read_size_from_env` in the *_table_default_config functions below;
// each has a hard-coded fallback when the variable is unset.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
const ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE: &str = "INDIRECT_OBJECTS_BLOCK_CACHE_MB";
/// Column families that persist across epochs (stored under the "perpetual"
/// directory, see `AuthorityPerpetualTables::path`). Table handles are
/// generated by the `DBMapUtils` derive.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
/// Every stored version of every object, keyed by (ObjectID, version).
/// Values may be live objects or Deleted/Wrapped tombstones (see
/// `StoreObjectWrapper` handling in `object_reference`/`tombstone_reference`).
#[default_options_override_fn = "objects_table_default_config"]
pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,
/// Content-addressed Move object payloads referenced by digest from
/// `objects` entries; values are reference-counted via the merge operator
/// configured in `indirect_move_objects_table_default_config`.
#[default_options_override_fn = "indirect_move_objects_table_default_config"]
pub(crate) indirect_move_objects: DBMap<ObjectContentDigest, StoreMoveObjectWrapper>,
/// Markers for live owned objects. Kept under the legacy
/// "owned_object_transaction_locks" column-family name; the value type is
/// deprecated.
#[default_options_override_fn = "owned_object_transaction_locks_table_default_config"]
#[rename = "owned_object_transaction_locks"]
pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
/// Transactions by digest.
#[default_options_override_fn = "transactions_table_default_config"]
pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,
/// Transaction effects by effects digest.
#[default_options_override_fn = "effects_table_default_config"]
pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,
/// Maps an executed transaction digest to the digest of its effects.
pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,
/// Events keyed by (events digest, index within the transaction).
#[default_options_override_fn = "events_table_default_config"]
pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,
/// The (epoch, checkpoint) in which each executed transaction was included.
pub(crate) executed_transactions_to_checkpoint:
DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,
/// Per-epoch root state accumulator plus the last checkpoint of that epoch.
pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,
/// Singleton (unit-keyed) epoch-start configuration for the current epoch.
pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,
/// Singleton: highest checkpoint the pruner has completed.
pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,
/// Singleton invariant-check values used during accounting.
pub(crate) expected_network_sui_amount: DBMap<(), u64>,
pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,
/// Per-epoch object markers keyed by (epoch, object key).
pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
impl AuthorityPerpetualTables {
/// Returns the on-disk location of the perpetual tables under `parent_path`.
pub fn path(parent_path: &Path) -> PathBuf {
    let mut db_path = parent_path.to_path_buf();
    db_path.push("perpetual");
    db_path
}
/// Opens the perpetual tables read-write under `parent_path`, sampling DB
/// metrics once every 60 seconds.
pub fn open(parent_path: &Path, db_options: Option<Options>) -> Self {
    let metric_conf = MetricConf::new("perpetual")
        .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0));
    Self::open_tables_read_write(Self::path(parent_path), metric_conf, db_options, None)
}
/// Opens a read-only handle to the perpetual tables under `parent_path`.
pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
    let db_path = Self::path(parent_path);
    Self::get_read_only_handle(db_path, None, None, MetricConf::new("perpetual_readonly"))
}
/// Returns the newest stored version of `object_id` that is `<= version`,
/// or `None` if no such version exists. Tombstone entries resolve to `None`
/// via `self.object`.
pub fn find_object_lt_or_eq_version(
&self,
object_id: ObjectID,
version: SequenceNumber,
) -> SuiResult<Option<Object>> {
// Restrict the scan to this object's key range, then position the cursor
// at (or just before) the requested version.
let iter = self
.objects
.safe_range_iter(ObjectKey::min_for_id(&object_id)..=ObjectKey::max_for_id(&object_id))
.skip_prior_to(&ObjectKey(object_id, version))?;
// Walking backwards, the first entry is the largest key <= the target.
match iter.reverse().next() {
Some(Ok((key, o))) => self.object(&key, o),
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
}
/// Materializes an `Object` from its stored form, fetching the indirect
/// Move-object payload when the store entry only records a content digest.
fn construct_object(
    &self,
    object_key: &ObjectKey,
    store_object: StoreObjectValue,
) -> Result<Object, SuiError> {
    let indirect_object = if let StoreData::IndirectObject(ref metadata) = store_object.data {
        self.indirect_move_objects
            .get(&metadata.digest)?
            .map(|wrapper| wrapper.migrate().into_inner())
    } else {
        None
    };
    try_construct_object(object_key, store_object, indirect_object)
}
/// Converts a stored wrapper into a live `Object`. Tombstones (deleted or
/// wrapped entries) yield `None`.
pub fn object(
    &self,
    object_key: &ObjectKey,
    store_object: StoreObjectWrapper,
) -> Result<Option<Object>, SuiError> {
    match store_object.migrate().into_inner() {
        StoreObject::Value(value) => Ok(Some(self.construct_object(object_key, value)?)),
        _ => Ok(None),
    }
}
/// Computes the `ObjectRef` for a stored entry. Live objects are fully
/// reconstructed to obtain their digest; deleted and wrapped entries use
/// the corresponding well-known tombstone digests.
pub fn object_reference(
    &self,
    object_key: &ObjectKey,
    store_object: StoreObjectWrapper,
) -> Result<ObjectRef, SuiError> {
    match store_object.migrate().into_inner() {
        StoreObject::Value(object) => Ok(self
            .construct_object(object_key, object)?
            .compute_object_reference()),
        StoreObject::Deleted => Ok((
            object_key.0,
            object_key.1,
            ObjectDigest::OBJECT_DIGEST_DELETED,
        )),
        StoreObject::Wrapped => Ok((
            object_key.0,
            object_key.1,
            ObjectDigest::OBJECT_DIGEST_WRAPPED,
        )),
    }
}
/// Returns the tombstone `ObjectRef` for deleted or wrapped entries; live
/// objects yield `None`. Inspects the wrapper by reference without migrating.
pub fn tombstone_reference(
    &self,
    object_key: &ObjectKey,
    store_object: &StoreObjectWrapper,
) -> Result<Option<ObjectRef>, SuiError> {
    let tombstone_digest = match store_object.inner() {
        StoreObject::Deleted => Some(ObjectDigest::OBJECT_DIGEST_DELETED),
        StoreObject::Wrapped => Some(ObjectDigest::OBJECT_DIGEST_WRAPPED),
        _ => None,
    };
    Ok(tombstone_digest.map(|digest| (object_key.0, object_key.1, digest)))
}
/// Returns the `ObjectRef` of the highest version recorded for `object_id`,
/// including tombstones, or `None` if the id is unknown.
pub fn get_latest_object_ref_or_tombstone(
    &self,
    object_id: ObjectID,
) -> Result<Option<ObjectRef>, SuiError> {
    // Seek to the entry at or just before the maximal key for this id; that
    // is the latest version if the id exists at all.
    match self
        .objects
        .unbounded_iter()
        .skip_prior_to(&ObjectKey::max_for_id(&object_id))?
        .next()
    {
        Some((object_key, value)) if object_key.0 == object_id => {
            Ok(Some(self.object_reference(&object_key, value)?))
        }
        _ => Ok(None),
    }
}
/// Returns the raw `(ObjectKey, StoreObjectWrapper)` entry with the highest
/// version recorded for `object_id`, or `None` if the id is unknown.
pub fn get_latest_object_or_tombstone(
    &self,
    object_id: ObjectID,
) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
    let latest_entry = self
        .objects
        .unbounded_iter()
        .skip_prior_to(&ObjectKey::max_for_id(&object_id))?
        .next();
    // Discard the entry if the cursor landed on a different object id.
    Ok(latest_entry.filter(|(object_key, _)| object_key.0 == object_id))
}
/// Reads the epoch from the stored epoch-start configuration; used to decide
/// which epoch to resume in after a restart.
///
/// # Panics
/// Panics if no epoch-start configuration has been persisted yet.
pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
    let config = self
        .epoch_start_configuration
        .get(&())?
        .expect("Must have current epoch.");
    Ok(config.epoch_start_state().epoch())
}
/// Persists the epoch-start configuration (a singleton, unit-keyed row)
/// via a write batch.
pub fn set_epoch_start_configuration(
    &self,
    epoch_start_configuration: &EpochStartConfiguration,
) -> SuiResult {
    let mut batch = self.epoch_start_configuration.batch();
    batch.insert_batch(
        &self.epoch_start_configuration,
        [((), epoch_start_configuration)],
    )?;
    batch.write()?;
    Ok(())
}
/// Returns the last checkpoint the pruner completed, or the default (0) if
/// pruning has never run.
pub fn get_highest_pruned_checkpoint(&self) -> SuiResult<CheckpointSequenceNumber> {
    let pruned = self.pruned_checkpoint.get(&())?;
    Ok(pruned.unwrap_or_default())
}
/// Stages `checkpoint_number` as the highest pruned checkpoint in `wb`; the
/// caller is responsible for committing the batch.
pub fn set_highest_pruned_checkpoint(
    &self,
    wb: &mut DBBatch,
    checkpoint_number: CheckpointSequenceNumber,
) -> SuiResult {
    wb.insert_batch(
        &self.pruned_checkpoint,
        std::iter::once(((), checkpoint_number)),
    )?;
    Ok(())
}
/// Looks up a transaction by digest.
pub fn get_transaction(
    &self,
    digest: &TransactionDigest,
) -> SuiResult<Option<TrustedTransaction>> {
    // The table lookup already yields Option; only the error needs converting.
    Ok(self.transactions.get(digest)?)
}
/// Returns the effects of an executed transaction by resolving the digest
/// recorded in `executed_effects` against the `effects` table.
pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
    match self.executed_effects.get(digest)? {
        Some(effects_digest) => Ok(self.effects.get(&effects_digest)?),
        None => Ok(None),
    }
}
/// Returns the (epoch, checkpoint) in which the transaction was included,
/// if it has been checkpointed.
pub fn get_checkpoint_sequence_number(
    &self,
    digest: &TransactionDigest,
) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
    let entry = self.executed_transactions_to_checkpoint.get(digest)?;
    Ok(entry)
}
/// Lists every stored key for `object.0` with a version strictly greater
/// than `object.1`.
pub fn get_newer_object_keys(
    &self,
    object: &(ObjectID, SequenceNumber),
) -> SuiResult<Vec<ObjectKey>> {
    // Exclusive lower bound: start at the next version after `object.1`.
    let lower_bound = ObjectKey(object.0, object.1.next());
    let upper_bound = ObjectKey(object.0, VersionNumber::MAX);
    let mut objects = Vec::new();
    for result in self
        .objects
        .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
    {
        let (key, _) = result?;
        objects.push(key);
    }
    Ok(objects)
}
/// Convenience wrapper: records the highest pruned checkpoint in a fresh
/// write batch and commits it immediately.
pub fn set_highest_pruned_checkpoint_without_wb(
    &self,
    checkpoint_number: CheckpointSequenceNumber,
) -> SuiResult {
    let mut batch = self.pruned_checkpoint.batch();
    self.set_highest_pruned_checkpoint(&mut batch, checkpoint_number)?;
    batch.write()?;
    Ok(())
}
/// Returns true when the objects table contains no entries at all.
pub fn database_is_empty(&self) -> SuiResult<bool> {
    let first_entry = self
        .objects
        .unbounded_iter()
        .skip_to(&ObjectKey::ZERO)?
        .next();
    Ok(first_entry.is_none())
}
/// Builds an iterator over the live object set (latest entry per object id);
/// wrapped tombstones are reported only when `include_wrapped_object` is set.
pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
    LiveSetIter {
        tables: self,
        iter: self.objects.unbounded_iter(),
        include_wrapped_object,
        prev: None,
    }
}
/// Creates a RocksDB checkpoint (consistent on-disk snapshot) at `path`.
pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
    Ok(self.objects.checkpoint_db(path)?)
}
/// Clears execution state so the chain can be replayed from genesis.
/// Note: the `transactions` and `effects` tables are NOT cleared here —
/// presumably safe to retain because they are keyed by digest and will be
/// reproduced identically on replay; NOTE(review): confirm this is
/// intentional for all callers.
pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
self.objects.unsafe_clear()?;
self.indirect_move_objects.unsafe_clear()?;
self.live_owned_object_markers.unsafe_clear()?;
self.executed_effects.unsafe_clear()?;
self.events.unsafe_clear()?;
self.executed_transactions_to_checkpoint.unsafe_clear()?;
self.root_state_hash_by_epoch.unsafe_clear()?;
self.epoch_start_configuration.unsafe_clear()?;
self.pruned_checkpoint.unsafe_clear()?;
self.expected_network_sui_amount.unsafe_clear()?;
self.expected_storage_fund_imbalance.unsafe_clear()?;
self.object_per_epoch_marker_table.unsafe_clear()?;
// Flush so the clears are durable before returning.
self.objects.rocksdb.flush()?;
Ok(())
}
/// Returns the root state accumulator recorded for `epoch`, together with
/// the last checkpoint of that epoch, if present.
pub fn get_root_state_hash(
    &self,
    epoch: EpochId,
) -> SuiResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
    self.root_state_hash_by_epoch
        .get(&epoch)
        .map_err(Into::into)
}
/// Records the root state accumulator for `epoch` along with the epoch's
/// final checkpoint number.
pub fn insert_root_state_hash(
    &self,
    epoch: EpochId,
    last_checkpoint_of_epoch: CheckpointSequenceNumber,
    accumulator: Accumulator,
) -> SuiResult {
    let value = (last_checkpoint_of_epoch, accumulator);
    self.root_state_hash_by_epoch.insert(&epoch, &value)?;
    Ok(())
}
/// Test-only helper: writes a single object straight into the objects table.
/// Uses `usize::MAX` as the indirect-object threshold so the payload is
/// always stored inline (the indirect half of the pair is discarded).
pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
    let object_reference = object.compute_object_reference();
    let StoreObjectPair(wrapper, _indirect_object) = get_store_object_pair(object, usize::MAX);
    let mut batch = self.objects.batch();
    batch.insert_batch(
        &self.objects,
        [(ObjectKey::from(object_reference), wrapper)],
    )?;
    batch.write()?;
    Ok(())
}
}
impl ObjectStore for AuthorityPerpetualTables {
fn get_object(
&self,
object_id: &ObjectID,
) -> Result<Option<Object>, sui_types::storage::error::Error> {
let obj_entry = self
.objects
.unbounded_iter()
.skip_prior_to(&ObjectKey::max_for_id(object_id))
.map_err(sui_types::storage::error::Error::custom)?
.next();
match obj_entry {
Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
.object(&ObjectKey(obj_id, version), obj)
.map_err(sui_types::storage::error::Error::custom)?),
_ => Ok(None),
}
}
fn get_object_by_key(
&self,
object_id: &ObjectID,
version: VersionNumber,
) -> Result<Option<Object>, sui_types::storage::error::Error> {
Ok(self
.objects
.get(&ObjectKey(*object_id, version))
.map_err(sui_types::storage::error::Error::custom)?
.map(|object| self.object(&ObjectKey(*object_id, version), object))
.transpose()
.map_err(sui_types::storage::error::Error::custom)?
.flatten())
}
}
/// Streaming iterator over the live object set (the newest entry per object
/// id), produced by `AuthorityPerpetualTables::iter_live_object_set`.
pub struct LiveSetIter<'a> {
// Raw iterator over the `objects` table.
iter:
<DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
// Backing tables, used to materialize full objects from store entries.
tables: &'a AuthorityPerpetualTables,
// Last entry seen; emitted once iteration moves past its object id.
prev: Option<(ObjectKey, StoreObjectWrapper)>,
// Whether wrapped tombstones are reported as `LiveObject::Wrapped`.
include_wrapped_object: bool,
}
/// An entry in the live object set: either a fully materialized object, or
/// (when wrapped tombstones are requested) a wrapped object known only by
/// its key.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
Normal(Object),
Wrapped(ObjectKey),
}
impl LiveObject {
    /// The object's id, regardless of whether the entry is live or wrapped.
    pub fn object_id(&self) -> ObjectID {
        match self {
            Self::Normal(object) => object.id(),
            Self::Wrapped(object_key) => object_key.0,
        }
    }

    /// The version of this live-set entry.
    pub fn version(&self) -> SequenceNumber {
        match self {
            Self::Normal(object) => object.version(),
            Self::Wrapped(object_key) => object_key.1,
        }
    }

    /// The full `ObjectRef`; wrapped entries carry the wrapped tombstone
    /// digest.
    pub fn object_reference(&self) -> ObjectRef {
        match self {
            Self::Normal(object) => object.compute_object_reference(),
            Self::Wrapped(object_key) => {
                (object_key.0, object_key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED)
            }
        }
    }
}
impl LiveSetIter<'_> {
    /// Classifies a raw store entry for live-set iteration: live values are
    /// fully materialized, wrapped tombstones are surfaced only when
    /// requested, and deleted tombstones are always filtered out.
    fn store_object_wrapper_to_live_object(
        &self,
        object_key: ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Option<LiveObject> {
        match store_object.migrate().into_inner() {
            StoreObject::Value(value) => {
                let object = self
                    .tables
                    .construct_object(&object_key, value)
                    .expect("Constructing object from store cannot fail");
                Some(LiveObject::Normal(object))
            }
            StoreObject::Wrapped => self
                .include_wrapped_object
                .then(|| LiveObject::Wrapped(object_key)),
            StoreObject::Deleted => None,
        }
    }
}
impl Iterator for LiveSetIter<'_> {
type Item = LiveObject;
// Yields the newest entry per object id. This relies on the underlying
// iterator returning keys in ascending (id, version) order, so the last
// entry seen for an id is its highest version — NOTE(review): confirm
// this matches DBMap iteration semantics.
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((next_key, next_value)) = self.iter.next() {
// Buffer the current entry; the previous one is only emitted once we
// see a different object id (i.e. it was the last version for its id).
let prev = self.prev.take();
self.prev = Some((next_key, next_value));
if let Some((prev_key, prev_value)) = prev {
if prev_key.0 != next_key.0 {
let live_object =
self.store_object_wrapper_to_live_object(prev_key, prev_value);
// Entries filtered out by the classifier (deleted, or wrapped
// when not requested) are skipped rather than returned.
if live_object.is_some() {
return live_object;
}
}
}
continue;
}
// Underlying iterator exhausted: flush the final buffered entry.
if let Some((key, value)) = self.prev.take() {
let live_object = self.store_object_wrapper_to_live_object(key, value);
if live_object.is_some() {
return live_object;
}
}
return None;
}
}
}
/// Options for the owned-object lock table: write-optimized with a read
/// cache sized by `LOCKS_BLOCK_CACHE_MB` (default 1024 MiB), and with range
/// deletions honored on reads.
fn owned_object_transaction_locks_table_default_config() -> DBOptions {
    let cache_mb = read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024);
    let tuned = default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(cache_mb);
    DBOptions {
        options: tuned.options,
        rw_options: ReadWriteOptions::default().set_ignore_range_deletions(false),
    }
}
/// Options for the objects table: write-optimized with a read cache sized by
/// `OBJECTS_BLOCK_CACHE_MB` (default 5120 MiB).
fn objects_table_default_config() -> DBOptions {
    let cache_mb = read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024);
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(cache_mb)
}
/// Options for the transactions table: write-optimized and tuned for point
/// lookups, cache sized by `TRANSACTIONS_BLOCK_CACHE_MB` (default 512 MiB).
fn transactions_table_default_config() -> DBOptions {
    let cache_mb = read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512);
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(cache_mb)
}
/// Options for the effects table: write-optimized and tuned for point
/// lookups, cache sized by `EFFECTS_BLOCK_CACHE_MB` (default 1024 MiB).
fn effects_table_default_config() -> DBOptions {
    let cache_mb = read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024);
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(cache_mb)
}
/// Options for the events table: write-optimized with a read cache sized by
/// `EVENTS_BLOCK_CACHE_MB` (default 1024 MiB).
fn events_table_default_config() -> DBOptions {
    let cache_mb = read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024);
    default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(cache_mb)
}
/// Options for the indirect Move-objects table: write-optimized, point-lookup
/// tuned (cache sized by `INDIRECT_OBJECTS_BLOCK_CACHE_MB`, default 512 MiB),
/// with reference-count merging and compaction-time garbage collection.
fn indirect_move_objects_table_default_config() -> DBOptions {
    let cache_mb = read_size_from_env(ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(512);
    let mut db_options = default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(cache_mb);
    // Values are reference-counted: merges accumulate counts instead of
    // overwriting (same operator for full and partial merges).
    db_options.options.set_merge_operator(
        "refcount operator",
        reference_count_merge_operator,
        reference_count_merge_operator,
    );
    // Drop emptied entries during compaction.
    db_options
        .options
        .set_compaction_filter("empty filter", empty_compaction_filter);
    db_options
}