use crate::execution_cache::NotifyReadWrapper;
use crate::transaction_outputs::TransactionOutputs;
use crate::verify_indexes::verify_indexes;
use anyhow::anyhow;
use arc_swap::{ArcSwap, Guard};
use async_trait::async_trait;
use chrono::prelude::*;
use fastcrypto::encoding::Base58;
use fastcrypto::encoding::Encoding;
use fastcrypto::hash::MultisetHash;
use itertools::Itertools;
use move_binary_format::binary_config::BinaryConfig;
use move_binary_format::CompiledModule;
use move_core_types::annotated_value::MoveStructLayout;
use move_core_types::language_storage::ModuleId;
use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
use parking_lot::Mutex;
use prometheus::{
register_histogram_vec_with_registry, register_histogram_with_registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
fs,
pin::Pin,
sync::Arc,
vec,
};
use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
use sui_config::NodeConfig;
use sui_types::crypto::RandomnessRound;
use sui_types::execution_status::ExecutionStatus;
use sui_types::type_resolver::LayoutResolver;
use tap::{TapFallible, TapOptional};
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn, Instrument};
use self::authority_store::ExecutionLockWriteGuard;
use self::authority_store_pruner::AuthorityStorePruningMetrics;
pub use authority_notify_read::EffectsNotifyRead;
pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use once_cell::sync::OnceCell;
use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
use sui_archival::reader::ArchiveReaderBalancer;
use sui_config::genesis::Genesis;
use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
use sui_framework::{BuiltInFramework, SystemPackage};
use sui_json_rpc_types::{
DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
SuiTransactionBlockEvents, TransactionFilter,
};
use sui_macros::{fail_point, fail_point_async, fail_point_if};
use sui_protocol_config::{ProtocolConfig, SupportedProtocolVersions};
use sui_storage::indexes::{CoinInfo, ObjectIndexChanges};
use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
use sui_storage::IndexStore;
use sui_types::authenticator_state::get_authenticator_state;
use sui_types::committee::{EpochId, ProtocolVersion};
use sui_types::crypto::{default_hash, AuthoritySignInfo, Signer};
use sui_types::deny_list::DenyList;
use sui_types::digests::ChainIdentifier;
use sui_types::digests::TransactionEventsDigest;
use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName, DynamicFieldType};
use sui_types::effects::{
InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
};
use sui_types::error::{ExecutionError, UserInputError};
use sui_types::event::{Event, EventID};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::gas::{GasCostSummary, SuiGasStatus};
use sui_types::inner_temporary_store::{
InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TemporaryPackageStore, TxCoins,
WrittenObjects,
};
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
};
use sui_types::messages_consensus::AuthorityCapabilities;
use sui_types::messages_grpc::{
HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind,
ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
};
use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
use sui_types::object::{MoveObject, Owner, PastObjectRead, OBJECT_START_VERSION};
use sui_types::storage::{
BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemState};
use sui_types::{
base_types::*,
committee::Committee,
crypto::AuthoritySignature,
error::{SuiError, SuiResult},
fp_ensure,
object::{Object, ObjectRead},
transaction::*,
SUI_SYSTEM_ADDRESS,
};
use sui_types::{is_system_package, TypeTag};
use typed_store::TypedStoreError;
use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
use crate::authority::authority_store_pruner::{
AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
};
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::checkpoint_executor::CheckpointExecutor;
use crate::checkpoints::CheckpointStore;
use crate::consensus_adapter::ConsensusAdapter;
use crate::epoch::committee_store::CommitteeStore;
use crate::execution_cache::{
CheckpointCache, ExecutionCache, ExecutionCacheCommit, ExecutionCacheRead,
ExecutionCacheReconfigAPI, ExecutionCacheWrite, StateSyncAPI,
};
use crate::execution_driver::execution_process;
use crate::metrics::LatencyObserver;
use crate::metrics::RateTracker;
use crate::module_cache_metrics::ResolverMetrics;
use crate::overload_monitor::{overload_monitor_accept_tx, AuthorityOverloadInfo};
use crate::stake_aggregator::StakeAggregator;
use crate::state_accumulator::{AccumulatorStore, StateAccumulator, WrappedObject};
use crate::subscription_handler::SubscriptionHandler;
use crate::transaction_input_loader::TransactionInputLoader;
use crate::transaction_manager::TransactionManager;
#[cfg(msim)]
pub use crate::checkpoints::checkpoint_executor::{
init_checkpoint_timeout_config, CheckpointTimeoutConfig,
};
#[cfg(msim)]
use sui_types::committee::CommitteeTrait;
use sui_types::execution_config_utils::to_binary_config;
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;
#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;
#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;
#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;
#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;
#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;
#[cfg(any(test, feature = "test-utils"))]
pub mod authority_test_utils;
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod test_authority_builder;
pub(crate) mod authority_notify_read;
pub(crate) mod authority_store;
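/// Process-wide identifier of the chain this node is running on, set once at startup.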
pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();
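/// Prometheus metrics for an authority, covering transaction signing, certificate
/// execution, the transaction manager, the execution driver, consensus handling,
/// and post-processing.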
pub struct AuthorityMetrics {
tx_orders: IntCounter,
total_certs: IntCounter,
total_cert_attempts: IntCounter,
total_effects: IntCounter,
pub shared_obj_tx: IntCounter,
sponsored_tx: IntCounter,
tx_already_processed: IntCounter,
num_input_objs: Histogram,
num_shared_objects: Histogram,
batch_size: Histogram,
authority_state_handle_transaction_latency: Histogram,
execute_certificate_latency_single_writer: Histogram,
execute_certificate_latency_shared_object: Histogram,
execute_certificate_with_effects_latency: Histogram,
internal_execution_latency: Histogram,
execution_load_input_objects_latency: Histogram,
prepare_certificate_latency: Histogram,
commit_certificate_latency: Histogram,
db_checkpoint_latency: Histogram,
pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
pub(crate) transaction_manager_num_missing_objects: IntGauge,
pub(crate) transaction_manager_num_pending_certificates: IntGauge,
pub(crate) transaction_manager_num_executing_certificates: IntGauge,
pub(crate) transaction_manager_num_ready: IntGauge,
pub(crate) transaction_manager_object_cache_size: IntGauge,
pub(crate) transaction_manager_object_cache_hits: IntCounter,
pub(crate) transaction_manager_object_cache_misses: IntCounter,
pub(crate) transaction_manager_object_cache_evictions: IntCounter,
pub(crate) transaction_manager_package_cache_size: IntGauge,
pub(crate) transaction_manager_package_cache_hits: IntCounter,
pub(crate) transaction_manager_package_cache_misses: IntCounter,
pub(crate) transaction_manager_package_cache_evictions: IntCounter,
pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
pub(crate) execution_driver_executed_transactions: IntCounter,
pub(crate) execution_driver_dispatch_queue: IntGauge,
pub(crate) execution_queueing_delay_s: Histogram,
pub(crate) prepare_cert_gas_latency_ratio: Histogram,
pub(crate) execution_gas_latency_ratio: Histogram,
pub(crate) skipped_consensus_txns: IntCounter,
pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
pub(crate) authority_overload_status: IntGauge,
pub(crate) authority_load_shedding_percentage: IntGauge,
post_processing_total_events_emitted: IntCounter,
post_processing_total_tx_indexed: IntCounter,
post_processing_total_tx_had_event_processed: IntCounter,
post_processing_total_failures: IntCounter,
pub consensus_handler_processed_bytes: IntCounter,
pub consensus_handler_processed: IntCounterVec,
pub consensus_handler_num_low_scoring_authorities: IntGauge,
pub consensus_handler_scores: IntGaugeVec,
pub consensus_handler_deferred_transactions: IntCounter,
pub consensus_handler_congested_transactions: IntCounter,
pub consensus_committed_subdags: IntCounterVec,
pub consensus_committed_messages: IntGaugeVec,
pub consensus_committed_user_transactions: IntGaugeVec,
pub consensus_calculated_throughput: IntGauge,
pub consensus_calculated_throughput_profile: IntGauge,
pub limits_metrics: Arc<LimitsMetrics>,
pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
pub authenticator_state_update_failed: IntCounter,
pub zklogin_sig_count: IntCounter,
pub multisig_sig_count: IntCounter,
pub execution_queueing_latency: LatencyObserver,
pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
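// Histogram buckets for small positive counts (e.g. input objects per transaction).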
const POSITIVE_INT_BUCKETS: &[f64] = &[
1., 2., 5., 10., 20., 50., 100., 200., 500., 1000., 2000., 5000., 10000., 20000., 50000.,
];
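// Histogram buckets for latencies, in seconds (0.5ms to 90s).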
const LATENCY_SEC_BUCKETS: &[f64] = &[
0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
10., 20., 30., 60., 90.,
];
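// Finer-grained latency buckets for fast operations, in seconds (10us to 100s).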
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
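// Buckets for computation gas per microsecond of execution latency
// (see prepare_cert_gas_latency_ratio and execution_gas_latency_ratio below).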
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
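/// Balance of the mock gas coin used for dev-inspect, in MIST (1,000 SUI).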
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000;
impl AuthorityMetrics {
pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
let execute_certificate_latency = register_histogram_vec_with_registry!(
"authority_state_execute_certificate_latency",
"Latency of executing certificates, including waiting for inputs",
&["tx_type"],
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap();
let execute_certificate_latency_single_writer =
execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
let execute_certificate_latency_shared_object =
execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
Self {
tx_orders: register_int_counter_with_registry!(
"total_transaction_orders",
"Total number of transaction orders",
registry,
)
.unwrap(),
total_certs: register_int_counter_with_registry!(
"total_transaction_certificates",
"Total number of transaction certificates handled",
registry,
)
.unwrap(),
total_cert_attempts: register_int_counter_with_registry!(
"total_handle_certificate_attempts",
"Number of calls to handle_certificate",
registry,
)
.unwrap(),
total_effects: register_int_counter_with_registry!(
"total_transaction_effects",
"Total number of transaction effects produced",
registry,
)
.unwrap(),
shared_obj_tx: register_int_counter_with_registry!(
"num_shared_obj_tx",
"Number of transactions involving shared objects",
registry,
)
.unwrap(),
sponsored_tx: register_int_counter_with_registry!(
"num_sponsored_tx",
"Number of sponsored transactions",
registry,
)
.unwrap(),
tx_already_processed: register_int_counter_with_registry!(
"num_tx_already_processed",
"Number of transaction orders already processed previously",
registry,
)
.unwrap(),
num_input_objs: register_histogram_with_registry!(
"num_input_objects",
"Distribution of number of input TX objects per TX",
POSITIVE_INT_BUCKETS.to_vec(),
registry,
)
.unwrap(),
num_shared_objects: register_histogram_with_registry!(
"num_shared_objects",
"Number of shared input objects per TX",
POSITIVE_INT_BUCKETS.to_vec(),
registry,
)
.unwrap(),
batch_size: register_histogram_with_registry!(
"batch_size",
"Distribution of size of transaction batch",
POSITIVE_INT_BUCKETS.to_vec(),
registry,
)
.unwrap(),
authority_state_handle_transaction_latency: register_histogram_with_registry!(
"authority_state_handle_transaction_latency",
"Latency of handling transactions",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execute_certificate_latency_single_writer,
execute_certificate_latency_shared_object,
execute_certificate_with_effects_latency: register_histogram_with_registry!(
"authority_state_execute_certificate_with_effects_latency",
"Latency of executing certificates with effects, including waiting for inputs",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
internal_execution_latency: register_histogram_with_registry!(
"authority_state_internal_execution_latency",
"Latency of actual certificate executions",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execution_load_input_objects_latency: register_histogram_with_registry!(
"authority_state_execution_load_input_objects_latency",
"Latency of loading input objects for execution",
LOW_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
prepare_certificate_latency: register_histogram_with_registry!(
"authority_state_prepare_certificate_latency",
"Latency of executing certificates, before committing the results",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
commit_certificate_latency: register_histogram_with_registry!(
"authority_state_commit_certificate_latency",
"Latency of committing certificate execution results",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
db_checkpoint_latency: register_histogram_with_registry!(
"db_checkpoint_latency",
"Latency of checkpointing dbs",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
"transaction_manager_num_enqueued_certificates",
"Current number of certificates enqueued to TransactionManager",
&["result"],
registry,
)
.unwrap(),
transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
"transaction_manager_num_missing_objects",
"Current number of missing objects in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
"transaction_manager_num_pending_certificates",
"Number of certificates pending in TransactionManager, with at least 1 missing input object",
registry,
)
.unwrap(),
transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
"transaction_manager_num_executing_certificates",
"Number of executing certificates, including queued and actually running certificates",
registry,
)
.unwrap(),
transaction_manager_num_ready: register_int_gauge_with_registry!(
"transaction_manager_num_ready",
"Number of ready transactions in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_object_cache_size: register_int_gauge_with_registry!(
"transaction_manager_object_cache_size",
"Current size of object-availability cache in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_object_cache_hits: register_int_counter_with_registry!(
"transaction_manager_object_cache_hits",
"Number of object-availability cache hits in TransactionManager",
registry,
)
.unwrap(),
authority_overload_status: register_int_gauge_with_registry!(
"authority_overload_status",
"Whether authority is current experiencing overload and enters load shedding mode.",
registry)
.unwrap(),
authority_load_shedding_percentage: register_int_gauge_with_registry!(
"authority_load_shedding_percentage",
"The percentage of transactions is shed when the authority is in load shedding mode.",
registry)
.unwrap(),
transaction_manager_object_cache_misses: register_int_counter_with_registry!(
"transaction_manager_object_cache_misses",
"Number of object-availability cache misses in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
"transaction_manager_object_cache_evictions",
"Number of object-availability cache evictions in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_size: register_int_gauge_with_registry!(
"transaction_manager_package_cache_size",
"Current size of package-availability cache in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_hits: register_int_counter_with_registry!(
"transaction_manager_package_cache_hits",
"Number of package-availability cache hits in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_misses: register_int_counter_with_registry!(
"transaction_manager_package_cache_misses",
"Number of package-availability cache misses in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
"transaction_manager_package_cache_evictions",
"Number of package-availability cache evictions in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
"transaction_manager_transaction_queue_age_s",
"Time spent in waiting for transaction in the queue",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
execution_driver_executed_transactions: register_int_counter_with_registry!(
"execution_driver_executed_transactions",
"Cumulative number of transaction executed by execution driver",
registry,
)
.unwrap(),
execution_driver_dispatch_queue: register_int_gauge_with_registry!(
"execution_driver_dispatch_queue",
"Number of transaction pending in execution driver dispatch queue",
registry,
)
.unwrap(),
execution_queueing_delay_s: register_histogram_with_registry!(
"execution_queueing_delay_s",
"Queueing delay between a transaction is ready for execution until it starts executing.",
LATENCY_SEC_BUCKETS.to_vec(),
registry
)
.unwrap(),
prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
"prepare_cert_gas_latency_ratio",
"The ratio of computation gas divided by VM execution latency.",
GAS_LATENCY_RATIO_BUCKETS.to_vec(),
registry
)
.unwrap(),
execution_gas_latency_ratio: register_histogram_with_registry!(
"execution_gas_latency_ratio",
"The ratio of computation gas divided by certificate execution latency, include committing certificate.",
GAS_LATENCY_RATIO_BUCKETS.to_vec(),
registry
)
.unwrap(),
skipped_consensus_txns: register_int_counter_with_registry!(
"skipped_consensus_txns",
"Total number of consensus transactions skipped",
registry,
)
.unwrap(),
skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
"skipped_consensus_txns_cache_hit",
"Total number of consensus transactions skipped because of local cache hit",
registry,
)
.unwrap(),
post_processing_total_events_emitted: register_int_counter_with_registry!(
"post_processing_total_events_emitted",
"Total number of events emitted in post processing",
registry,
)
.unwrap(),
post_processing_total_tx_indexed: register_int_counter_with_registry!(
"post_processing_total_tx_indexed",
"Total number of txes indexed in post processing",
registry,
)
.unwrap(),
post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
"post_processing_total_tx_had_event_processed",
"Total number of txes finished event processing in post processing",
registry,
)
.unwrap(),
post_processing_total_failures: register_int_counter_with_registry!(
"post_processing_total_failures",
"Total number of failure in post processing",
registry,
)
.unwrap(),
consensus_handler_processed_bytes: register_int_counter_with_registry!(
"consensus_handler_processed_bytes",
"Number of bytes processed by consensus_handler",
registry
).unwrap(),
consensus_handler_processed: register_int_counter_vec_with_registry!(
"consensus_handler_processed",
"Number of transactions processed by consensus handler",
&["class"],
registry
)
.unwrap(),
consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
"consensus_handler_num_low_scoring_authorities",
"Number of low scoring authorities based on reputation scores from consensus",
registry
).unwrap(),
consensus_handler_scores: register_int_gauge_vec_with_registry!(
"consensus_handler_scores",
"scores from consensus for each authority",
&["authority"],
registry,
).unwrap(),
consensus_handler_deferred_transactions: register_int_counter_with_registry!(
"consensus_handler_deferred_transactions",
"Number of transactions deferred by consensus handler",
registry,
).unwrap(),
consensus_handler_congested_transactions: register_int_counter_with_registry!(
"consensus_handler_congested_transactions",
"Number of transactions deferred by consensus handler due to congestion",
registry,
).unwrap(),
consensus_committed_subdags: register_int_counter_vec_with_registry!(
"consensus_committed_subdags",
"Number of committed subdags, sliced by author",
&["authority"],
registry,
).unwrap(),
consensus_committed_messages: register_int_gauge_vec_with_registry!(
"consensus_committed_messages",
"Total number of committed consensus messages, sliced by author",
&["authority"],
registry,
).unwrap(),
consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
"consensus_committed_user_transactions",
"Number of committed user transactions, sliced by submitter",
&["authority"],
registry,
).unwrap(),
limits_metrics: Arc::new(LimitsMetrics::new(registry)),
bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
authenticator_state_update_failed: register_int_counter_with_registry!(
"authenticator_state_update_failed",
"Number of failed authenticator state updates",
registry,
)
.unwrap(),
zklogin_sig_count: register_int_counter_with_registry!(
"zklogin_sig_count",
"Count of zkLogin signatures",
registry,
)
.unwrap(),
multisig_sig_count: register_int_counter_with_registry!(
"multisig_sig_count",
"Count of zkLogin signatures",
registry,
)
.unwrap(),
consensus_calculated_throughput: register_int_gauge_with_registry!(
"consensus_calculated_throughput",
"The calculated throughput from consensus output. Result is calculated based on unique transactions.",
registry,
).unwrap(),
consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
"consensus_calculated_throughput_profile",
"The current active calculated throughput profile",
registry
).unwrap(),
execution_queueing_latency: LatencyObserver::new(),
txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
}
}
}
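/// A thread-safe, pinned handle to this authority's signing key.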
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
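/// Trait-object views of the single `ExecutionCache`, built once so that callers
/// can grab `Arc<dyn ...>` handles without re-casting on every access.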
struct ExecutionCacheTraitPointers {
cache_reader: Arc<dyn ExecutionCacheRead>,
backing_store: Arc<dyn BackingStore + Send + Sync>,
backing_package_store: Arc<dyn BackingPackageStore + Send + Sync>,
effects_notify_read: NotifyReadWrapper<ExecutionCache>,
object_store: Arc<dyn ObjectStore + Send + Sync>,
reconfig_api: Arc<dyn ExecutionCacheReconfigAPI>,
accumulator_store: Arc<dyn AccumulatorStore>,
checkpoint_cache: Arc<dyn CheckpointCache>,
state_sync_store: Arc<dyn StateSyncAPI>,
cache_commit: Arc<dyn ExecutionCacheCommit>,
}
impl ExecutionCacheTraitPointers {
fn new(cache: &Arc<ExecutionCache>) -> Self {
Self {
cache_reader: cache.clone(),
backing_store: cache.clone(),
backing_package_store: cache.clone(),
effects_notify_read: cache.clone().as_notify_read_wrapper(),
object_store: cache.clone(),
reconfig_api: cache.clone(),
accumulator_store: cache.clone(),
checkpoint_cache: cache.clone(),
state_sync_store: cache.clone(),
cache_commit: cache.clone(),
}
}
}
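/// The core state of a Sui authority (validator or fullnode): its key material,
/// stores, per-epoch state, and the machinery for executing and committing
/// transactions.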
pub struct AuthorityState {
pub name: AuthorityName,
pub secret: StableSyncAuthoritySigner,
input_loader: TransactionInputLoader,
execution_cache: Arc<ExecutionCache>,
execution_cache_trait_pointers: ExecutionCacheTraitPointers,
epoch_store: ArcSwap<AuthorityPerEpochStore>,
execution_lock: RwLock<EpochId>,
pub indexes: Option<Arc<IndexStore>>,
pub subscription_handler: Arc<SubscriptionHandler>,
checkpoint_store: Arc<CheckpointStore>,
committee_store: Arc<CommitteeStore>,
transaction_manager: Arc<TransactionManager>,
#[allow(unused)]
tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
pub metrics: Arc<AuthorityMetrics>,
_pruner: AuthorityStorePruner,
_authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
db_checkpoint_config: DBCheckpointConfig,
config: NodeConfig,
pub overload_info: AuthorityOverloadInfo,
}
impl AuthorityState {
pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
epoch_store.committee().authority_exists(&self.name)
}
pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
!self.is_validator(epoch_store)
}
pub fn committee_store(&self) -> &Arc<CommitteeStore> {
&self.committee_store
}
pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
self.committee_store.clone()
}
pub fn overload_config(&self) -> &AuthorityOverloadConfig {
&self.config.authority_overload_config
}
pub fn get_epoch_state_commitments(
&self,
epoch: EpochId,
) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
self.checkpoint_store.get_epoch_state_commitments(epoch)
}
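/// Validates a user transaction, checks it against the transaction deny list,
/// signs it, and acquires locks on its owned input objects.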
#[instrument(level = "trace", skip_all)]
async fn handle_transaction_impl(
&self,
transaction: VerifiedTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransaction> {
let tx_digest = transaction.digest();
let tx_data = transaction.data().transaction_data();
tx_data.check_version_supported(epoch_store.protocol_config())?;
tx_data.validity_check(epoch_store.protocol_config())?;
let input_object_kinds = tx_data.input_objects()?;
let receiving_objects_refs = tx_data.receiving_objects();
sui_transaction_checks::deny::check_transaction_for_signing(
tx_data,
transaction.tx_signatures(),
&input_object_kinds,
&receiving_objects_refs,
&self.config.transaction_deny_config,
self.get_backing_package_store().as_ref(),
)?;
let (input_objects, receiving_objects) = self
.input_loader
.read_objects_for_signing(
Some(tx_digest),
&input_object_kinds,
&receiving_objects_refs,
epoch_store.epoch(),
)
.await?;
let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
epoch_store.protocol_config(),
epoch_store.reference_gas_price(),
tx_data,
input_objects,
&receiving_objects,
&self.metrics.bytecode_verifier_metrics,
)?;
if epoch_store.coin_deny_list_state_enabled() {
self.check_coin_deny(tx_data.sender(), &checked_input_objects, &receiving_objects)?;
}
let owned_objects = checked_input_objects.inner().filter_owned_objects();
let signed_transaction = VerifiedSignedTransaction::new(
epoch_store.epoch(),
transaction,
self.name,
&*self.secret,
);
self.execution_cache
.acquire_transaction_locks(epoch_store, &owned_objects, signed_transaction.clone())
.await?;
Ok(signed_transaction)
}
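/// Entry point for user transactions. Rejects system and expired transactions,
/// returns the existing status if the transaction was already processed, and
/// otherwise signs it via `handle_transaction_impl`.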
#[instrument(level = "trace", skip_all)]
pub async fn handle_transaction(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
transaction: VerifiedTransaction,
) -> SuiResult<HandleTransactionResponse> {
fp_ensure!(
!transaction.is_system_tx(),
SuiError::InvalidSystemTransaction
);
let tx_digest = *transaction.digest();
debug!("handle_transaction");
if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
return Ok(HandleTransactionResponse { status });
}
let _metrics_guard = self
.metrics
.authority_state_handle_transaction_latency
.start_timer();
self.metrics.tx_orders.inc();
if !epoch_store
.get_reconfig_state_read_lock_guard()
.should_accept_user_certs()
{
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}
if match &transaction.inner().data().transaction_data().expiration() {
TransactionExpiration::None => false,
TransactionExpiration::Epoch(epoch) => *epoch < epoch_store.epoch(),
} {
return Err(SuiError::TransactionExpired);
}
let signed = self.handle_transaction_impl(transaction, epoch_store).await;
match signed {
Ok(s) => Ok(HandleTransactionResponse {
status: TransactionStatus::Signed(s.into_inner().into_sig()),
}),
Err(err) => Ok(HandleTransactionResponse {
status: self
.get_transaction_status(&tx_digest, epoch_store)?
.ok_or(err)?
.1,
}),
}
}
pub fn check_system_overload_at_signing(&self) -> bool {
self.config
.authority_overload_config
.check_system_overload_at_signing
}
pub fn check_system_overload_at_execution(&self) -> bool {
self.config
.authority_overload_config
.check_system_overload_at_execution
}
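/// Checks whether this authority, its transaction manager, or consensus is
/// overloaded, shedding a percentage of transactions while in overload mode.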
pub(crate) fn check_system_overload(
&self,
consensus_adapter: &Arc<ConsensusAdapter>,
tx_data: &SenderSignedData,
do_authority_overload_check: bool,
) -> SuiResult {
if do_authority_overload_check {
self.check_authority_overload(tx_data)?;
}
self.transaction_manager
.check_execution_overload(self.overload_config(), tx_data)?;
consensus_adapter.check_consensus_overload()?;
Ok(())
}
fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
if !self.overload_info.is_overload.load(Ordering::Relaxed) {
return Ok(());
}
let load_shedding_percentage = self
.overload_info
.load_shedding_percentage
.load(Ordering::Relaxed);
overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
}
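/// Executes a certificate on a fullnode and verifies that the locally computed
/// effects match the given certified effects, panicking on divergence.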
#[instrument(level = "trace", skip_all)]
pub async fn fullnode_execute_certificate_with_effects(
&self,
transaction: &VerifiedExecutableTransaction,
effects: &VerifiedCertifiedTransactionEffects,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
assert!(self.is_fullnode(epoch_store));
if self.epoch_store.load().epoch() != epoch_store.epoch() {
return Err(SuiError::EpochEnded(epoch_store.epoch()));
}
let _metrics_guard = self
.metrics
.execute_certificate_with_effects_latency
.start_timer();
let digest = *transaction.digest();
debug!("execute_certificate_with_effects");
fp_ensure!(
*effects.data().transaction_digest() == digest,
SuiError::ErrorWhileProcessingCertificate {
err: "effects/tx digest mismatch".to_string()
}
);
if transaction.contains_shared_object() {
epoch_store
.acquire_shared_locks_from_effects(
transaction,
effects.data(),
self.execution_cache.as_ref(),
)
.await?;
}
let expected_effects_digest = effects.digest();
self.transaction_manager
.enqueue(vec![transaction.clone()], epoch_store);
let observed_effects = self
.execution_cache
.notify_read_executed_effects(&[digest])
.instrument(tracing::debug_span!(
"notify_read_effects_in_execute_certificate_with_effects"
))
.await?
.pop()
.expect("notify_read_effects should return exactly 1 element");
let observed_effects_digest = observed_effects.digest();
if &observed_effects_digest != expected_effects_digest {
panic!(
"Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
expected_effects_digest, observed_effects_digest, effects.data(), observed_effects, transaction.data().transaction_data().input_objects()
);
}
Ok(())
}
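/// Executes a certificate to completion and returns effects signed by this
/// authority. Only owned-object certificates are enqueued here; shared-object
/// certificates are expected to be sequenced through consensus first.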
#[instrument(level = "trace", skip_all)]
pub async fn execute_certificate(
&self,
certificate: &VerifiedCertificate,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransactionEffects> {
let _metrics_guard = if certificate.contains_shared_object() {
self.metrics
.execute_certificate_latency_shared_object
.start_timer()
} else {
self.metrics
.execute_certificate_latency_single_writer
.start_timer()
};
debug!("execute_certificate");
self.metrics.total_cert_attempts.inc();
if !certificate.contains_shared_object() {
self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
}
let effects = self.notify_read_effects(certificate).await?;
self.sign_effects(effects, epoch_store)
}
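/// Loads a certificate's input objects and executes it immediately under a
/// per-transaction guard, bypassing the transaction manager.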
#[instrument(level = "trace", skip_all)]
pub async fn try_execute_immediately(
&self,
certificate: &VerifiedExecutableTransaction,
expected_effects_digest: Option<TransactionEffectsDigest>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<(TransactionEffects, Option<ExecutionError>)> {
let _scope = monitored_scope("Execution::try_execute_immediately");
let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
debug!("execute_certificate_internal");
let tx_digest = certificate.digest();
let input_objects = self
.read_objects_for_execution(certificate, epoch_store)
.await?;
let tx_guard = epoch_store.acquire_tx_guard(certificate).await?;
self.process_certificate(
tx_guard,
certificate,
input_objects,
expected_effects_digest,
epoch_store,
)
.await
.tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
}
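/// Reads the input objects a certificate needs for execution.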
pub async fn read_objects_for_execution(
&self,
certificate: &VerifiedExecutableTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<InputObjects> {
let _scope = monitored_scope("Execution::load_input_objects");
let _metrics_guard = self
.metrics
.execution_load_input_objects_latency
.start_timer();
let input_objects = &certificate.data().transaction_data().input_objects()?;
self.input_loader
.read_objects_for_execution(
epoch_store.as_ref(),
&certificate.key(),
input_objects,
epoch_store.epoch(),
)
.await
}
pub async fn try_execute_for_test(
&self,
certificate: &VerifiedCertificate,
) -> SuiResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
let epoch_store = self.epoch_store_for_testing();
let (effects, execution_error_opt) = self
.try_execute_immediately(
&VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
None,
&epoch_store,
)
.await?;
let signed_effects = self.sign_effects(effects, &epoch_store)?;
Ok((signed_effects, execution_error_opt))
}
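/// Waits until the certificate has been executed and returns its effects.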
pub async fn notify_read_effects(
&self,
certificate: &VerifiedCertificate,
) -> SuiResult<TransactionEffects> {
self.execution_cache
.notify_read_executed_effects(&[*certificate.digest()])
.await
.map(|mut r| r.pop().expect("must return correct number of effects"))
}
fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
self.execution_cache
.check_owned_objects_are_live(owned_object_refs)
}
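/// Dumps this node's view of a transaction's state to a file for debugging,
/// typically when locally computed effects diverge from the expected digest.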
pub(crate) fn debug_dump_transaction_state(
&self,
tx_digest: &TransactionDigest,
effects: &TransactionEffects,
expected_effects_digest: TransactionEffectsDigest,
inner_temporary_store: &InnerTemporaryStore,
certificate: &VerifiedExecutableTransaction,
debug_dump_config: &StateDebugDumpConfig,
) -> SuiResult<PathBuf> {
let dump_dir = debug_dump_config
.dump_file_directory
.as_ref()
.cloned()
.unwrap_or(std::env::temp_dir());
let epoch_store = self.load_epoch_store_one_call_per_task();
NodeStateDump::new(
tx_digest,
effects,
expected_effects_digest,
&self.execution_cache,
&epoch_store,
inner_temporary_store,
certificate,
)?
.write_to_file(&dump_dir)
.map_err(|e| SuiError::FileIOError(e.to_string()))
}
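/// Drives a certificate through execution: returns early if effects already
/// exist, verifies the epoch under the execution lock, prepares (executes) the
/// certificate, dumps state if the effects digest diverges from the expected
/// one, and finally commits the results.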
#[instrument(level = "trace", skip_all)]
pub(crate) async fn process_certificate(
&self,
tx_guard: CertTxGuard,
certificate: &VerifiedExecutableTransaction,
input_objects: InputObjects,
expected_effects_digest: Option<TransactionEffectsDigest>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<(TransactionEffects, Option<ExecutionError>)> {
let process_certificate_start_time = tokio::time::Instant::now();
let digest = *certificate.digest();
fail_point_if!("correlated-crash-process-certificate", || {
if sui_simulator::random::deterministic_probability_once(&digest, 0.01) {
sui_simulator::task::kill_current_node(None);
}
});
if let Some(effects) = self.execution_cache.get_executed_effects(&digest)? {
tx_guard.release();
return Ok((effects, None));
}
let execution_guard = self
.execution_lock_for_executable_transaction(certificate)
.await;
let execution_guard = match execution_guard {
Ok(execution_guard) => execution_guard,
Err(err) => {
tx_guard.release();
return Err(err);
}
};
if *execution_guard != epoch_store.epoch() {
tx_guard.release();
info!("The epoch of the execution_guard doesn't match the epoch store");
return Err(SuiError::WrongEpoch {
expected_epoch: epoch_store.epoch(),
actual_epoch: *execution_guard,
});
}
let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
&execution_guard,
certificate,
input_objects,
epoch_store,
) {
Err(e) => {
info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
tx_guard.release();
return Err(e);
}
Ok(res) => res,
};
if let Some(expected_effects_digest) = expected_effects_digest {
if effects.digest() != expected_effects_digest {
match self.debug_dump_transaction_state(
&digest,
&effects,
expected_effects_digest,
&inner_temporary_store,
certificate,
&self.config.state_debug_dump_config,
) {
Ok(out_path) => {
info!(
"Dumped node state for transaction {} to {}",
digest,
out_path.as_path().display().to_string()
);
}
Err(e) => {
error!("Error dumping state for transaction {}: {e}", digest);
}
}
error!(
tx_digest = ?digest,
?expected_effects_digest,
actual_effects = ?effects,
"fork detected!"
);
panic!(
"Transaction {} is expected to have effects digest {}, but got {}!",
digest,
expected_effects_digest,
effects.digest(),
);
}
}
fail_point_async!("crash");
self.commit_certificate(
certificate,
inner_temporary_store,
&effects,
tx_guard,
execution_guard,
epoch_store,
)
.await?;
if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
certificate.data().transaction_data().kind()
{
if let Some(err) = &execution_error_opt {
error!("Authenticator state update failed: {err}");
self.metrics.authenticator_state_update_failed.inc();
}
debug_assert!(execution_error_opt.is_none());
epoch_store.update_authenticator_state(auth_state);
if cfg!(debug_assertions) {
let authenticator_state = get_authenticator_state(&self.execution_cache)
.expect("Read cannot fail")
.expect("Authenticator state must exist");
let mut sys_jwks: Vec<_> = authenticator_state
.active_jwks
.into_iter()
.map(|jwk| (jwk.jwk_id, jwk.jwk))
.collect();
let mut active_jwks: Vec<_> = epoch_store
.signature_verifier
.get_jwks()
.into_iter()
.collect();
sys_jwks.sort();
active_jwks.sort();
assert_eq!(sys_jwks, active_jwks);
}
}
let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
if elapsed > 0.0 {
self.metrics
.execution_gas_latency_ratio
.observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
};
Ok((effects, execution_error_opt))
}
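/// Persists execution results: signs the effects (on validators only), runs
/// post-processing, writes the transaction outputs to the cache, and notifies
/// the transaction manager of the newly available output objects.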
#[instrument(level = "trace", skip_all)]
async fn commit_certificate(
&self,
certificate: &VerifiedExecutableTransaction,
inner_temporary_store: InnerTemporaryStore,
effects: &TransactionEffects,
tx_guard: CertTxGuard,
_execution_guard: ExecutionLockReadGuard<'_>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
monitored_scope("Execution::commit_certificate");
let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
let tx_key = certificate.key();
let tx_digest = certificate.digest();
let input_object_count = inner_temporary_store.input_objects.len();
let shared_object_count = effects.input_shared_objects().len();
let output_keys = inner_temporary_store.get_output_keys(effects);
let effects_sig = if self.is_validator(epoch_store) {
Some(AuthoritySignInfo::new(
epoch_store.epoch(),
effects,
Intent::sui_app(IntentScope::TransactionEffects),
self.name,
&*self.secret,
))
} else {
None
};
let _ = self
.post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
.await
.tap_err(|e| {
self.metrics.post_processing_total_failures.inc();
error!(?tx_digest, "tx post processing failed: {e}");
});
epoch_store.insert_tx_cert_and_effects_signature(
&tx_key,
tx_digest,
certificate.certificate_sig(),
effects_sig.as_ref(),
)?;
fail_point_async!("crash");
let transaction_outputs = TransactionOutputs::build_transaction_outputs(
certificate.clone().into_unsigned(),
effects.clone(),
inner_temporary_store,
);
self.execution_cache
.write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
.await?;
if certificate.transaction_data().is_end_of_epoch_tx() {
self.execution_cache
.force_reload_system_packages(&BuiltInFramework::all_package_ids());
}
tx_guard.commit_tx();
self.transaction_manager
.notify_commit(tx_digest, output_keys, epoch_store);
self.update_metrics(certificate, input_object_count, shared_object_count);
Ok(())
}
fn update_metrics(
&self,
certificate: &VerifiedExecutableTransaction,
input_object_count: usize,
shared_object_count: usize,
) {
if certificate.has_zklogin_sig() {
self.metrics.zklogin_sig_count.inc();
} else if certificate.has_upgraded_multisig() {
self.metrics.multisig_sig_count.inc();
}
self.metrics.total_effects.inc();
self.metrics.total_certs.inc();
if shared_object_count > 0 {
self.metrics.shared_obj_tx.inc();
}
if certificate.is_sponsored_tx() {
self.metrics.sponsored_tx.inc();
}
self.metrics
.num_input_objs
.observe(input_object_count as f64);
self.metrics
.num_shared_objects
.observe(shared_object_count as f64);
self.metrics.batch_size.observe(
certificate
.data()
.intent_message()
.value
.kind()
.num_commands() as f64,
);
}
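/// Executes a certificate against the current state without committing any
/// changes; all writes are collected in the returned `InnerTemporaryStore`.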
#[instrument(level = "trace", skip_all)]
fn prepare_certificate(
&self,
_execution_guard: &ExecutionLockReadGuard<'_>,
certificate: &VerifiedExecutableTransaction,
input_objects: InputObjects,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<(
InnerTemporaryStore,
TransactionEffects,
Option<ExecutionError>,
)> {
let _scope = monitored_scope("Execution::prepare_certificate");
let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
let prepare_certificate_start_time = tokio::time::Instant::now();
let tx_data = certificate.data().transaction_data();
tx_data.check_version_supported(epoch_store.protocol_config())?;
tx_data.validity_check(epoch_store.protocol_config())?;
let (gas_status, input_objects) = sui_transaction_checks::check_certificate_input(
certificate,
input_objects,
epoch_store.protocol_config(),
epoch_store.reference_gas_price(),
)?;
let owned_object_refs = input_objects.inner().filter_owned_objects();
self.check_owned_locks(&owned_object_refs)?;
let tx_digest = *certificate.digest();
let protocol_config = epoch_store.protocol_config();
let transaction_data = &certificate.data().intent_message().value;
let (kind, signer, gas) = transaction_data.execution_parts();
#[allow(unused_mut)]
let (inner_temp_store, _, mut effects, execution_error_opt) =
epoch_store.executor().execute_transaction_to_effects(
self.get_backing_store().as_ref(),
protocol_config,
self.metrics.limits_metrics.clone(),
self.config
.expensive_safety_check_config
.enable_deep_per_tx_sui_conservation_check(),
self.config.certificate_deny_config.certificate_deny_set(),
&epoch_store.epoch_start_config().epoch_data().epoch_id(),
epoch_store
.epoch_start_config()
.epoch_data()
.epoch_start_timestamp(),
input_objects,
gas,
gas_status,
kind,
signer,
tx_digest,
);
fail_point_if!("cp_execution_nondeterminism", || {
#[cfg(msim)]
self.create_fail_state(certificate, epoch_store, &mut effects);
});
let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
if elapsed > 0.0 {
self.metrics
.prepare_cert_gas_latency_ratio
.observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
}
Ok((inner_temp_store, effects, execution_error_opt.err()))
}
pub fn prepare_certificate_for_benchmark(
&self,
certificate: &VerifiedExecutableTransaction,
input_objects: InputObjects,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<(
InnerTemporaryStore,
TransactionEffects,
Option<ExecutionError>,
)> {
let lock: RwLock<EpochId> = RwLock::new(epoch_store.epoch());
let execution_guard = lock.try_read().unwrap();
self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
}
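/// Dry-runs a transaction on a fullnode without committing any effects. A mock
/// gas coin is minted when the transaction carries no gas payment.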
pub async fn dry_exec_transaction(
&self,
transaction: TransactionData,
transaction_digest: TransactionDigest,
) -> SuiResult<(
DryRunTransactionBlockResponse,
BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
TransactionEffects,
Option<ObjectID>,
)> {
let epoch_store = self.load_epoch_store_one_call_per_task();
if !self.is_fullnode(&epoch_store) {
return Err(SuiError::UnsupportedFeatureError {
error: "dry-exec is only supported on fullnodes".to_string(),
});
}
if transaction.kind().is_system_tx() {
return Err(SuiError::UnsupportedFeatureError {
error: "dry-exec does not support system transactions".to_string(),
});
}
self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
.await
}
pub async fn dry_exec_transaction_for_benchmark(
&self,
transaction: TransactionData,
transaction_digest: TransactionDigest,
) -> SuiResult<(
DryRunTransactionBlockResponse,
BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
TransactionEffects,
Option<ObjectID>,
)> {
let epoch_store = self.load_epoch_store_one_call_per_task();
self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
.await
}
async fn dry_exec_transaction_impl(
&self,
epoch_store: &AuthorityPerEpochStore,
transaction: TransactionData,
transaction_digest: TransactionDigest,
) -> SuiResult<(
DryRunTransactionBlockResponse,
BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
TransactionEffects,
Option<ObjectID>,
)> {
transaction.check_version_supported(epoch_store.protocol_config())?;
transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
let input_object_kinds = transaction.input_objects()?;
let receiving_object_refs = transaction.receiving_objects();
sui_transaction_checks::deny::check_transaction_for_signing(
&transaction,
&[],
&input_object_kinds,
&receiving_object_refs,
&self.config.transaction_deny_config,
self.get_backing_package_store().as_ref(),
)?;
let (input_objects, receiving_objects) = self
.input_loader
.read_objects_for_signing(
None,
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
)
.await?;
let mut gas_object_refs = transaction.gas().to_vec();
let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
let sender = transaction.sender();
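// No gas payment was supplied: mint a mock coin owned by the sender, worth
// DRY_RUN_SUI SUI at MIST_TO_SUI MIST each (10^18 MIST in total).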
const MIST_TO_SUI: u64 = 1_000_000_000;
const DRY_RUN_SUI: u64 = 1_000_000_000;
let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
let gas_object_id = ObjectID::random();
let gas_object = Object::new_move(
MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
Owner::AddressOwner(sender),
TransactionDigest::genesis_marker(),
);
let gas_object_ref = gas_object.compute_object_reference();
gas_object_refs = vec![gas_object_ref];
(
sui_transaction_checks::check_transaction_input_with_given_gas(
epoch_store.protocol_config(),
epoch_store.reference_gas_price(),
&transaction,
input_objects,
receiving_objects,
gas_object,
&self.metrics.bytecode_verifier_metrics,
)?,
Some(gas_object_id),
)
} else {
(
sui_transaction_checks::check_transaction_input(
epoch_store.protocol_config(),
epoch_store.reference_gas_price(),
&transaction,
input_objects,
&receiving_objects,
&self.metrics.bytecode_verifier_metrics,
)?,
None,
)
};
let protocol_config = epoch_store.protocol_config();
let (kind, signer, _) = transaction.execution_parts();
let silent = true;
let executor = sui_execution::executor(protocol_config, silent, None)
.expect("Creating an executor should not fail here");
let expensive_checks = false;
let (inner_temp_store, _, effects, _execution_error) = executor
.execute_transaction_to_effects(
self.get_backing_store().as_ref(),
protocol_config,
self.metrics.limits_metrics.clone(),
expensive_checks,
self.config.certificate_deny_config.certificate_deny_set(),
&epoch_store.epoch_start_config().epoch_data().epoch_id(),
epoch_store
.epoch_start_config()
.epoch_data()
.epoch_start_timestamp(),
checked_input_objects,
gas_object_refs,
gas_status,
kind,
signer,
transaction_digest,
);
let tx_digest = *effects.transaction_digest();
let module_cache =
TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
let mut layout_resolver =
epoch_store
.executor()
.type_layout_resolver(Box::new(TemporaryPackageStore::new(
&inner_temp_store,
self.execution_cache.clone(),
)));
let object_changes = Vec::new();
let balance_changes = Vec::new();
let written_with_kind = effects
.created()
.into_iter()
.map(|(oref, _)| (oref, WriteKind::Create))
.chain(
effects
.unwrapped()
.into_iter()
.map(|(oref, _)| (oref, WriteKind::Unwrap)),
)
.chain(
effects
.mutated()
.into_iter()
.map(|(oref, _)| (oref, WriteKind::Mutate)),
)
.map(|(oref, kind)| {
let obj = inner_temp_store.written.get(&oref.0).unwrap();
(oref.0, (oref, obj.clone(), kind))
})
.collect();
Ok((
DryRunTransactionBlockResponse {
input: SuiTransactionBlockData::try_from(transaction, &module_cache).map_err(
|e| SuiError::TransactionSerializationError {
error: format!(
"Failed to convert transaction to SuiTransactionBlockData: {}",
e
),
},
)?,
effects: effects.clone().try_into()?,
events: SuiTransactionBlockEvents::try_from(
inner_temp_store.events.clone(),
tx_digest,
None,
layout_resolver.as_mut(),
)?,
object_changes,
balance_changes,
},
written_with_kind,
effects,
mock_gas,
))
}
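/// Simulates a `TransactionKind` on a fullnode for inspection. Gas is mocked
/// when absent, and `skip_checks` (default true) relaxes input validation.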
#[allow(clippy::collapsible_else_if)]
pub async fn dev_inspect_transaction_block(
&self,
sender: SuiAddress,
transaction_kind: TransactionKind,
gas_price: Option<u64>,
gas_budget: Option<u64>,
gas_sponsor: Option<SuiAddress>,
gas_objects: Option<Vec<ObjectRef>>,
show_raw_txn_data_and_effects: Option<bool>,
skip_checks: Option<bool>,
) -> SuiResult<DevInspectResults> {
let epoch_store = self.load_epoch_store_one_call_per_task();
if !self.is_fullnode(&epoch_store) {
return Err(SuiError::UnsupportedFeatureError {
error: "dev-inspect is only supported on fullnodes".to_string(),
});
}
if transaction_kind.is_system_tx() {
return Err(SuiError::UnsupportedFeatureError {
error: "system transactions are not supported".to_string(),
});
}
let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
let skip_checks = skip_checks.unwrap_or(true);
let reference_gas_price = epoch_store.reference_gas_price();
let protocol_config = epoch_store.protocol_config();
let max_tx_gas = protocol_config.max_tx_gas();
let price = gas_price.unwrap_or(reference_gas_price);
let budget = gas_budget.unwrap_or(max_tx_gas);
let owner = gas_sponsor.unwrap_or(sender);
let payment = gas_objects.unwrap_or_default();
let transaction = TransactionData::V1(TransactionDataV1 {
kind: transaction_kind.clone(),
sender,
gas_data: GasData {
payment,
owner,
price,
budget,
},
expiration: TransactionExpiration::None,
});
let raw_txn_data = if show_raw_txn_data_and_effects {
bcs::to_bytes(&transaction).map_err(|_| SuiError::TransactionSerializationError {
error: "Failed to serialize transaction during dev inspect".to_string(),
})?
} else {
vec![]
};
transaction.check_version_supported(protocol_config)?;
transaction.validity_check_no_gas_check(protocol_config)?;
let input_object_kinds = transaction.input_objects()?;
let receiving_object_refs = transaction.receiving_objects();
sui_transaction_checks::deny::check_transaction_for_signing(
&transaction,
&[],
&input_object_kinds,
&receiving_object_refs,
&self.config.transaction_deny_config,
self.get_backing_package_store().as_ref(),
)?;
let (mut input_objects, receiving_objects) = self
.input_loader
.read_objects_for_signing(
None,
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
)
.await?;
let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
DEV_INSPECT_GAS_COIN_VALUE,
transaction.gas_owner(),
);
let gas_objects = if transaction.gas().is_empty() {
let gas_object_ref = dummy_gas_object.compute_object_reference();
vec![gas_object_ref]
} else {
transaction.gas().to_vec()
};
let (gas_status, checked_input_objects) = if skip_checks {
if transaction.gas().is_empty() {
input_objects.push(ObjectReadResult::new(
InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
dummy_gas_object.into(),
));
}
let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
protocol_config,
&transaction_kind,
input_objects,
receiving_objects,
)?;
let gas_status = SuiGasStatus::new(
max_tx_gas,
transaction.gas_price(),
reference_gas_price,
protocol_config,
)?;
(gas_status, checked_input_objects)
} else {
if transaction.gas().is_empty() {
sui_transaction_checks::check_transaction_input_with_given_gas(
epoch_store.protocol_config(),
epoch_store.reference_gas_price(),
&transaction,
input_objects,
receiving_objects,
dummy_gas_object,
&self.metrics.bytecode_verifier_metrics,
)?
} else {
sui_transaction_checks::check_transaction_input(
epoch_store.protocol_config(),
epoch_store.reference_gas_price(),
&transaction,
input_objects,
&receiving_objects,
&self.metrics.bytecode_verifier_metrics,
)?
}
};
let executor = sui_execution::executor(protocol_config, true, None)
.expect("Creating an executor should not fail here");
let intent_msg = IntentMessage::new(
Intent {
version: IntentVersion::V0,
scope: IntentScope::TransactionData,
app_id: AppId::Sui,
},
transaction,
);
let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
self.get_backing_store().as_ref(),
protocol_config,
self.metrics.limits_metrics.clone(),
false,
self.config.certificate_deny_config.certificate_deny_set(),
&epoch_store.epoch_start_config().epoch_data().epoch_id(),
epoch_store
.epoch_start_config()
.epoch_data()
.epoch_start_timestamp(),
checked_input_objects,
gas_objects,
gas_status,
transaction_kind,
sender,
transaction_digest,
skip_checks,
);
let raw_effects = if show_raw_txn_data_and_effects {
bcs::to_bytes(&effects).map_err(|_| SuiError::TransactionSerializationError {
error: "Failed to serialize transaction effects during dev inspect".to_string(),
})?
} else {
vec![]
};
let package_store =
TemporaryPackageStore::new(&inner_temp_store, self.execution_cache.clone());
let mut layout_resolver = epoch_store
.executor()
.type_layout_resolver(Box::new(package_store));
DevInspectResults::new(
effects,
inner_temp_store.events.clone(),
execution_result,
raw_txn_data,
raw_effects,
layout_resolver.as_mut(),
)
}
pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
let epoch_store = self.epoch_store_for_testing();
Ok(epoch_store.reference_gas_price())
}
pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
self.execution_cache.is_tx_already_executed(digest)
}
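/// Updates the index store for an executed transaction: sender, input and
/// changed objects, move calls, events, coin balances, and owner/dynamic-field
/// changes.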
#[instrument(level = "debug", skip_all, err)]
async fn index_tx(
&self,
indexes: &IndexStore,
digest: &TransactionDigest,
cert: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
events: &TransactionEvents,
timestamp_ms: u64,
tx_coins: Option<TxCoins>,
written: &WrittenObjects,
inner_temporary_store: &InnerTemporaryStore,
) -> SuiResult<u64> {
let changes = self
.process_object_index(effects, written, inner_temporary_store)
.tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
indexes
.index_tx(
cert.data().intent_message().value.sender(),
cert.data()
.intent_message()
.value
.input_objects()?
.iter()
.map(|o| o.object_id()),
effects
.all_changed_objects()
.into_iter()
.map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
cert.data()
.intent_message()
.value
.move_calls()
.into_iter()
.map(|(package, module, function)| {
(*package, module.to_owned(), function.to_owned())
}),
events,
changes,
digest,
timestamp_ms,
tx_coins,
&inner_temporary_store.loaded_runtime_objects,
)
.await
}
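/// Simulation-only fault injection: for non-system transactions, validators
/// opt in by stake until the validity threshold is reached, and opted-in nodes
/// bump the computation cost in their effects so they diverge from the rest of
/// the committee.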
#[cfg(msim)]
fn create_fail_state(
&self,
certificate: &VerifiedExecutableTransaction,
epoch_store: &Arc<AuthorityPerEpochStore>,
effects: &mut TransactionEffects,
) {
use std::cell::RefCell;
thread_local! {
static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
}
if !certificate.data().intent_message().value.is_system_tx() {
let committee = epoch_store.committee();
let cur_stake = (**committee).weight(&self.name);
if cur_stake > 0 {
FAIL_STATE.with_borrow_mut(|fail_state| {
if fail_state.0 < committee.validity_threshold() {
fail_state.0 += cur_stake;
fail_state.1.insert(self.name);
}
if fail_state.1.contains(&self.name) {
info!("cp_exec failing tx");
effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
}
});
}
}
}
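/// Translates transaction effects into owner-index changes: owners and
/// dynamic fields removed by deletion, wrapping, or ownership transfer, plus
/// entries for newly written address-owned objects and dynamic fields.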
fn process_object_index(
&self,
effects: &TransactionEffects,
written: &WrittenObjects,
inner_temporary_store: &InnerTemporaryStore,
) -> SuiResult<ObjectIndexChanges> {
let epoch_store = self.load_epoch_store_one_call_per_task();
let mut layout_resolver =
epoch_store
.executor()
.type_layout_resolver(Box::new(TemporaryPackageStore::new(
inner_temporary_store,
self.execution_cache.clone(),
)));
let modified_at_version = effects
.modified_at_versions()
.into_iter()
.collect::<HashMap<_, _>>();
let tx_digest = effects.transaction_digest();
let mut deleted_owners = vec![];
let mut deleted_dynamic_fields = vec![];
for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
let old_version = modified_at_version.get(&id).unwrap();
match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
|e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
) {
Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
Owner::ObjectOwner(object_id) => {
deleted_dynamic_fields.push((ObjectID::from(object_id), id))
}
_ => {}
}
}
let mut new_owners = vec![];
let mut new_dynamic_fields = vec![];
for (oref, owner, kind) in effects.all_changed_objects() {
let id = &oref.0;
if let WriteKind::Mutate = kind {
let Some(old_version) = modified_at_version.get(id) else {
panic!("tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].", tx_digest);
};
let Some(old_object) = self.execution_cache.get_object_by_key(id, *old_version)?
else {
panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}", tx_digest, id, old_version);
};
if old_object.owner != owner {
match old_object.owner {
Owner::AddressOwner(addr) => {
deleted_owners.push((addr, *id));
}
Owner::ObjectOwner(object_id) => {
deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
}
_ => {}
}
}
}
match owner {
Owner::AddressOwner(addr) => {
let new_object = written.get(id).unwrap_or_else(
|| panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
);
assert_eq!(new_object.version(), oref.1, "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}", tx_digest, id, new_object.version(), oref.1);
let type_ = new_object
.type_()
.map(|type_| ObjectType::Struct(type_.clone()))
.unwrap_or(ObjectType::Package);
new_owners.push((
(addr, *id),
ObjectInfo {
object_id: *id,
version: oref.1,
digest: oref.2,
type_,
owner,
previous_transaction: *effects.transaction_digest(),
},
));
}
Owner::ObjectOwner(owner) => {
let new_object = written.get(id).unwrap_or_else(
|| panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
);
assert_eq!(new_object.version(), oref.1, "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}", tx_digest, id, new_object.version(), oref.1);
let Some(df_info) = self
.try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
.unwrap_or_else(|e| {
error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
None
}
)
else {
continue;
};
new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
}
_ => {}
}
}
Ok(ObjectIndexChanges {
deleted_owners,
deleted_dynamic_fields,
new_owners,
new_dynamic_fields,
})
}
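/// Builds a `DynamicFieldInfo` for `o` when it is a Move object whose type is
/// a dynamic field, returning `Ok(None)` otherwise; for dynamic object fields
/// the target object is resolved from `written` first, then from the cache.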
fn try_create_dynamic_field_info(
&self,
o: &Object,
written: &WrittenObjects,
resolver: &mut dyn LayoutResolver,
) -> SuiResult<Option<DynamicFieldInfo>> {
let Some(move_object) = o.data.try_as_move().cloned() else {
return Ok(None);
};
if !move_object.type_().is_dynamic_field() {
return Ok(None);
}
let layout = resolver.get_annotated_layout(&move_object.type_().clone().into())?;
let move_struct = move_object.to_move_struct(&layout)?;
let (name_value, type_, object_id) =
DynamicFieldInfo::parse_move_object(&move_struct).tap_err(|e| warn!("{e}"))?;
let name_type = move_object.type_().try_extract_field_name(&type_)?;
let bcs_name = bcs::to_bytes(&name_value.clone().undecorate()).map_err(|e| {
SuiError::ObjectSerializationError {
error: format!("{e}"),
}
})?;
let name = DynamicFieldName {
type_: name_type,
value: SuiMoveValue::from(name_value).to_json_value(),
};
Ok(Some(match type_ {
DynamicFieldType::DynamicObject => {
let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
let version = object.version();
let digest = object.digest();
let object_type = object.data.type_().unwrap().clone();
(version, digest, object_type)
} else {
let object = self
.execution_cache
.get_object_by_key(&object_id, o.version())?
.ok_or_else(|| UserInputError::ObjectNotFound {
object_id,
version: Some(o.version()),
})?;
let version = object.version();
let digest = object.digest();
let object_type = object.data.type_().unwrap().clone();
(version, digest, object_type)
};
DynamicFieldInfo {
name,
bcs_name,
type_,
object_type: object_type.to_string(),
object_id,
version,
digest,
}
}
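// A plain dynamic field stores its value inline: the value type is the
// second type parameter of the `Field<Name, Value>` wrapper.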
DynamicFieldType::DynamicField { .. } => DynamicFieldInfo {
name,
bcs_name,
type_,
object_type: move_object.into_type().into_type_params()[1].to_string(),
object_id: o.id(),
version: o.version(),
digest: o.digest(),
},
}))
}
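/// Fullnode-only post-execution hook: indexes the transaction, then feeds its
/// effects and events through the subscription handler; a no-op when indexing
/// is disabled.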
#[instrument(level = "trace", skip_all, err)]
async fn post_process_one_tx(
&self,
certificate: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
inner_temporary_store: &InnerTemporaryStore,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
if self.indexes.is_none() {
return Ok(());
}
let tx_digest = certificate.digest();
let timestamp_ms = Self::unixtime_now_ms();
let events = &inner_temporary_store.events;
let written = &inner_temporary_store.written;
let tx_coins =
self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
if let Some(indexes) = &self.indexes {
let _ = self
.index_tx(
indexes.as_ref(),
tx_digest,
certificate,
effects,
events,
timestamp_ms,
tx_coins,
written,
inner_temporary_store,
)
.await
.tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
.tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
.expect("Indexing tx should not fail");
let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
let events = self.make_transaction_block_events(
events.clone(),
*tx_digest,
timestamp_ms,
epoch_store,
inner_temporary_store,
)?;
self.subscription_handler
.process_tx(certificate.data().transaction_data(), &effects, &events)
.await
.tap_ok(|_| {
self.metrics
.post_processing_total_tx_had_event_processed
.inc()
})
.tap_err(|e| {
warn!(
?tx_digest,
"Post processing - Couldn't process events for tx: {}", e
)
})?;
self.metrics
.post_processing_total_events_emitted
.inc_by(events.data.len() as u64);
}
Ok(())
}
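/// Converts raw `TransactionEvents` into `SuiTransactionBlockEvents`, resolving
/// Move type layouts against packages written by the same transaction.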
fn make_transaction_block_events(
&self,
transaction_events: TransactionEvents,
digest: TransactionDigest,
timestamp_ms: u64,
epoch_store: &Arc<AuthorityPerEpochStore>,
inner_temporary_store: &InnerTemporaryStore,
) -> SuiResult<SuiTransactionBlockEvents> {
let mut layout_resolver =
epoch_store
.executor()
.type_layout_resolver(Box::new(TemporaryPackageStore::new(
inner_temporary_store,
self.execution_cache.clone(),
)));
SuiTransactionBlockEvents::try_from(
transaction_events,
digest,
Some(timestamp_ms),
layout_resolver.as_mut(),
)
}
pub fn unixtime_now_ms() -> u64 {
let ts_ms = Utc::now().timestamp_millis();
u64::try_from(ts_ms).expect("Travelling in time machine")
}
#[instrument(level = "trace", skip_all)]
pub async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
) -> SuiResult<TransactionInfoResponse> {
let epoch_store = self.load_epoch_store_one_call_per_task();
let (transaction, status) = self
.get_transaction_status(&request.transaction_digest, &epoch_store)?
.ok_or(SuiError::TransactionNotFound {
digest: request.transaction_digest,
})?;
Ok(TransactionInfoResponse {
transaction,
status,
})
}
#[instrument(level = "trace", skip_all)]
pub async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> SuiResult<ObjectInfoResponse> {
let epoch_store = self.load_epoch_store_one_call_per_task();
let requested_object_seq = match request.request_kind {
ObjectInfoRequestKind::LatestObjectInfo => {
let (_, seq, _) = self
.get_object_or_tombstone(request.object_id)
.await?
.ok_or_else(|| {
SuiError::from(UserInputError::ObjectNotFound {
object_id: request.object_id,
version: None,
})
})?;
seq
}
ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
};
let object = self
.execution_cache
.get_object_by_key(&request.object_id, requested_object_seq)?
.ok_or_else(|| {
SuiError::from(UserInputError::ObjectNotFound {
object_id: request.object_id,
version: Some(requested_object_seq),
})
})?;
let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
(request.generate_layout, object.data.try_as_move())
{
Some(
self.load_epoch_store_one_call_per_task()
.executor()
.type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
.get_annotated_layout(&move_obj.type_().clone().into())?,
)
} else {
None
};
let lock = if !object.is_address_owned() {
None
} else {
self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
.await?
.map(|s| s.into_inner())
};
Ok(ObjectInfoResponse {
object,
layout,
lock_for_debugging: lock,
})
}
#[instrument(level = "trace", skip_all)]
pub fn handle_checkpoint_request(
&self,
request: &CheckpointRequest,
) -> SuiResult<CheckpointResponse> {
let summary = match request.sequence_number {
Some(seq) => self
.checkpoint_store
.get_checkpoint_by_sequence_number(seq)?,
None => self.checkpoint_store.get_latest_certified_checkpoint(),
}
.map(|v| v.into_inner());
let contents = match &summary {
Some(s) => self
.checkpoint_store
.get_checkpoint_contents(&s.content_digest)?,
None => None,
};
Ok(CheckpointResponse {
checkpoint: summary,
contents,
})
}
#[instrument(level = "trace", skip_all)]
pub fn handle_checkpoint_request_v2(
&self,
request: &CheckpointRequestV2,
) -> SuiResult<CheckpointResponseV2> {
let summary = if request.certified {
let summary = match request.sequence_number {
Some(seq) => self
.checkpoint_store
.get_checkpoint_by_sequence_number(seq)?,
None => self.checkpoint_store.get_latest_certified_checkpoint(),
}
.map(|v| v.into_inner());
summary.map(CheckpointSummaryResponse::Certified)
} else {
let summary = match request.sequence_number {
Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
None => self
.checkpoint_store
.get_latest_locally_computed_checkpoint(),
};
summary.map(CheckpointSummaryResponse::Pending)
};
let contents = match &summary {
Some(s) => self
.checkpoint_store
.get_checkpoint_contents(&s.content_digest())?,
None => None,
};
Ok(CheckpointResponseV2 {
checkpoint: summary,
contents,
})
}
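/// Logs the current and supported protocol versions and shuts the node down
/// (process exit, or simulated-node shutdown under msim) if the current
/// version is unsupported by this binary.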
fn check_protocol_version(
supported_protocol_versions: SupportedProtocolVersions,
current_version: ProtocolVersion,
) {
info!("current protocol version is now {:?}", current_version);
info!("supported versions are: {:?}", supported_protocol_versions);
if !supported_protocol_versions.is_version_supported(current_version) {
let msg = format!(
"Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
current_version, supported_protocol_versions,
);
error!("{}", msg);
eprintln!("{}", msg);
#[cfg(not(msim))]
std::process::exit(1);
#[cfg(msim)]
sui_simulator::task::shutdown_current_node();
}
}
#[allow(clippy::disallowed_methods)]
pub async fn new(
name: AuthorityName,
secret: StableSyncAuthoritySigner,
supported_protocol_versions: SupportedProtocolVersions,
store: Arc<AuthorityStore>,
execution_cache: Arc<ExecutionCache>,
epoch_store: Arc<AuthorityPerEpochStore>,
committee_store: Arc<CommitteeStore>,
indexes: Option<Arc<IndexStore>>,
checkpoint_store: Arc<CheckpointStore>,
prometheus_registry: &Registry,
genesis_objects: &[Object],
db_checkpoint_config: &DBCheckpointConfig,
config: NodeConfig,
indirect_objects_threshold: usize,
archive_readers: ArchiveReaderBalancer,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
let transaction_manager = Arc::new(TransactionManager::new(
execution_cache.clone(),
&epoch_store,
tx_ready_certificates,
metrics.clone(),
));
let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
epoch_store.get_parent_path(),
&config.authority_store_pruning_config,
);
let _pruner = AuthorityStorePruner::new(
store.perpetual_tables.clone(),
checkpoint_store.clone(),
store.objects_lock_table.clone(),
config.authority_store_pruning_config,
epoch_store.committee().authority_exists(&name),
epoch_store.epoch_start_state().epoch_duration_ms(),
prometheus_registry,
indirect_objects_threshold,
archive_readers,
);
let input_loader = TransactionInputLoader::new(execution_cache.clone());
let cache_pointers = ExecutionCacheTraitPointers::new(&execution_cache);
let epoch = epoch_store.epoch();
let state = Arc::new(AuthorityState {
name,
secret,
execution_lock: RwLock::new(epoch),
epoch_store: ArcSwap::new(epoch_store.clone()),
input_loader,
execution_cache,
execution_cache_trait_pointers: cache_pointers,
indexes,
subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
checkpoint_store,
committee_store,
transaction_manager,
tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
metrics,
_pruner,
_authority_per_epoch_pruner,
db_checkpoint_config: db_checkpoint_config.clone(),
config,
overload_info: AuthorityOverloadInfo::default(),
});
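// Hand the execution task a weak reference so the spawned loop does not
// keep the authority state alive on its own.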
let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(execution_process(
authority_state,
rx_ready_certificates,
rx_execution_shutdown,
));
state
.create_owner_index_if_empty(genesis_objects, &epoch_store)
.expect("Error indexing genesis objects.");
state
}
pub fn get_execution_cache(&self) -> Arc<ExecutionCache> {
self.execution_cache.clone()
}
pub fn get_cache_reader(&self) -> &Arc<dyn ExecutionCacheRead> {
&self.execution_cache_trait_pointers.cache_reader
}
pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
&self.execution_cache_trait_pointers.backing_store
}
pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
&self.execution_cache_trait_pointers.backing_package_store
}
pub fn get_effects_notify_read(&self) -> &NotifyReadWrapper<ExecutionCache> {
&self.execution_cache_trait_pointers.effects_notify_read
}
pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
&self.execution_cache_trait_pointers.object_store
}
pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
&self.execution_cache_trait_pointers.reconfig_api
}
pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
&self.execution_cache_trait_pointers.accumulator_store
}
pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
&self.execution_cache_trait_pointers.checkpoint_cache
}
pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
&self.execution_cache_trait_pointers.state_sync_store
}
pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
&self.execution_cache_trait_pointers.cache_commit
}
pub fn database_for_testing(&self) -> &Arc<AuthorityStore> {
self.execution_cache.store_for_testing()
}
pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
&self,
config: NodeConfig,
metrics: Arc<AuthorityStorePruningMetrics>,
) -> anyhow::Result<()> {
let archive_readers =
ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
&self.execution_cache.store_for_testing().perpetual_tables,
&self.checkpoint_store,
&self.execution_cache.store_for_testing().objects_lock_table,
config.authority_store_pruning_config,
metrics,
config.indirect_objects_threshold,
archive_readers,
EPOCH_DURATION_MS_FOR_TESTING,
)
.await
}
pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
&self.transaction_manager
}
pub fn enqueue_certificates_for_execution(
&self,
certs: Vec<VerifiedCertificate>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
self.transaction_manager
.enqueue_certificates(certs, epoch_store)
}
pub(crate) fn enqueue_with_expected_effects_digest(
&self,
certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
epoch_store: &AuthorityPerEpochStore,
) {
self.transaction_manager
.enqueue_with_expected_effects_digest(certs, epoch_store)
}
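/// Seeds the owner index from the genesis objects, but only when an index
/// store is configured and still empty (i.e. on first startup).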
fn create_owner_index_if_empty(
&self,
genesis_objects: &[Object],
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let Some(index_store) = &self.indexes else {
return Ok(());
};
if !index_store.is_empty() {
return Ok(());
}
let mut new_owners = vec![];
let mut new_dynamic_fields = vec![];
let mut layout_resolver = epoch_store
.executor()
.type_layout_resolver(Box::new(self.execution_cache.as_ref()));
for o in genesis_objects.iter() {
match o.owner {
Owner::AddressOwner(addr) => new_owners.push((
(addr, o.id()),
ObjectInfo::new(&o.compute_object_reference(), o),
)),
Owner::ObjectOwner(object_id) => {
let id = o.id();
let Some(info) = self.try_create_dynamic_field_info(
o,
&BTreeMap::new(),
layout_resolver.as_mut(),
)?
else {
continue;
};
new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
}
_ => {}
}
}
index_store.insert_genesis_objects(ObjectIndexChanges {
deleted_owners: vec![],
deleted_dynamic_fields: vec![],
new_owners,
new_dynamic_fields,
})
}
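/// Acquires the execution read lock, succeeding only if the transaction was
/// certified in the currently executing epoch; otherwise returns
/// `SuiError::WrongEpoch`.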
pub async fn execution_lock_for_executable_transaction(
&self,
transaction: &VerifiedExecutableTransaction,
) -> SuiResult<ExecutionLockReadGuard> {
let lock = self.execution_lock.read().await;
if *lock == transaction.auth_sig().epoch() {
Ok(lock)
} else {
Err(SuiError::WrongEpoch {
expected_epoch: *lock,
actual_epoch: transaction.auth_sig().epoch(),
})
}
}
pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
self.execution_lock.write().await
}
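/// Advances this authority to the next epoch: installs the new committee,
/// reverts uncommitted transactions, runs end-of-epoch consistency checks,
/// optionally snapshots the databases, reopens the per-epoch store, and
/// finally bumps the execution lock so old-epoch certificates can no longer
/// execute.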
#[instrument(level = "error", skip_all)]
pub async fn reconfigure(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
supported_protocol_versions: SupportedProtocolVersions,
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
Self::check_protocol_version(
supported_protocol_versions,
epoch_start_configuration
.epoch_start_state()
.protocol_version(),
);
self.committee_store.insert_new_committee(&new_committee)?;
let mut execution_lock = self.execution_lock_for_reconfiguration().await;
self.revert_uncommitted_epoch_transactions(cur_epoch_store)
.await?;
self.execution_cache
.clear_state_end_of_epoch(&execution_lock);
self.check_system_consistency(
cur_epoch_store,
checkpoint_executor,
accumulator,
expensive_safety_check_config,
);
self.maybe_reaccumulate_state_hash(
cur_epoch_store,
epoch_start_configuration
.epoch_start_state()
.protocol_version(),
);
self.execution_cache
.set_epoch_start_configuration(&epoch_start_configuration)?;
if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
if self
.db_checkpoint_config
.perform_db_checkpoints_at_epoch_end
{
let checkpoint_indexes = self
.db_checkpoint_config
.perform_index_db_checkpoints_at_epoch_end
.unwrap_or(false);
let current_epoch = cur_epoch_store.epoch();
let epoch_checkpoint_path =
checkpoint_path.join(format!("epoch_{}", current_epoch));
self.checkpoint_all_dbs(
&epoch_checkpoint_path,
cur_epoch_store,
checkpoint_indexes,
)?;
}
}
let new_epoch = new_committee.epoch;
let new_epoch_store = self
.reopen_epoch_db(
cur_epoch_store,
new_committee,
epoch_start_configuration,
expensive_safety_check_config,
)
.await?;
assert_eq!(new_epoch_store.epoch(), new_epoch);
self.transaction_manager.reconfigure(new_epoch);
*execution_lock = new_epoch;
Ok(new_epoch_store)
}
#[instrument(level = "error", skip_all)]
fn maybe_reaccumulate_state_hash(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
new_protocol_version: ProtocolVersion,
) {
self.execution_cache
.maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
}
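/// End-of-epoch safety checks: SUI conservation always, plus live-object
/// state consistency and secondary-index verification when the corresponding
/// expensive checks are enabled.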
#[instrument(level = "error", skip_all)]
fn check_system_consistency(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) {
info!(
"Performing sui conservation consistency check for epoch {}",
cur_epoch_store.epoch()
);
if let Err(err) = self
.execution_cache
.expensive_check_sui_conservation(cur_epoch_store)
{
if cfg!(debug_assertions) {
panic!("{}", err);
} else {
warn!("Sui conservation consistency check failed: {}", err);
}
} else {
info!("Sui conservation consistency check passed");
}
if expensive_safety_check_config.enable_state_consistency_check() {
info!(
"Performing state consistency check for epoch {}",
cur_epoch_store.epoch()
);
self.expensive_check_is_consistent_state(
checkpoint_executor,
accumulator,
cur_epoch_store,
cfg!(debug_assertions),
);
}
if expensive_safety_check_config.enable_secondary_index_checks() {
if let Some(indexes) = self.indexes.clone() {
verify_indexes(&*self.execution_cache, indexes)
.expect("secondary indexes are inconsistent");
}
}
}
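/// Compares the epoch's root state accumulator against a freshly computed
/// digest of the live object set; panics on mismatch when `panic` is set,
/// otherwise logs the error and reports the outcome to the checkpoint
/// executor.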
fn expensive_check_is_consistent_state(
&self,
checkpoint_executor: &CheckpointExecutor,
accumulator: Arc<StateAccumulator>,
cur_epoch_store: &AuthorityPerEpochStore,
panic: bool,
) {
let live_object_set_hash = accumulator.digest_live_object_set(
!cur_epoch_store
.protocol_config()
.simplified_unwrap_then_delete(),
);
let root_state_hash: ECMHLiveObjectSetDigest = self
.execution_cache
.get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
.expect("Retrieving root state hash cannot fail")
.expect("Root state hash for epoch must exist")
.1
.digest()
.into();
let is_inconsistent = root_state_hash != live_object_set_hash;
if is_inconsistent {
if panic {
panic!(
"Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
root_state_hash, live_object_set_hash
);
} else {
error!(
"Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
root_state_hash, live_object_set_hash
);
}
} else {
info!("State consistency check passed");
}
if !panic {
checkpoint_executor.set_inconsistent_state(is_inconsistent);
}
}
pub fn current_epoch_for_testing(&self) -> EpochId {
self.epoch_store_for_testing().epoch()
}
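/// Snapshots the checkpoint, perpetual, committee, and (optionally) index
/// databases into a temporary directory, then renames it into place; skips
/// the snapshot if one already exists for this epoch.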
#[instrument(level = "error", skip_all)]
pub fn checkpoint_all_dbs(
&self,
checkpoint_path: &Path,
cur_epoch_store: &AuthorityPerEpochStore,
checkpoint_indexes: bool,
) -> SuiResult {
let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
let current_epoch = cur_epoch_store.epoch();
if checkpoint_path.exists() {
info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
return Ok(());
}
let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
if checkpoint_path_tmp.exists() {
fs::remove_dir_all(&checkpoint_path_tmp)
.map_err(|e| SuiError::FileIOError(e.to_string()))?;
}
fs::create_dir_all(&checkpoint_path_tmp)
.map_err(|e| SuiError::FileIOError(e.to_string()))?;
fs::create_dir(&store_checkpoint_path_tmp)
.map_err(|e| SuiError::FileIOError(e.to_string()))?;
self.checkpoint_store
.checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
self.execution_cache
.checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
self.committee_store
.checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
if checkpoint_indexes {
if let Some(indexes) = self.indexes.as_ref() {
indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
}
}
fs::rename(checkpoint_path_tmp, checkpoint_path)
.map_err(|e| SuiError::FileIOError(e.to_string()))?;
Ok(())
}
pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
self.epoch_store.load()
}
pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
self.load_epoch_store_one_call_per_task()
}
pub fn clone_committee_for_testing(&self) -> Committee {
Committee::clone(self.epoch_store_for_testing().committee())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_object(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
self.execution_cache
.get_object(object_id)
.map_err(Into::into)
}
pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
Ok(self
.get_object(&SUI_SYSTEM_ADDRESS.into())
.await?
.expect("framework object should always exist")
.compute_object_reference())
}
pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
self.execution_cache.get_sui_system_state_object_unsafe()
}
#[instrument(level = "trace", skip_all)]
fn get_transaction_checkpoint_sequence(
&self,
digest: &TransactionDigest,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<CheckpointSequenceNumber>> {
epoch_store.get_transaction_checkpoint(digest)
}
#[instrument(level = "trace", skip_all)]
pub fn get_checkpoint_by_sequence_number(
&self,
sequence_number: CheckpointSequenceNumber,
) -> SuiResult<Option<VerifiedCheckpoint>> {
Ok(self
.checkpoint_store
.get_checkpoint_by_sequence_number(sequence_number)?)
}
#[instrument(level = "trace", skip_all)]
pub fn get_transaction_checkpoint_for_tests(
&self,
digest: &TransactionDigest,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<VerifiedCheckpoint>> {
let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
let Some(checkpoint) = checkpoint else {
return Ok(None);
};
let checkpoint = self
.checkpoint_store
.get_checkpoint_by_sequence_number(checkpoint)?;
Ok(checkpoint)
}
#[instrument(level = "trace", skip_all)]
pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
Ok(
match self
.execution_cache
.get_latest_object_or_tombstone(*object_id)?
{
Some((_, ObjectOrTombstone::Object(object))) => {
let layout = self.get_object_layout(&object)?;
ObjectRead::Exists(object.compute_object_reference(), object, layout)
}
Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
None => ObjectRead::NotExists(*object_id),
},
)
}
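/// Returns the chain identifier, deriving it from the genesis checkpoint
/// digest on first use and caching it globally thereafter.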
pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
if let Some(digest) = CHAIN_IDENTIFIER.get() {
return Some(*digest);
}
let checkpoint = self
.get_checkpoint_by_sequence_number(0)
.tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
.ok()?
.tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
Some(ChainIdentifier::from(*checkpoint.digest()))
}
#[instrument(level = "trace", skip_all)]
pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
where
T: DeserializeOwned,
{
let o = self.get_object_read(object_id)?.into_object()?;
if let Some(move_object) = o.data.try_as_move() {
Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
SuiError::ObjectDeserializationError {
error: format!("{e}"),
}
})?)
} else {
Err(SuiError::ObjectDeserializationError {
error: format!("Provided object : [{object_id}] is not a Move object."),
})
}
}
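/// Reads an object as of a historical version, distinguishing a version that
/// is higher than the latest, a version that was never found, a deleted
/// object, and the live version.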
#[instrument(level = "trace", skip_all)]
pub fn get_past_object_read(
&self,
object_id: &ObjectID,
version: SequenceNumber,
) -> SuiResult<PastObjectRead> {
let Some(obj_ref) = self
.execution_cache
.get_latest_object_ref_or_tombstone(*object_id)?
else {
return Ok(PastObjectRead::ObjectNotExists(*object_id));
};
if version > obj_ref.1 {
return Ok(PastObjectRead::VersionTooHigh {
object_id: *object_id,
asked_version: version,
latest_version: obj_ref.1,
});
}
if version < obj_ref.1 {
return Ok(match self.read_object_at_version(object_id, version)? {
Some((object, layout)) => {
let obj_ref = object.compute_object_reference();
PastObjectRead::VersionFound(obj_ref, object, layout)
}
None => PastObjectRead::VersionNotFound(*object_id, version),
});
}
if !obj_ref.2.is_alive() {
return Ok(PastObjectRead::ObjectDeleted(obj_ref));
}
match self.read_object_at_version(object_id, obj_ref.1)? {
Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
None => {
error!(
"Object with in parent_entry is missing from object store, datastore is \
inconsistent",
);
Err(UserInputError::ObjectNotFound {
object_id: *object_id,
version: Some(obj_ref.1),
}
.into())
}
}
}
#[instrument(level = "trace", skip_all)]
fn read_object_at_version(
&self,
object_id: &ObjectID,
version: SequenceNumber,
) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
let Some(object) = self.execution_cache.get_object_by_key(object_id, version)? else {
return Ok(None);
};
let layout = self.get_object_layout(&object)?;
Ok(Some((object, layout)))
}
fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
let layout = object
.data
.try_as_move()
.map(|object| {
self.load_epoch_store_one_call_per_task()
.executor()
.type_layout_resolver(Box::new(self.execution_cache.as_ref()))
.get_annotated_layout(&object.type_().clone().into())
})
.transpose()?;
Ok(layout)
}
fn get_owner_at_version(
&self,
object_id: &ObjectID,
version: SequenceNumber,
) -> SuiResult<Owner> {
self.execution_cache
.get_object_by_key(object_id, version)?
.ok_or_else(|| {
SuiError::from(UserInputError::ObjectNotFound {
object_id: *object_id,
version: Some(version),
})
})
.map(|o| o.owner)
}
#[instrument(level = "trace", skip_all)]
pub fn get_owner_objects(
&self,
owner: SuiAddress,
cursor: Option<ObjectID>,
limit: usize,
filter: Option<SuiObjectDataFilter>,
) -> SuiResult<Vec<ObjectInfo>> {
if let Some(indexes) = &self.indexes {
indexes.get_owner_objects(owner, cursor, limit, filter)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_owned_coins_iterator_with_cursor(
&self,
owner: SuiAddress,
cursor: (String, ObjectID),
limit: usize,
one_coin_type_only: bool,
) -> SuiResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
if let Some(indexes) = &self.indexes {
indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_owner_objects_iterator(
&self,
owner: SuiAddress,
cursor: Option<ObjectID>,
filter: Option<SuiObjectDataFilter>,
) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
if let Some(indexes) = &self.indexes {
indexes.get_owner_objects_iterator(owner, cursor_u, filter)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}
#[instrument(level = "trace", skip_all)]
pub async fn get_move_objects<T>(
&self,
owner: SuiAddress,
type_: MoveObjectType,
) -> SuiResult<Vec<T>>
where
T: DeserializeOwned,
{
let object_ids = self
.get_owner_objects_iterator(owner, None, None)?
.filter(|o| match &o.type_ {
ObjectType::Struct(s) => &type_ == s,
ObjectType::Package => false,
})
.map(|info| ObjectKey(info.object_id, info.version))
.collect::<Vec<_>>();
let mut move_objects = vec![];
let objects = self.execution_cache.multi_get_objects_by_key(&object_ids)?;
for (o, id) in objects.into_iter().zip(object_ids) {
let object = o.ok_or_else(|| {
SuiError::from(UserInputError::ObjectNotFound {
object_id: id.0,
version: Some(id.1),
})
})?;
let move_object = object.data.try_as_move().ok_or_else(|| {
SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
})?;
move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
SuiError::ObjectDeserializationError {
error: format!("{e}"),
}
})?);
}
Ok(move_objects)
}
#[instrument(level = "trace", skip_all)]
pub fn get_dynamic_fields(
&self,
owner: ObjectID,
cursor: Option<ObjectID>,
limit: usize,
) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
Ok(self
.get_dynamic_fields_iterator(owner, cursor)?
.take(limit)
.collect::<Result<Vec<_>, _>>()?)
}
fn get_dynamic_fields_iterator(
&self,
owner: ObjectID,
cursor: Option<ObjectID>,
) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
{
if let Some(indexes) = &self.indexes {
indexes.get_dynamic_fields_iterator(owner, cursor)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_dynamic_field_object_id(
&self,
owner: ObjectID,
name_type: TypeTag,
name_bcs_bytes: &[u8],
) -> SuiResult<Option<ObjectID>> {
if let Some(indexes) = &self.indexes {
indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
} else {
Err(SuiError::IndexStoreNotAvailable)
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
Ok(self.get_indexes()?.next_sequence_number())
}
#[instrument(level = "trace", skip_all)]
pub async fn get_executed_transaction_and_effects(
&self,
digest: TransactionDigest,
kv_store: Arc<TransactionKeyValueStore>,
) -> SuiResult<(Transaction, TransactionEffects)> {
let transaction = kv_store.get_tx(digest).await?;
let effects = kv_store.get_fx_by_tx_digest(digest).await?;
Ok((transaction, effects))
}
#[instrument(level = "trace", skip_all)]
pub fn multi_get_checkpoint_by_sequence_number(
&self,
sequence_numbers: &[CheckpointSequenceNumber],
) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
Ok(self
.checkpoint_store
.multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
}
#[instrument(level = "trace", skip_all)]
pub fn get_transaction_events(
&self,
digest: &TransactionEventsDigest,
) -> SuiResult<TransactionEvents> {
self.execution_cache
.get_events(digest)?
.ok_or(SuiError::TransactionEventsNotFound { digest: *digest })
}
pub fn get_transaction_input_objects(
&self,
effects: &TransactionEffects,
) -> anyhow::Result<Vec<Object>> {
let input_object_keys = effects
.modified_at_versions()
.into_iter()
.map(|(object_id, version)| ObjectKey(object_id, version))
.collect::<Vec<_>>();
let input_objects = self
.get_object_store()
.multi_get_objects_by_key(&input_object_keys)?
.into_iter()
.enumerate()
.map(|(idx, maybe_object)| {
maybe_object.ok_or_else(|| {
anyhow::anyhow!(
"missing input object key {:?} from tx {}",
input_object_keys[idx],
effects.transaction_digest()
)
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
Ok(input_objects)
}
pub fn get_transaction_output_objects(
&self,
effects: &TransactionEffects,
) -> anyhow::Result<Vec<Object>> {
let output_object_keys = effects
.all_changed_objects()
.into_iter()
.map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
.collect::<Vec<_>>();
let output_objects = self
.get_object_store()
.multi_get_objects_by_key(&output_object_keys)?
.into_iter()
.enumerate()
.map(|(idx, maybe_object)| {
maybe_object.ok_or_else(|| {
anyhow::anyhow!(
"missing output object key {:?} from tx {}",
output_object_keys[idx],
effects.transaction_digest()
)
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
Ok(output_objects)
}
fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
match &self.indexes {
Some(i) => Ok(i.clone()),
None => Err(SuiError::UnsupportedFeatureError {
error: "extended object indexing is not enabled on this server".into(),
}),
}
}
#[instrument(level = "trace", skip_all)]
pub fn loaded_child_object_versions(
&self,
transaction_digest: &TransactionDigest,
) -> SuiResult<Option<Vec<(ObjectID, SequenceNumber)>>> {
self.get_indexes()?
.loaded_child_object_versions(transaction_digest)
}
pub async fn get_transactions_for_tests(
self: &Arc<Self>,
filter: Option<TransactionFilter>,
cursor: Option<TransactionDigest>,
limit: Option<usize>,
reverse: bool,
) -> SuiResult<Vec<TransactionDigest>> {
let metrics = KeyValueStoreMetrics::new_for_tests();
let kv_store = Arc::new(TransactionKeyValueStore::new(
"rocksdb",
metrics,
self.clone(),
));
self.get_transactions(&kv_store, filter, cursor, limit, reverse)
.await
}
#[instrument(level = "trace", skip_all)]
pub async fn get_transactions(
&self,
kv_store: &Arc<TransactionKeyValueStore>,
filter: Option<TransactionFilter>,
cursor: Option<TransactionDigest>,
limit: Option<usize>,
reverse: bool,
) -> SuiResult<Vec<TransactionDigest>> {
if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
let iter = checkpoint_contents.iter().map(|c| c.transaction);
if reverse {
let iter = iter
.rev()
.skip_while(|d| cursor.is_some() && Some(*d) != cursor)
.skip(usize::from(cursor.is_some()));
return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
} else {
let iter = iter
.skip_while(|d| cursor.is_some() && Some(*d) != cursor)
.skip(usize::from(cursor.is_some()));
return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
}
}
self.get_indexes()?
.get_transactions(filter, cursor, limit, reverse)
}
pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
&self.checkpoint_store
}
pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
self.get_checkpoint_store()
.get_highest_executed_checkpoint_seq_number()?
.ok_or(SuiError::UserInputError {
error: UserInputError::LatestCheckpointSequenceNumberNotFound,
})
}
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
self.database_for_testing()
.perpetual_tables
.get_highest_pruned_checkpoint()
}
#[instrument(level = "trace", skip_all)]
pub fn get_checkpoint_summary_by_sequence_number(
&self,
sequence_number: CheckpointSequenceNumber,
) -> SuiResult<CheckpointSummary> {
let verified_checkpoint = self
.get_checkpoint_store()
.get_checkpoint_by_sequence_number(sequence_number)?;
match verified_checkpoint {
Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
None => Err(SuiError::UserInputError {
error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
}),
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_checkpoint_summary_by_digest(
&self,
digest: CheckpointDigest,
) -> SuiResult<CheckpointSummary> {
let verified_checkpoint = self
.get_checkpoint_store()
.get_checkpoint_by_digest(&digest)?;
match verified_checkpoint {
Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
None => Err(SuiError::UserInputError {
error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
}),
}
}
#[instrument(level = "trace", skip_all)]
pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
if is_system_package(package_id) {
return self.find_genesis_txn_digest();
}
Ok(self
.get_object_read(&package_id)?
.into_object()?
.previous_transaction)
}
#[instrument(level = "trace", skip_all)]
pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
let summary = self
.get_verified_checkpoint_by_sequence_number(0)?
.into_message();
let content = self.get_checkpoint_contents(summary.content_digest)?;
let genesis_transaction = content.enumerate_transactions(&summary).next();
Ok(genesis_transaction
.ok_or(SuiError::UserInputError {
error: UserInputError::GenesisTransactionNotFound,
})?
.1
.transaction)
}
#[instrument(level = "trace", skip_all)]
pub fn get_verified_checkpoint_by_sequence_number(
&self,
sequence_number: CheckpointSequenceNumber,
) -> SuiResult<VerifiedCheckpoint> {
let verified_checkpoint = self
.get_checkpoint_store()
.get_checkpoint_by_sequence_number(sequence_number)?;
match verified_checkpoint {
Some(verified_checkpoint) => Ok(verified_checkpoint),
None => Err(SuiError::UserInputError {
error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
}),
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_verified_checkpoint_summary_by_digest(
&self,
digest: CheckpointDigest,
) -> SuiResult<VerifiedCheckpoint> {
let verified_checkpoint = self
.get_checkpoint_store()
.get_checkpoint_by_digest(&digest)?;
match verified_checkpoint {
Some(verified_checkpoint) => Ok(verified_checkpoint),
None => Err(SuiError::UserInputError {
error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
}),
}
}
#[instrument(level = "trace", skip_all)]
pub fn get_checkpoint_contents(
&self,
digest: CheckpointContentsDigest,
) -> SuiResult<CheckpointContents> {
self.get_checkpoint_store()
.get_checkpoint_contents(&digest)?
.ok_or(SuiError::UserInputError {
error: UserInputError::CheckpointContentsNotFound(digest),
})
}
#[instrument(level = "trace", skip_all)]
pub fn get_checkpoint_contents_by_sequence_number(
&self,
sequence_number: CheckpointSequenceNumber,
) -> SuiResult<CheckpointContents> {
let verified_checkpoint = self
.get_checkpoint_store()
.get_checkpoint_by_sequence_number(sequence_number)?;
match verified_checkpoint {
Some(verified_checkpoint) => {
let content_digest = verified_checkpoint.into_inner().content_digest;
self.get_checkpoint_contents(content_digest)
}
None => Err(SuiError::UserInputError {
error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
}),
}
}
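/// Queries indexed events matching `query`, paginating from `cursor` in the
/// requested direction, then hydrates each event from the key-value store and
/// resolves its Move layout for rendering.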
#[instrument(level = "trace", skip_all)]
pub async fn query_events(
&self,
kv_store: &Arc<TransactionKeyValueStore>,
query: EventFilter,
cursor: Option<EventID>,
limit: usize,
descending: bool,
) -> SuiResult<Vec<SuiEvent>> {
let index_store = self.get_indexes()?;
let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
SuiError::TransactionNotFound {
digest: cursor.tx_digest,
},
)?;
(tx_seq, cursor.event_seq as usize)
} else if descending {
(u64::MAX, usize::MAX)
} else {
(0, 0)
};
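// Fetch one extra key so that either the cursor's own entry can be removed
// or the surplus trimmed below.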
let limit = limit + 1;
let mut event_keys = match query {
EventFilter::All(filters) => {
if filters.is_empty() {
index_store.all_events(tx_num, event_num, limit, descending)?
} else {
return Err(SuiError::UserInputError {
error: UserInputError::Unsupported(
"This query type does not currently support filter combinations"
.to_string(),
),
});
}
}
EventFilter::Transaction(digest) => {
index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
}
EventFilter::MoveModule { package, module } => {
let module_id = ModuleId::new(package.into(), module);
index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
}
EventFilter::MoveEventType(struct_name) => index_store
.events_by_move_event_struct_name(
&struct_name,
tx_num,
event_num,
limit,
descending,
)?,
EventFilter::Sender(sender) => {
index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
}
EventFilter::TimeRange {
start_time,
end_time,
} => index_store
.event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
EventFilter::MoveEventModule { package, module } => index_store
.events_by_move_event_module(
&ModuleId::new(package.into(), module),
tx_num,
event_num,
limit,
descending,
)?,
EventFilter::Package(_)
| EventFilter::MoveEventField { .. }
| EventFilter::Any(_)
| EventFilter::And(_, _)
| EventFilter::Or(_, _) => {
return Err(SuiError::UserInputError {
error: UserInputError::Unsupported(
"This query type is not supported by the full node.".to_string(),
),
})
}
};
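// Drop the cursor's own entry when paginating from a cursor; otherwise trim
// the extra key fetched above.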
if cursor.is_some() {
if !event_keys.is_empty() {
event_keys.remove(0);
}
} else {
event_keys.truncate(limit - 1);
}
let event_digests = event_keys
.iter()
.map(|(digest, _, _, _)| *digest)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let events = kv_store.multi_get_events(&event_digests).await?;
let events_map: HashMap<_, _> = event_digests.iter().zip(events.into_iter()).collect();
let stored_events = event_keys
.into_iter()
.map(|k| {
(
k,
events_map
.get(&k.0)
.expect("fetched digest is missing")
.clone()
.and_then(|e| e.data.get(k.2).cloned()),
)
})
.map(|((digest, tx_digest, event_seq, timestamp), event)| {
event
.map(|e| (e, tx_digest, event_seq, timestamp))
.ok_or(SuiError::TransactionEventsNotFound { digest })
})
.collect::<Result<Vec<_>, _>>()?;
let epoch_store = self.load_epoch_store_one_call_per_task();
let backing_store = self.execution_cache.as_ref();
let mut layout_resolver = epoch_store
.executor()
.type_layout_resolver(Box::new(backing_store));
let mut events = vec![];
for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
events.push(SuiEvent::try_from(
e.clone(),
tx_digest,
event_seq as u64,
Some(timestamp),
layout_resolver.get_annotated_layout(&e.type_)?,
)?)
}
Ok(events)
}
pub async fn insert_genesis_object(&self, object: Object) {
self.execution_cache
.insert_genesis_object(object)
.expect("Cannot insert genesis object")
}
pub async fn insert_genesis_objects(&self, objects: &[Object]) {
futures::future::join_all(
objects
.iter()
.map(|o| self.insert_genesis_object(o.clone())),
)
.await;
}
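/// Reports what this authority knows about a transaction: `Executed` with
/// (re-)signed effects and events when effects exist, `Signed` when only a
/// signed transaction is recorded, or `None`.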
#[instrument(level = "trace", skip_all)]
pub fn get_transaction_status(
&self,
transaction_digest: &TransactionDigest,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
if let Some(effects) =
self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
{
if let Some(transaction) = self
.execution_cache
.get_transaction_block(transaction_digest)?
{
let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
let events = if let Some(digest) = effects.events_digest() {
self.get_transaction_events(digest)?
} else {
TransactionEvents::default()
};
return Ok(Some((
(*transaction).clone().into_message(),
TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
)));
} else {
debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
}
}
if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
self.metrics.tx_already_processed.inc();
let (transaction, sig) = signed.into_inner().into_data_and_sig();
Ok(Some((transaction, TransactionStatus::Signed(sig))))
} else {
Ok(None)
}
}
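/// Loads executed effects for the digest, if any, and returns them signed by
/// this authority (re-signed under the current epoch when necessary).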
#[instrument(level = "trace", skip_all)]
pub fn get_signed_effects_and_maybe_resign(
&self,
transaction_digest: &TransactionDigest,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
let effects = self
.execution_cache
.get_executed_effects(transaction_digest)?;
match effects {
Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
None => Ok(None),
}
}
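/// Signs `effects` for the current epoch, reusing a stored signature when one
/// from this epoch already exists.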
#[instrument(level = "trace", skip_all)]
pub(crate) fn sign_effects(
&self,
effects: TransactionEffects,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransactionEffects> {
let tx_digest = *effects.transaction_digest();
let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
Some(sig) if sig.epoch == epoch_store.epoch() => {
SignedTransactionEffects::new_from_data_and_sig(effects, sig)
}
_ => {
debug!(
?tx_digest,
epoch=?epoch_store.epoch(),
"Re-signing the effects with the current epoch"
);
SignedTransactionEffects::new(
epoch_store.epoch(),
effects,
&*self.secret,
self.name,
)
}
};
Ok(VerifiedSignedTransactionEffects::new_unchecked(
signed_effects,
))
}
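/// Collects the coin-typed input and written objects of a transaction for
/// balance indexing; returns `None` on validators or when indexing is
/// disabled.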
#[instrument(level = "trace", skip_all)]
fn fullnode_only_get_tx_coins_for_indexing(
&self,
inner_temporary_store: &InnerTemporaryStore,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> Option<TxCoins> {
if self.indexes.is_none() || self.is_validator(epoch_store) {
return None;
}
let written_coin_objects = inner_temporary_store
.written
.iter()
.filter_map(|(k, v)| {
if v.is_coin() {
Some((*k, v.clone()))
} else {
None
}
})
.collect();
let input_coin_objects = inner_temporary_store
.input_objects
.iter()
.filter_map(|(k, v)| {
if v.is_coin() {
Some((*k, v.clone()))
} else {
None
}
})
.collect::<ObjectMap>();
Some((input_coin_objects, written_coin_objects))
}
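/// Returns the signed transaction currently holding the lock on `object_ref`,
/// `None` if the lock exists but is unclaimed, or an error if the object is
/// locked at a different version.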
#[instrument(level = "trace", skip_all)]
pub async fn get_transaction_lock(
&self,
object_ref: &ObjectRef,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<VerifiedSignedTransaction>> {
let lock_info = self
.execution_cache
.get_lock(*object_ref, epoch_store)
.map_err(SuiError::from)?;
let lock_info = match lock_info {
ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
return Err(UserInputError::ObjectVersionUnavailableForConsumption {
provided_obj_ref: *object_ref,
current_version: locked_ref.1,
}
.into());
}
ObjectLockStatus::Initialized => {
return Ok(None);
}
ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
};
epoch_store.get_signed_transaction(&lock_info.tx_digest)
}
pub async fn get_objects(&self, objects: &[ObjectID]) -> SuiResult<Vec<Option<Object>>> {
self.execution_cache.get_objects(objects)
}
pub async fn get_object_or_tombstone(
&self,
object_id: ObjectID,
) -> SuiResult<Option<ObjectRef>> {
self.execution_cache
.get_latest_object_ref_or_tombstone(object_id)
}
pub fn set_override_protocol_upgrade_buffer_stake(
&self,
expected_epoch: EpochId,
buffer_stake_bps: u64,
) -> SuiResult {
let epoch_store = self.load_epoch_store_one_call_per_task();
let actual_epoch = epoch_store.epoch();
if actual_epoch != expected_epoch {
return Err(SuiError::WrongEpoch {
expected_epoch,
actual_epoch,
});
}
epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
}
pub fn clear_override_protocol_upgrade_buffer_stake(
&self,
expected_epoch: EpochId,
) -> SuiResult {
let epoch_store = self.load_epoch_store_one_call_per_task();
let actual_epoch = epoch_store.epoch();
if actual_epoch != expected_epoch {
return Err(SuiError::WrongEpoch {
expected_epoch,
actual_epoch,
});
}
epoch_store.clear_override_protocol_upgrade_buffer_stake()
}
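/// Returns the object refs of the system packages this binary could support
/// at the next epoch; returns an empty vector (vetoing any upgrade) if a
/// built-in package is incompatible with its on-chain counterpart.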
pub async fn get_available_system_packages(
&self,
binary_config: &BinaryConfig,
) -> Vec<ObjectRef> {
let mut results = vec![];
let system_packages = BuiltInFramework::iter_system_packages();
#[cfg(msim)]
let extra_packages = framework_injection::get_extra_packages(self.name);
#[cfg(msim)]
let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
for system_package in system_packages {
let modules = system_package.modules().to_vec();
#[cfg(msim)]
let modules = framework_injection::get_override_modules(system_package.id(), self.name)
.unwrap_or(modules);
let Some(obj_ref) = sui_framework::compare_system_package(
&self.execution_cache,
system_package.id(),
&modules,
system_package.dependencies().to_vec(),
binary_config,
)
.await
else {
return vec![];
};
results.push(obj_ref);
}
results
}
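/// For each system package that actually needs updating, returns its target
/// version, module bytes, and dependencies, after verifying that the locally
/// built package reproduces the exact object ref agreed for the upgrade;
/// returns `None` on any mismatch.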
async fn get_system_package_bytes(
&self,
system_packages: Vec<ObjectRef>,
binary_config: &BinaryConfig,
) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
let objects = self.get_objects(&ids).await.expect("read cannot fail");
let mut res = Vec::with_capacity(system_packages.len());
for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
let prev_transaction = match object {
Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
info!("Framework {} does not need updating", system_package_ref.0);
continue;
}
Some(cur_object) => cur_object.previous_transaction,
None => TransactionDigest::genesis_marker(),
};
#[cfg(msim)]
let SystemPackage {
id: _,
bytes,
dependencies,
} = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
.unwrap_or_else(|| {
BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
});
#[cfg(not(msim))]
let SystemPackage {
id: _,
bytes,
dependencies,
} = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
let modules: Vec<_> = bytes
.iter()
.map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
.collect();
let new_object = Object::new_system_package(
&modules,
system_package_ref.1,
dependencies.clone(),
prev_transaction,
);
let new_ref = new_object.compute_object_reference();
if new_ref != system_package_ref {
error!(
"Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
);
return None;
}
res.push((system_package_ref.1, bytes, dependencies));
}
Some(res)
}
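/// Decides whether `proposed_protocol_version` (at most one ahead of the
/// current version unless fast-forward is enabled) has sufficient support:
/// validators must advertise an identical system-package set whose combined
/// stake reaches the quorum threshold plus the buffer.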
fn is_protocol_version_supported(
current_protocol_version: ProtocolVersion,
proposed_protocol_version: ProtocolVersion,
protocol_config: &ProtocolConfig,
committee: &Committee,
capabilities: Vec<AuthorityCapabilities>,
mut buffer_stake_bps: u64,
) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
if proposed_protocol_version > current_protocol_version + 1
&& !protocol_config.advance_to_highest_supported_protocol_version()
{
return None;
}
if buffer_stake_bps > 10000 {
warn!("clamping buffer_stake_bps to 10000");
buffer_stake_bps = 10000;
}
let mut desired_upgrades: Vec<_> = capabilities
.into_iter()
.filter_map(|mut cap| {
if cap.available_system_packages.is_empty() {
return None;
}
cap.available_system_packages.sort();
info!(
"validator {:?} supports {:?} with system packages: {:?}",
cap.authority.concise(),
cap.supported_protocol_versions,
cap.available_system_packages,
);
cap.supported_protocol_versions
.is_version_supported(proposed_protocol_version)
.then_some((cap.available_system_packages, cap.authority))
})
.collect();
desired_upgrades.sort();
desired_upgrades
.into_iter()
.group_by(|(packages, _authority)| packages.clone())
.into_iter()
.find_map(|(packages, group)| {
assert!(!packages.is_empty());
let mut stake_aggregator: StakeAggregator<(), true> =
StakeAggregator::new(Arc::new(committee.clone()));
for (_, authority) in group {
stake_aggregator.insert_generic(authority, ());
}
let total_votes = stake_aggregator.total_votes();
let quorum_threshold = committee.quorum_threshold();
let f = committee.total_votes() - committee.quorum_threshold();
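// Ceiling of f * buffer_stake_bps / 10000: extra stake, beyond a bare
// quorum, required before the upgrade counts as supported.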
let buffer_stake = (f * buffer_stake_bps + 9999) / 10000;
let effective_threshold = quorum_threshold + buffer_stake;
info!(
?total_votes,
?quorum_threshold,
?buffer_stake_bps,
?effective_threshold,
?proposed_protocol_version,
?packages,
"support for upgrade"
);
let has_support = total_votes >= effective_threshold;
has_support.then_some((proposed_protocol_version, packages))
})
}
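/// Probes successive protocol versions, advancing while each next version has
/// sufficient stake behind it, and returns the highest supported version
/// together with its system packages.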
fn choose_protocol_version_and_system_packages(
current_protocol_version: ProtocolVersion,
protocol_config: &ProtocolConfig,
committee: &Committee,
capabilities: Vec<AuthorityCapabilities>,
buffer_stake_bps: u64,
) -> (ProtocolVersion, Vec<ObjectRef>) {
let mut next_protocol_version = current_protocol_version;
let mut system_packages = vec![];
while let Some((version, packages)) = Self::is_protocol_version_supported(
current_protocol_version,
next_protocol_version + 1,
protocol_config,
committee,
capabilities.clone(),
buffer_stake_bps,
) {
next_protocol_version = version;
system_packages = packages;
}
(next_protocol_version, system_packages)
}
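/// Emits the end-of-epoch authenticator-state transaction: expiry of stale
/// JWKs when the state object already exists, or its creation otherwise;
/// `None` when JWK consensus updates are disabled.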
#[instrument(level = "debug", skip_all)]
fn create_authenticator_state_tx(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> Option<EndOfEpochTransactionKind> {
if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
info!("authenticator state transactions not enabled");
return None;
}
let authenticator_state_exists = epoch_store.authenticator_state_exists();
let tx = if authenticator_state_exists {
let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
let min_epoch =
next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
let authenticator_obj_initial_shared_version = epoch_store
.epoch_start_config()
.authenticator_obj_initial_shared_version()
.expect("initial version must exist");
let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
min_epoch,
authenticator_obj_initial_shared_version,
);
info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
tx
} else {
let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
info!("Creating AuthenticatorStateCreate tx");
tx
};
Some(tx)
}
#[instrument(level = "debug", skip_all)]
fn create_randomness_state_tx(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> Option<EndOfEpochTransactionKind> {
if !epoch_store.protocol_config().random_beacon() {
info!("randomness state transactions not enabled");
return None;
}
if epoch_store.randomness_state_exists() {
return None;
}
let tx = EndOfEpochTransactionKind::new_randomness_state_create();
info!("Creating RandomnessStateCreate tx");
Some(tx)
}
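    /// Builds a `DenyListStateCreate` transaction if the coin deny list is enabled
    /// and the deny list state object does not exist yet.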
#[instrument(level = "debug", skip_all)]
fn create_deny_list_state_tx(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> Option<EndOfEpochTransactionKind> {
if !epoch_store.protocol_config().enable_coin_deny_list() {
return None;
}
if epoch_store.coin_deny_list_state_exists() {
return None;
}
let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
info!("Creating DenyListStateCreate tx");
Some(tx)
}
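    /// Assembles the end-of-epoch transaction (any state-creation transactions plus
    /// the change-epoch transaction), executes it locally, and returns the resulting
    /// system state object and transaction effects. Fails if the system packages for
    /// the next epoch are not available locally, or if the transaction has already
    /// been executed via state sync.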
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
gas_cost_summary: &GasCostSummary,
checkpoint: CheckpointSequenceNumber,
epoch_start_timestamp_ms: CheckpointTimestamp,
) -> anyhow::Result<(SuiSystemState, TransactionEffects)> {
let mut txns = Vec::new();
if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
txns.push(tx);
}
if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
txns.push(tx);
}
if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
txns.push(tx);
}
let next_epoch = epoch_store.epoch() + 1;
let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
let (next_epoch_protocol_version, next_epoch_system_packages) =
Self::choose_protocol_version_and_system_packages(
epoch_store.protocol_version(),
epoch_store.protocol_config(),
epoch_store.committee(),
epoch_store
.get_capabilities()
.expect("read capabilities from db cannot fail"),
buffer_stake_bps,
);
let config = epoch_store.protocol_config();
let binary_config = to_binary_config(config);
let Some(next_epoch_system_package_bytes) = self
.get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
.await
else {
error!(
"upgraded system packages {:?} are not locally available, cannot create \
ChangeEpochTx. validator binary must be upgraded to the correct version!",
next_epoch_system_packages
);
return Err(anyhow!(
"missing system packages: cannot form ChangeEpochTx"
));
};
let tx = if epoch_store
.protocol_config()
.end_of_epoch_transaction_supported()
{
txns.push(EndOfEpochTransactionKind::new_change_epoch(
next_epoch,
next_epoch_protocol_version,
gas_cost_summary.storage_cost,
gas_cost_summary.computation_cost,
gas_cost_summary.storage_rebate,
gas_cost_summary.non_refundable_storage_fee,
epoch_start_timestamp_ms,
next_epoch_system_package_bytes,
));
VerifiedTransaction::new_end_of_epoch_transaction(txns)
} else {
VerifiedTransaction::new_change_epoch(
next_epoch,
next_epoch_protocol_version,
gas_cost_summary.storage_cost,
gas_cost_summary.computation_cost,
gas_cost_summary.storage_rebate,
gas_cost_summary.non_refundable_storage_fee,
epoch_start_timestamp_ms,
next_epoch_system_package_bytes,
)
};
let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
tx.clone(),
epoch_store.epoch(),
checkpoint,
);
let tx_digest = executable_tx.digest();
info!(
?next_epoch,
?next_epoch_protocol_version,
?next_epoch_system_packages,
computation_cost=?gas_cost_summary.computation_cost,
storage_cost=?gas_cost_summary.storage_cost,
storage_rebate=?gas_cost_summary.storage_rebate,
non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
?tx_digest,
"Creating advance epoch transaction"
);
fail_point_async!("change_epoch_tx_delay");
let _tx_lock = epoch_store.acquire_tx_lock(tx_digest).await;
if self
.execution_cache
.is_tx_already_executed(tx_digest)
.expect("read cannot fail")
{
warn!("change epoch tx has already been executed via state sync");
return Err(anyhow::anyhow!(
"change epoch tx has already been executed via state sync"
));
}
let execution_guard = self
.execution_lock_for_executable_transaction(&executable_tx)
.await?;
epoch_store
.assign_shared_object_versions_idempotent(
self.get_cache_reader().as_ref(),
&[executable_tx.clone()],
)
.await?;
let input_objects = self
.read_objects_for_execution(&executable_tx, epoch_store)
.await?;
let (temporary_store, effects, _execution_error_opt) =
self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
let system_obj = get_sui_system_state(&temporary_store.written)
.expect("change epoch tx must write to system object");
self.execution_cache
.insert_transaction_and_effects(&tx, &effects)
            .map_err(anyhow::Error::from)?;
info!(
"Effects summary of the change epoch transaction: {:?}",
effects.summary_for_debug()
);
epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
assert!(effects.status().is_ok());
Ok((system_obj, effects))
}
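    /// Stops accepting user certificates and reverts any locally executed
    /// transactions that were not included in a checkpoint before the epoch ended.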
#[instrument(level = "error", skip_all)]
async fn revert_uncommitted_epoch_transactions(
&self,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult {
{
let state = epoch_store.get_reconfig_state_write_lock_guard();
if state.should_accept_user_certs() {
epoch_store.close_user_certs(state);
}
}
let pending_certificates = epoch_store.pending_consensus_certificates();
info!(
"Reverting {} locally executed transactions that was not included in the epoch: {:?}",
pending_certificates.len(),
pending_certificates,
);
for digest in pending_certificates {
if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
info!("Not reverting pending consensus transaction {:?} - it was included in checkpoint", digest);
continue;
}
info!("Reverting {:?} at the end of epoch", digest);
self.execution_cache.revert_state_update(&digest)?;
}
info!("All uncommitted local transactions reverted");
Ok(())
}
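    /// Rejects the transaction if the sender is denied for any non-gas coin type
    /// appearing in its input or receiving objects.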
fn check_coin_deny(
&self,
sender: SuiAddress,
input_objects: &CheckedInputObjects,
receiving_objects: &ReceivingObjects,
) -> SuiResult {
let all_objects = input_objects
.inner()
.iter_objects()
.chain(receiving_objects.iter_objects());
let coin_types = all_objects
.filter_map(|obj| {
if obj.is_gas_coin() {
None
} else {
obj.coin_type_maybe()
.map(|type_tag| type_tag.to_canonical_string(false))
}
})
.collect::<BTreeSet<_>>();
DenyList::check_coin_deny_list(sender, coin_types, &self.execution_cache)?;
Ok(())
}
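    /// Opens the per-epoch store for the new epoch, installs it as the current
    /// store, and signals termination to the outgoing epoch's store.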
#[instrument(level = "error", skip_all)]
async fn reopen_epoch_db(
&self,
cur_epoch_store: &AuthorityPerEpochStore,
new_committee: Committee,
epoch_start_configuration: EpochStartConfiguration,
expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
let new_epoch = new_committee.epoch;
info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
assert_eq!(
epoch_start_configuration.epoch_start_state().epoch(),
new_committee.epoch
);
fail_point!("before-open-new-epoch-store");
let new_epoch_store = cur_epoch_store.new_at_next_epoch(
self.name,
new_committee,
epoch_start_configuration,
self.execution_cache.clone(),
expensive_safety_check_config,
cur_epoch_store.get_chain_identifier(),
);
self.epoch_store.store(new_epoch_store.clone());
cur_epoch_store.epoch_terminated().await;
Ok(new_epoch_store)
}
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
&self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
let include_wrapped_object = !self
.epoch_store_for_testing()
.protocol_config()
.simplified_unwrap_then_delete();
self.execution_cache
.iter_live_object_set(include_wrapped_object)
}
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
self.tx_execution_shutdown
.lock()
.take()
.unwrap()
.send(())
.unwrap();
}
pub async fn prune_objects_and_compact_for_testing(&self) {
self.execution_cache
.prune_objects_and_compact_for_testing(&self.checkpoint_store)
.await
}
pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
self.execution_cache.bulk_insert_genesis_objects(objects)?;
self.execution_cache
.force_reload_system_packages(&BuiltInFramework::all_package_ids());
Ok(())
}
}
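/// Receives (epoch, round, randomness bytes) tuples over a channel and turns each
/// into a `RandomnessStateUpdate` system transaction executed by this authority.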
pub struct RandomnessRoundReceiver {
authority_state: Arc<AuthorityState>,
randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
impl RandomnessRoundReceiver {
pub fn spawn(
authority_state: Arc<AuthorityState>,
randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
) -> JoinHandle<()> {
let rrr = RandomnessRoundReceiver {
authority_state,
randomness_rx,
};
spawn_monitored_task!(rrr.run())
}
async fn run(mut self) {
info!("RandomnessRoundReceiver event loop started");
loop {
tokio::select! {
maybe_recv = self.randomness_rx.recv() => {
if let Some((epoch, round, bytes)) = maybe_recv {
self.handle_new_randomness(epoch, round, bytes);
} else {
break;
}
},
}
}
info!("RandomnessRoundReceiver event loop ended");
}
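    /// Creates and enqueues a `RandomnessStateUpdate` transaction for the given
    /// round, dropping the randomness if it belongs to a different epoch, and spawns
    /// a task that panics if the transaction does not execute successfully.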
#[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
if epoch_store.epoch() != epoch {
warn!(
"dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
epoch_store.epoch()
);
return;
}
let transaction = VerifiedTransaction::new_randomness_state_update(
epoch,
round,
bytes,
epoch_store
.epoch_start_config()
.randomness_obj_initial_shared_version()
.expect("randomness state obj must exist"),
);
debug!(
"created randomness state update transaction with digest: {:?}",
transaction.digest()
);
let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
let digest = *transaction.digest();
self.authority_state
.transaction_manager()
.enqueue(vec![transaction], &epoch_store);
let authority_state = self.authority_state.clone();
spawn_monitored_task!(async move {
let Ok(mut effects) = authority_state
.execution_cache
.notify_read_executed_effects(&[digest])
.await
else {
panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}");
};
let effects = effects.pop().expect("should return effects");
if *effects.status() != ExecutionStatus::Success {
panic!("failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}");
}
debug!("successfully executed randomness state update transaction at epoch {epoch}, round {round}");
});
}
}
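// Serve transaction, effects, events, and checkpoint lookups directly from this
// authority's local stores, so an `AuthorityState` can act as a
// `TransactionKeyValueStoreTrait` backend.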
#[async_trait]
impl TransactionKeyValueStoreTrait for AuthorityState {
async fn multi_get(
&self,
transactions: &[TransactionDigest],
effects: &[TransactionDigest],
events: &[TransactionEventsDigest],
) -> SuiResult<(
Vec<Option<Transaction>>,
Vec<Option<TransactionEffects>>,
Vec<Option<TransactionEvents>>,
)> {
let txns = if !transactions.is_empty() {
self.execution_cache
.multi_get_transaction_blocks(transactions)?
.into_iter()
.map(|t| t.map(|t| (*t).clone().into_inner()))
.collect()
} else {
vec![]
};
let fx = if !effects.is_empty() {
self.execution_cache.multi_get_executed_effects(effects)?
} else {
vec![]
};
let evts = if !events.is_empty() {
self.execution_cache.multi_get_events(events)?
} else {
vec![]
};
Ok((txns, fx, evts))
}
async fn multi_get_checkpoints(
&self,
checkpoint_summaries: &[CheckpointSequenceNumber],
checkpoint_contents: &[CheckpointSequenceNumber],
checkpoint_summaries_by_digest: &[CheckpointDigest],
checkpoint_contents_by_digest: &[CheckpointContentsDigest],
) -> SuiResult<(
Vec<Option<CertifiedCheckpointSummary>>,
Vec<Option<CheckpointContents>>,
Vec<Option<CertifiedCheckpointSummary>>,
Vec<Option<CheckpointContents>>,
)> {
let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
let store = self.get_checkpoint_store();
for seq in checkpoint_summaries {
let checkpoint = store
.get_checkpoint_by_sequence_number(*seq)?
.map(|c| c.into_inner());
summaries.push(checkpoint);
}
let mut contents = Vec::with_capacity(checkpoint_contents.len());
for seq in checkpoint_contents {
let checkpoint = store
.get_checkpoint_by_sequence_number(*seq)?
.and_then(|summary| {
store
.get_checkpoint_contents(&summary.content_digest)
.expect("db read cannot fail")
});
contents.push(checkpoint);
}
let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
for digest in checkpoint_summaries_by_digest {
let checkpoint = store
.get_checkpoint_by_digest(digest)?
.map(|c| c.into_inner());
summaries_by_digest.push(checkpoint);
}
let mut contents_by_digest = Vec::with_capacity(checkpoint_contents_by_digest.len());
for digest in checkpoint_contents_by_digest {
let checkpoint = store.get_checkpoint_contents(digest)?;
contents_by_digest.push(checkpoint);
}
Ok((summaries, contents, summaries_by_digest, contents_by_digest))
}
async fn deprecated_get_transaction_checkpoint(
&self,
digest: TransactionDigest,
) -> SuiResult<Option<CheckpointSequenceNumber>> {
self.execution_cache
.deprecated_get_transaction_checkpoint(&digest)
.map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
}
async fn get_object(
&self,
object_id: ObjectID,
version: VersionNumber,
) -> SuiResult<Option<Object>> {
self.execution_cache
.get_object_by_key(&object_id, version)
.map_err(Into::into)
}
async fn multi_get_transaction_checkpoint(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
let res = self
.execution_cache
.deprecated_multi_get_transaction_checkpoint(digests)?;
Ok(res
.into_iter()
.map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
.collect())
}
}
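/// Simulation-test-only hooks for overriding the system framework packages that a
/// validator reports as available, either globally or via a per-validator callback.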
#[cfg(msim)]
pub mod framework_injection {
use move_binary_format::CompiledModule;
use std::collections::BTreeMap;
use std::{cell::RefCell, collections::BTreeSet};
use sui_framework::{BuiltInFramework, SystemPackage};
use sui_types::base_types::{AuthorityName, ObjectID};
use sui_types::is_system_package;
type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
thread_local! {
static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
}
type Framework = Vec<CompiledModule>;
pub type PackageUpgradeCallback =
Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
enum PackageOverrideConfig {
Global(Framework),
PerValidator(PackageUpgradeCallback),
}
fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
modules
.iter()
.map(|m| {
let mut buf = Vec::new();
m.serialize(&mut buf).unwrap();
buf
})
.collect()
}
pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
OVERRIDE.with(|bs| {
bs.borrow_mut()
.insert(package_id, PackageOverrideConfig::Global(modules))
});
}
pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
OVERRIDE.with(|bs| {
bs.borrow_mut()
.insert(package_id, PackageOverrideConfig::PerValidator(func))
});
}
pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
OVERRIDE.with(|cfg| {
cfg.borrow().get(package_id).and_then(|entry| match entry {
PackageOverrideConfig::Global(framework) => {
Some(compiled_modules_to_bytes(framework))
}
PackageOverrideConfig::PerValidator(func) => {
func(name).map(|fw| compiled_modules_to_bytes(&fw))
}
})
})
}
pub fn get_override_modules(
package_id: &ObjectID,
name: AuthorityName,
) -> Option<Vec<CompiledModule>> {
OVERRIDE.with(|cfg| {
cfg.borrow().get(package_id).and_then(|entry| match entry {
PackageOverrideConfig::Global(framework) => Some(framework.clone()),
PackageOverrideConfig::PerValidator(func) => func(name),
})
})
}
pub fn get_override_system_package(
package_id: &ObjectID,
name: AuthorityName,
) -> Option<SystemPackage> {
let bytes = get_override_bytes(package_id, name)?;
let dependencies = if is_system_package(*package_id) {
BuiltInFramework::get_package_by_id(package_id)
.dependencies()
.to_vec()
} else {
BuiltInFramework::all_package_ids()
};
Some(SystemPackage {
id: *package_id,
bytes,
dependencies,
})
}
pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
cfg.borrow()
.keys()
.filter_map(|package| (!built_in.contains(package)).then_some(*package))
.collect()
});
extra
.into_iter()
.map(|package| SystemPackage {
id: package,
bytes: get_override_bytes(&package, name).unwrap(),
dependencies: BuiltInFramework::all_package_ids(),
})
.collect()
}
}
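/// Self-describing dump format for a single object: its id, version, digest, and
/// full contents.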
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
pub id: ObjectID,
pub version: VersionNumber,
pub digest: ObjectDigest,
pub object: Object,
}
impl ObjDumpFormat {
fn new(object: Object) -> Self {
let oref = object.compute_object_reference();
Self {
id: oref.0,
version: oref.1,
digest: oref.2,
object,
}
}
}
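/// A JSON-serializable dump of everything relevant to re-executing a single
/// transaction offline: the signed data, computed effects, system packages, shared
/// objects, loaded child objects, and the other inputs read during execution.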
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
pub tx_digest: TransactionDigest,
pub sender_signed_data: SenderSignedData,
pub executed_epoch: u64,
pub reference_gas_price: u64,
pub protocol_version: u64,
pub epoch_start_timestamp_ms: u64,
pub computed_effects: TransactionEffects,
pub expected_effects_digest: TransactionEffectsDigest,
pub relevant_system_packages: Vec<ObjDumpFormat>,
pub shared_objects: Vec<ObjDumpFormat>,
pub loaded_child_objects: Vec<ObjDumpFormat>,
pub modified_at_versions: Vec<ObjDumpFormat>,
pub runtime_reads: Vec<ObjDumpFormat>,
pub input_objects: Vec<ObjDumpFormat>,
}
impl NodeStateDump {
pub fn new(
tx_digest: &TransactionDigest,
effects: &TransactionEffects,
expected_effects_digest: TransactionEffectsDigest,
object_store: &dyn ObjectStore,
epoch_store: &Arc<AuthorityPerEpochStore>,
inner_temporary_store: &InnerTemporaryStore,
certificate: &VerifiedExecutableTransaction,
) -> SuiResult<Self> {
let executed_epoch = epoch_store.epoch();
let reference_gas_price = epoch_store.reference_gas_price();
let epoch_start_config = epoch_store.epoch_start_config();
let protocol_version = epoch_store.protocol_version().as_u64();
let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
let mut relevant_system_packages = Vec::new();
for sys_package_id in BuiltInFramework::all_package_ids() {
if let Some(w) = object_store.get_object(&sys_package_id)? {
relevant_system_packages.push(ObjDumpFormat::new(w))
}
}
let mut shared_objects = Vec::new();
for kind in effects.input_shared_objects() {
match kind {
InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1)? {
shared_objects.push(ObjDumpFormat::new(w))
}
}
InputSharedObject::ReadDeleted(..) | InputSharedObject::MutateDeleted(..) => (),
}
}
let mut loaded_child_objects = Vec::new();
for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
if let Some(w) = object_store.get_object_by_key(id, meta.version)? {
loaded_child_objects.push(ObjDumpFormat::new(w))
}
}
let mut modified_at_versions = Vec::new();
for (id, ver) in effects.modified_at_versions() {
if let Some(w) = object_store.get_object_by_key(&id, ver)? {
modified_at_versions.push(ObjDumpFormat::new(w))
}
}
let mut runtime_reads = Vec::new();
for obj in inner_temporary_store
.runtime_packages_loaded_from_db
.values()
{
runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
}
Ok(Self {
tx_digest: *tx_digest,
executed_epoch,
reference_gas_price,
epoch_start_timestamp_ms,
protocol_version,
relevant_system_packages,
shared_objects,
loaded_child_objects,
modified_at_versions,
runtime_reads,
sender_signed_data: certificate.clone().into_message(),
input_objects: inner_temporary_store
.input_objects
.values()
.map(|o| ObjDumpFormat::new(o.clone()))
.collect(),
computed_effects: effects.clone(),
expected_effects_digest,
})
}
pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
let mut objects = Vec::new();
objects.extend(self.relevant_system_packages.clone());
objects.extend(self.shared_objects.clone());
objects.extend(self.loaded_child_objects.clone());
objects.extend(self.modified_at_versions.clone());
objects.extend(self.runtime_reads.clone());
objects.extend(self.input_objects.clone());
objects
}
pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
let file_name = format!(
"{}_{}_NODE_DUMP.json",
self.tx_digest,
AuthorityState::unixtime_now_ms()
);
let mut path = path.to_path_buf();
path.push(&file_name);
let mut file = File::create(path.clone())?;
file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
Ok(path)
}
    #[cfg(debug_assertions)]
pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
let file = File::open(path)?;
serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
}
}