use arc_swap::ArcSwapOption;
use enum_dispatch::enum_dispatch;
use fastcrypto::groups::bls12381;
use fastcrypto_tbls::dkg;
use fastcrypto_tbls::nodes::PartyId;
use fastcrypto_zkp::bn254::zk_login::{JwkId, OIDCProvider, JWK};
use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
use futures::future::{join_all, select, Either};
use futures::FutureExt;
use itertools::{izip, Itertools};
use narwhal_executor::ExecutionIndices;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard};
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sui_config::node::ExpensiveSafetyCheckConfig;
use sui_types::accumulator::Accumulator;
use sui_types::authenticator_state::{get_authenticator_state, ActiveJwk};
use sui_types::base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest};
use sui_types::base_types::{ConciseableName, ObjectRef};
use sui_types::committee::Committee;
use sui_types::committee::CommitteeTrait;
use sui_types::crypto::{AuthoritySignInfo, AuthorityStrongQuorumSignInfo, RandomnessRound};
use sui_types::digests::ChainIdentifier;
use sui_types::error::{SuiError, SuiResult};
use sui_types::signature::GenericSignature;
use sui_types::storage::InputKey;
use sui_types::transaction::{
AuthenticatorStateUpdate, CertifiedTransaction, InputObjectKind, SenderSignedData, Transaction,
TransactionDataAPI, TransactionKey, TransactionKind, VerifiedCertificate,
VerifiedSignedTransaction, VerifiedTransaction,
};
use tokio::sync::OnceCell;
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::rocks::{read_size_from_env, ReadWriteOptions};
use typed_store::{
rocks::{default_db_options, DBBatch, DBMap, DBOptions, MetricConf},
traits::{TableSummary, TypedStoreDebug},
TypedStoreError,
};
use super::authority_store_tables::ENV_VAR_LOCKS_BLOCK_CACHE_SIZE;
use super::epoch_start_configuration::EpochStartConfigTrait;
use super::shared_object_congestion_tracker::SharedObjectCongestionTracker;
use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
use crate::authority::AuthorityMetrics;
use crate::authority::ResolverWrapper;
use crate::checkpoints::{
BuilderCheckpointSummary, CheckpointHeight, CheckpointServiceNotify, EpochStats,
PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2, PendingCheckpointV2Contents,
};
use crate::authority::shared_object_version_manager::{
AssignedTxAndVersions, ConsensusSharedObjVerAssignment, SharedObjVerManager,
};
use crate::consensus_handler::{
SequencedConsensusTransaction, SequencedConsensusTransactionKey,
SequencedConsensusTransactionKind, VerifiedSequencedConsensusTransaction,
};
use crate::epoch::epoch_metrics::EpochMetrics;
use crate::epoch::randomness::{DkgStatus, RandomnessManager, RandomnessReporter};
use crate::epoch::reconfiguration::ReconfigState;
use crate::execution_cache::{ExecutionCache, ExecutionCacheRead};
use crate::module_cache_metrics::ResolverMetrics;
use crate::post_consensus_tx_reorder::PostConsensusTxReorder;
use crate::signature_verifier::*;
use crate::stake_aggregator::{GenericMultiStakeAggregator, StakeAggregator};
use move_bytecode_utils::module_cache::SyncModuleCache;
use mysten_common::sync::notify_once::NotifyOnce;
use mysten_common::sync::notify_read::NotifyRead;
use mysten_metrics::monitored_scope;
use narwhal_types::{Round, TimestampMs};
use prometheus::IntCounter;
use std::str::FromStr;
use sui_execution::{self, Executor};
use sui_macros::fail_point;
use sui_protocol_config::{Chain, PerObjectCongestionControlMode, ProtocolConfig, ProtocolVersion};
use sui_storage::mutex_table::{MutexGuard, MutexTable};
use sui_types::effects::TransactionEffects;
use sui_types::executable_transaction::{
TrustedExecutableTransaction, VerifiedExecutableTransaction,
};
use sui_types::message_envelope::TrustedEnvelope;
use sui_types::messages_checkpoint::{
CheckpointContents, CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointSummary,
};
use sui_types::messages_consensus::{
check_total_jwk_size, AuthorityCapabilities, ConsensusTransaction, ConsensusTransactionKey,
ConsensusTransactionKind,
};
use sui_types::storage::GetSharedLocks;
use sui_types::sui_system_state::epoch_start_sui_system_state::{
EpochStartSystemState, EpochStartSystemStateTrait,
};
use tap::TapOptional;
use tokio::time::Instant;
use typed_store::{retry_transaction_forever, Map};
use typed_store_derive::DBMapUtils;
// Singleton-row keys: these tables hold exactly one value, stored under index 0.
const LAST_CONSENSUS_STATS_ADDR: u64 = 0;
const RECONFIG_STATE_INDEX: u64 = 0;
const OVERRIDE_PROTOCOL_UPGRADE_BUFFER_STAKE_INDEX: u64 = 0;
// Directory-name prefix for per-epoch databases (see `AuthorityEpochTables::path`).
pub const EPOCH_DB_PREFIX: &str = "epoch_";
// BLS12-381 G2 group used for both the DKG public-key group and the
// encryption group of the threshold-BLS randomness protocol.
pub(crate) type PkG = bls12381::G2Element;
pub(crate) type EncG = bls12381::G2Element;
// Guard holding the per-transaction mutex from `AuthorityPerEpochStore::mutex_table`;
// the lock is released when the guard is dropped.
pub struct CertLockGuard(MutexGuard);
// Guard returned by `acquire_tx_guard`. Both methods below are no-ops that
// simply drop the inner lock; they exist to make intent explicit at call sites.
pub struct CertTxGuard(CertLockGuard);
impl CertTxGuard {
    pub fn release(self) {}
    pub fn commit_tx(self) {}
}
type JwkAggregator = GenericMultiStakeAggregator<(JwkId, JWK), true>;
/// Outcome of processing one certificate delivered by consensus.
pub enum ConsensusCertificateResult {
    /// The message was ignored (e.g. already processed or no longer relevant).
    Ignored,
    /// A transaction that should be scheduled for execution.
    SuiTransaction(VerifiedExecutableTransaction),
    /// Execution was deferred to a later consensus round (see `DeferralKey`).
    Deferred(DeferralKey),
    /// A message consumed by the randomness/DKG machinery.
    RandomnessConsensusMessage,
    /// Any other consensus-internal message handled without producing a transaction.
    ConsensusMessage,
    /// A system message that was ignored.
    IgnoredSystem,
}
/// Consensus position (`ExecutionIndices`) plus a running hash over processed
/// messages; persisted as the singleton row of `last_consensus_index`.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct ExecutionIndicesWithHash {
    pub index: ExecutionIndices,
    pub hash: u64,
}
/// Accessors for per-authority consensus counters, dispatched over
/// `ConsensusStats` versions via `enum_dispatch`.
#[enum_dispatch]
pub trait ConsensusStatsAPI {
    /// True once the counter vectors have been sized for a real committee.
    fn is_initialized(&self) -> bool;
    /// Consensus messages seen from `authority` (index into the committee).
    fn get_num_messages(&self, authority: usize) -> u64;
    /// Increments and returns the message counter for `authority`.
    fn inc_num_messages(&mut self, authority: usize) -> u64;
    /// User transactions seen from `authority`.
    fn get_num_user_transactions(&self, authority: usize) -> u64;
    /// Increments and returns the user-transaction counter for `authority`.
    fn inc_num_user_transactions(&mut self, authority: usize) -> u64;
}
/// Versioned container for consensus statistics. The value is serialized to
/// disk (`last_consensus_stats`), so schema changes require a new variant.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[enum_dispatch(ConsensusStatsAPI)]
pub enum ConsensusStats {
    V1(ConsensusStatsV1),
}
impl ConsensusStats {
    /// Creates stats for a committee of `size` authorities, with every
    /// per-authority counter starting at zero.
    pub fn new(size: usize) -> Self {
        let num_messages = vec![0; size];
        let num_user_transactions = vec![0; size];
        Self::V1(ConsensusStatsV1 {
            num_messages,
            num_user_transactions,
        })
    }
}
impl Default for ConsensusStats {
fn default() -> Self {
Self::new(0)
}
}
/// V1 statistics layout: one counter slot per committee member, indexed by
/// the authority's position in the committee.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct ConsensusStatsV1 {
    pub num_messages: Vec<u64>,
    pub num_user_transactions: Vec<u64>,
}
impl ConsensusStatsAPI for ConsensusStatsV1 {
    /// A zero-length counter vector means `new` was never called with a real
    /// committee size.
    fn is_initialized(&self) -> bool {
        !self.num_messages.is_empty()
    }
    /// Panics if `authority` is out of range, matching the vector indexing.
    fn get_num_messages(&self, authority: usize) -> u64 {
        self.num_messages[authority]
    }
    fn inc_num_messages(&mut self, authority: usize) -> u64 {
        let counter = &mut self.num_messages[authority];
        *counter += 1;
        *counter
    }
    fn get_num_user_transactions(&self, authority: usize) -> u64 {
        self.num_user_transactions[authority]
    }
    fn inc_num_user_transactions(&mut self, authority: usize) -> u64 {
        let counter = &mut self.num_user_transactions[authority];
        *counter += 1;
        *counter
    }
}
/// Consensus position, running hash, and per-authority stats persisted
/// together under `last_consensus_stats`; supersedes `ExecutionIndicesWithHash`
/// (see the fallback in `get_last_consensus_stats`).
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct ExecutionIndicesWithStats {
    pub index: ExecutionIndices,
    pub hash: u64,
    pub stats: ConsensusStats,
}
/// Execution-layer components owned by the epoch store.
pub struct ExecutionComponents {
    /// The Move executor for this epoch's protocol version.
    pub(crate) executor: Arc<dyn Executor + Send + Sync>,
    /// Module cache backed by the execution cache through `ResolverWrapper`.
    pub(crate) module_cache: Arc<SyncModuleCache<ResolverWrapper<ExecutionCache>>>,
    /// Resolver metrics; handed to the next epoch via `metrics()` — see
    /// `new_at_next_epoch`.
    metrics: Arc<ResolverMetrics>,
}
/// Per-epoch state of an authority. A fresh instance is built at every epoch
/// change (`new_at_next_epoch`); the backing RocksDB lives in an epoch-specific
/// directory and its handles can be dropped via `release_db_handles`.
pub struct AuthorityPerEpochStore {
    /// This node's authority name.
    pub(crate) name: AuthorityName,
    /// Committee for this epoch.
    committee: Arc<Committee>,
    /// Epoch tables; swapped to `None` at epoch end so the DB can be closed
    /// while readers still hold `Arc`s (see `tables()` / `release_db_handles()`).
    tables: ArcSwapOption<AuthorityEpochTables>,
    protocol_config: ProtocolConfig,
    /// Base directory containing the per-epoch DB subdirectories.
    parent_path: PathBuf,
    db_options: Option<Options>,
    /// In-memory copy of the reconfiguration state (persisted in `reconfig_state`).
    reconfig_state_mem: RwLock<ReconfigState>,
    /// Wakes waiters when a consensus transaction has been processed.
    consensus_notify_read: NotifyRead<SequencedConsensusTransactionKey, ()>,
    pub(crate) signature_verifier: SignatureVerifier,
    /// Wakes waiters when a checkpoint's state accumulator becomes available.
    pub(crate) checkpoint_state_notify_read: NotifyRead<CheckpointSequenceNumber, Accumulator>,
    /// Wakes waiters when a non-digest `TransactionKey` resolves to a digest.
    executed_digests_notify_read: NotifyRead<TransactionKey, TransactionDigest>,
    // NOTE(review): notified when the epoch finishes — confirm against the
    // epoch-termination code (not visible in this chunk).
    epoch_alive_notify: NotifyOnce,
    user_certs_closed_notify: NotifyOnce,
    epoch_alive: tokio::sync::RwLock<bool>,
    /// Stake aggregated from validators' EndOfPublish messages; rebuilt from
    /// the `end_of_publish` table on restart.
    end_of_publish: Mutex<StakeAggregator<(), true>>,
    /// Digests of user certificates currently pending in consensus; seeded
    /// from `pending_consensus_transactions` in `new`.
    pending_consensus_certificates: Mutex<HashSet<TransactionDigest>>,
    /// Per-transaction-digest locks (see `acquire_tx_lock`).
    mutex_table: MutexTable<TransactionDigest>,
    /// Creation time of this store; used for epoch-duration metrics.
    pub(crate) epoch_open_time: Instant,
    epoch_close_time: RwLock<Option<Instant>>,
    pub(crate) metrics: Arc<EpochMetrics>,
    epoch_start_configuration: Arc<EpochStartConfiguration>,
    execution_component: ExecutionComponents,
    chain_identifier: ChainIdentifier,
    /// Stake-aggregated JWK votes; rebuilt from `pending_jwks` in `new`.
    jwk_aggregator: Mutex<JwkAggregator>,
    /// Both set exactly once via `set_randomness_manager` when randomness is
    /// enabled for the epoch.
    randomness_manager: OnceCell<tokio::sync::Mutex<RandomnessManager>>,
    randomness_reporter: OnceCell<RandomnessReporter>,
}
/// RocksDB tables scoped to a single epoch. Keeping all per-epoch consensus,
/// checkpoint, lock and DKG state here lets the whole database be dropped at
/// epoch change instead of being pruned row by row.
#[derive(DBMapUtils)]
pub struct AuthorityEpochTables {
    /// Transactions this validator has signed (written alongside owned-object
    /// locks in `write_transaction_locks`).
    #[default_options_override_fn = "signed_transactions_table_default_config"]
    signed_transactions:
        DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, AuthoritySignInfo>>,
    /// Which transaction, if any, currently holds the lock on each owned
    /// object ref. Values go through `LockDetailsWrapper::migrate()` on read.
    #[default_options_override_fn = "owned_object_transaction_locks_table_default_config"]
    owned_object_locked_transactions: DBMap<ObjectRef, LockDetailsWrapper>,
    /// This validator's signatures over transaction effects.
    effects_signatures: DBMap<TransactionDigest, AuthoritySignInfo>,
    /// Quorum certificate signatures per transaction.
    pub(crate) transaction_cert_signatures: DBMap<TransactionDigest, AuthorityStrongQuorumSignInfo>,
    /// Shared-object version assignments keyed by digest (legacy path; the
    /// `_v2` table is used when randomness is enabled — see
    /// `set_shared_object_versions_for_testing`).
    assigned_shared_object_versions: DBMap<TransactionDigest, Vec<(ObjectID, SequenceNumber)>>,
    assigned_shared_object_versions_v2: DBMap<TransactionKey, Vec<(ObjectID, SequenceNumber)>>,
    /// Next version to assign for each shared object.
    next_shared_object_versions: DBMap<ObjectID, SequenceNumber>,
    /// Certificates sequenced by consensus but not yet executed.
    #[default_options_override_fn = "pending_execution_table_default_config"]
    pub(crate) pending_execution: DBMap<TransactionDigest, TrustedExecutableTransaction>,
    /// Dedup marker for consensus messages already handled.
    consensus_message_processed: DBMap<SequencedConsensusTransactionKey, bool>,
    /// Transactions submitted to consensus but not yet sequenced; replayed on
    /// restart in `new`.
    #[default_options_override_fn = "pending_consensus_transactions_table_default_config"]
    pending_consensus_transactions: DBMap<ConsensusTransactionKey, ConsensusTransaction>,
    // Obsolete table kept so the on-disk schema remains openable.
    #[allow(dead_code)]
    consensus_message_order: DBMap<ExecutionIndices, TransactionDigest>,
    /// Singleton row: last consensus position processed (superseded by
    /// `last_consensus_stats`, see `get_last_consensus_stats`).
    last_consensus_index: DBMap<u64, ExecutionIndicesWithHash>,
    /// Singleton row: last consensus position plus running stats.
    last_consensus_stats: DBMap<u64, ExecutionIndicesWithStats>,
    #[allow(dead_code)]
    checkpoint_boundary: DBMap<u64, u64>,
    /// Singleton row (RECONFIG_STATE_INDEX): persisted reconfiguration state.
    reconfig_state: DBMap<u64, ReconfigState>,
    /// Validators whose EndOfPublish has been received this epoch.
    end_of_publish: DBMap<AuthorityName, ()>,
    #[allow(dead_code)]
    final_epoch_checkpoint: DBMap<u64, u64>,
    #[default_options_override_fn = "pending_checkpoints_table_default_config"]
    pending_checkpoints: DBMap<CheckpointHeight, PendingCheckpoint>,
    #[default_options_override_fn = "pending_checkpoints_table_default_config"]
    pending_checkpoints_v2: DBMap<CheckpointHeight, PendingCheckpointV2>,
    /// Maps executed digests to the checkpoint the builder placed them in.
    builder_digest_to_checkpoint: DBMap<TransactionDigest, CheckpointSequenceNumber>,
    /// Resolves non-digest `TransactionKey`s (e.g. randomness rounds) to digests.
    transaction_key_to_digest: DBMap<TransactionKey, TransactionDigest>,
    /// Checkpoint signature fragments keyed by (sequence, arrival index) so a
    /// scan yields them in arrival order (see
    /// `get_pending_checkpoint_signatures_iter`).
    pending_checkpoint_signatures:
        DBMap<(CheckpointSequenceNumber, u64), CheckpointSignatureMessage>,
    /// User signatures recorded for checkpointed transactions.
    user_signatures_for_checkpoints: DBMap<TransactionDigest, Vec<GenericSignature>>,
    #[allow(dead_code)]
    builder_checkpoint_summary: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
    builder_checkpoint_summary_v2: DBMap<CheckpointSequenceNumber, BuilderCheckpointSummary>,
    /// State accumulator (running hash) per checkpoint.
    pub state_hash_by_checkpoint: DBMap<CheckpointSequenceNumber, Accumulator>,
    /// Capabilities advertised by each authority.
    authority_capabilities: DBMap<AuthorityName, AuthorityCapabilities>,
    /// Singleton row: operator override of the protocol-upgrade buffer stake.
    override_protocol_upgrade_buffer_stake: DBMap<u64, u64>,
    /// Which checkpoint each executed transaction landed in; cleared by
    /// `reset_db_for_execution_since_genesis`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, CheckpointSequenceNumber>,
    #[allow(dead_code)]
    oauth_provider_jwk: DBMap<JwkId, JWK>,
    /// JWK votes per authority; replayed into the in-memory aggregator in `new`.
    pending_jwks: DBMap<(AuthorityName, JwkId, JWK), ()>,
    /// JWKs that reached quorum, keyed by the epoch they became active in.
    active_jwks: DBMap<(u64, (JwkId, JWK)), ()>,
    /// Transactions deferred to a later round; key order drives fetch order
    /// (see the `DeferralKey` sort-order tests below).
    deferred_transactions: DBMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>>,
    #[allow(dead_code)]
    randomness_rounds_written: DBMap<narwhal_types::RandomnessRound, ()>,
    // DKG protocol state, keyed by party id or stored as singleton rows.
    pub(crate) dkg_processed_messages: DBMap<PartyId, dkg::ProcessedMessage<PkG, EncG>>,
    pub(crate) dkg_used_messages: DBMap<u64, dkg::UsedProcessedMessages<PkG, EncG>>,
    pub(crate) dkg_confirmations: DBMap<PartyId, dkg::Confirmation<EncG>>,
    pub(crate) dkg_output: DBMap<u64, dkg::Output<PkG, EncG>>,
    // Randomness-round bookkeeping (singleton rows except the pending set).
    pub(crate) randomness_rounds_pending: DBMap<RandomnessRound, ()>,
    pub(crate) randomness_next_round: DBMap<u64, RandomnessRound>,
    pub(crate) randomness_highest_completed_round: DBMap<u64, RandomnessRound>,
    pub(crate) randomness_last_round_timestamp: DBMap<u64, TimestampMs>,
}
/// Key under which deferred transactions are stored.
///
/// The derived `Ord` (variant order first, then field order) determines the
/// on-disk iteration order of `deferred_transactions`, so `Randomness` keys
/// sort before all `ConsensusRound` keys and consensus keys sort by
/// `future_round`. Do not reorder variants or fields.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum DeferralKey {
    /// Deferred because randomness was not yet available.
    Randomness {
        deferred_from_round: Round,
    },
    /// Deferred until `future_round` due to shared-object congestion.
    ConsensusRound {
        future_round: Round,
        deferred_from_round: Round,
    },
}
impl DeferralKey {
    // Key for a transaction deferred because randomness was not ready at
    // `deferred_from_round`.
    fn new_for_randomness(deferred_from_round: Round) -> Self {
        Self::Randomness {
            deferred_from_round,
        }
    }
    // Key for a transaction deferred (e.g. by congestion control) until
    // `future_round`.
    pub fn new_for_consensus_round(future_round: Round, deferred_from_round: Round) -> Self {
        Self::ConsensusRound {
            future_round,
            deferred_from_round,
        }
    }
    // Inclusive-start/exclusive-end bounds covering every `Randomness` key.
    // Relies on the derived Ord sorting by `deferred_from_round`.
    fn full_range_for_randomness() -> (Self, Self) {
        (
            Self::Randomness {
                deferred_from_round: 0,
            },
            Self::Randomness {
                deferred_from_round: u64::MAX,
            },
        )
    }
    // Bounds covering every `ConsensusRound` key with
    // `future_round <= consensus_round`. The upper bound uses
    // `consensus_round + 1` with `deferred_from_round: 0` so that all keys at
    // `consensus_round` itself are included regardless of their
    // `deferred_from_round`. Panics on Round overflow (checked_add + unwrap).
    fn range_for_up_to_consensus_round(consensus_round: Round) -> (Self, Self) {
        (
            Self::ConsensusRound {
                future_round: 0,
                deferred_from_round: 0,
            },
            Self::ConsensusRound {
                future_round: consensus_round.checked_add(1).unwrap(),
                deferred_from_round: 0,
            },
        )
    }
}
// Verifies that `DeferralKey`s inserted with random rounds come back from the
// DB sorted by `future_round`, i.e. that the derived Ord survives the
// typed_store key encoding.
#[tokio::test]
async fn test_deferral_key_sort_order() {
    use rand::prelude::*;
    #[derive(DBMapUtils)]
    struct TestDB {
        deferred_certs: DBMap<DeferralKey, ()>,
    }
    let tempdir = tempfile::tempdir().unwrap();
    let db = TestDB::open_tables_read_write(
        tempdir.path().to_owned(),
        MetricConf::new("test_db"),
        None,
        None,
    );
    // Insert many random consensus-round keys...
    for _ in 0..10000 {
        let future_round = rand::thread_rng().gen_range(0..u64::MAX);
        let current_round = rand::thread_rng().gen_range(0..u64::MAX);
        let key = DeferralKey::new_for_consensus_round(future_round, current_round);
        db.deferred_certs.insert(&key, &()).unwrap();
    }
    // ...and check that iteration yields non-decreasing future_round.
    let mut previous_future_round = 0;
    for (key, _) in db.deferred_certs.unbounded_iter() {
        match key {
            DeferralKey::Randomness { .. } => (),
            DeferralKey::ConsensusRound { future_round, .. } => {
                assert!(previous_future_round <= future_round);
                previous_future_round = future_round;
            }
        }
    }
}
// Verifies that `range_for_up_to_consensus_round` selects exactly the
// ConsensusRound keys with future_round <= bound and never returns a
// Randomness key, even when both kinds are interleaved in the DB.
#[tokio::test]
async fn test_fetching_deferred_txs() {
    use rand::prelude::*;
    #[derive(DBMapUtils)]
    struct TestDB {
        deferred_certs: DBMap<DeferralKey, ()>,
    }
    let tempdir = tempfile::tempdir().unwrap();
    let db = TestDB::open_tables_read_write(
        tempdir.path().to_owned(),
        MetricConf::new("test_db"),
        None,
        None,
    );
    let min_future_round = 100;
    let max_future_round = 300;
    // Insert paired consensus-round and randomness keys.
    for _ in 0..10000 {
        let future_round = rand::thread_rng().gen_range(min_future_round..=max_future_round);
        let current_round = rand::thread_rng().gen_range(0..u64::MAX);
        db.deferred_certs
            .insert(
                &DeferralKey::new_for_consensus_round(future_round, current_round),
                &(),
            )
            .unwrap();
        db.deferred_certs
            .insert(&DeferralKey::new_for_randomness(current_round), &())
            .unwrap();
    }
    // Fetch everything deferred up to round 200 and validate ordering,
    // the upper bound, and that at least one key matched.
    let (min, max) = DeferralKey::range_for_up_to_consensus_round(200);
    let mut previous_future_round = 0;
    let mut result_count = 0;
    for result in db
        .deferred_certs
        .safe_iter_with_bounds(Some(min), Some(max))
    {
        let (key, _) = result.unwrap();
        match key {
            DeferralKey::Randomness { .. } => {
                panic!("Should not receive randomness deferral txn.")
            }
            DeferralKey::ConsensusRound { future_round, .. } => {
                assert!(previous_future_round <= future_round);
                previous_future_round = future_round;
                assert!(future_round <= 200);
                result_count += 1;
            }
        }
    }
    assert!(result_count > 0);
}
// Why a transaction's execution was deferred instead of being scheduled.
enum DeferralReason {
    RandomnessNotReady,
    // Carries the congested shared-object ids that triggered the deferral.
    SharedObjectCongestion(Vec<ObjectID>),
}
/// RocksDB options for `signed_transactions`: write-heavy, large values,
/// never range-scanned.
fn signed_transactions_table_default_config() -> DBOptions {
    let base = default_db_options().optimize_for_write_throughput();
    base.optimize_for_large_values_no_scan(1 << 10)
}
/// RocksDB options for `owned_object_locked_transactions`: tuned for both
/// writes and point reads, with the block-cache size taken from the
/// environment (default 1024). Range deletions must NOT be ignored for this
/// table, hence the explicit read/write options.
fn owned_object_transaction_locks_table_default_config() -> DBOptions {
    let cache_size = read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024);
    let tuned = default_db_options()
        .optimize_for_write_throughput()
        .optimize_for_read(cache_size);
    DBOptions {
        options: tuned.options,
        rw_options: ReadWriteOptions::default().set_ignore_range_deletions(false),
    }
}
/// RocksDB options for `pending_execution`: write-heavy, large values,
/// never range-scanned.
fn pending_execution_table_default_config() -> DBOptions {
    let base = default_db_options().optimize_for_write_throughput();
    base.optimize_for_large_values_no_scan(1 << 10)
}
/// RocksDB options for `pending_consensus_transactions`: write-heavy, large
/// values, never range-scanned.
fn pending_consensus_transactions_table_default_config() -> DBOptions {
    let base = default_db_options().optimize_for_write_throughput();
    base.optimize_for_large_values_no_scan(1 << 10)
}
/// RocksDB options shared by both pending-checkpoint tables: write-heavy,
/// large values, never range-scanned.
fn pending_checkpoints_table_default_config() -> DBOptions {
    let base = default_db_options().optimize_for_write_throughput();
    base.optimize_for_large_values_no_scan(1 << 10)
}
impl AuthorityEpochTables {
    /// Opens (creating if needed) the epoch database at
    /// `<parent_path>/epoch_<epoch>` in transactional mode.
    pub fn open(epoch: EpochId, parent_path: &Path, db_options: Option<Options>) -> Self {
        Self::open_tables_transactional(
            Self::path(epoch, parent_path),
            MetricConf::new("epoch"),
            db_options,
            None,
        )
    }
    /// Opens a read-only handle to an epoch database (usable for past epochs).
    pub fn open_readonly(epoch: EpochId, parent_path: &Path) -> AuthorityEpochTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(epoch, parent_path),
            None,
            None,
            MetricConf::new("epoch_readonly"),
        )
    }
    /// Directory for a given epoch's database: `<parent>/epoch_<epoch>`.
    pub fn path(epoch: EpochId, parent_path: &Path) -> PathBuf {
        parent_path.join(format!("{}{}", EPOCH_DB_PREFIX, epoch))
    }
    /// Loads the persisted reconfiguration state, defaulting if the singleton
    /// row was never written (fresh epoch DB).
    fn load_reconfig_state(&self) -> SuiResult<ReconfigState> {
        let state = self
            .reconfig_state
            .get(&RECONFIG_STATE_INDEX)?
            .unwrap_or_default();
        Ok(state)
    }
    /// Returns every transaction still pending in consensus; used to rebuild
    /// in-memory state on restart.
    pub fn get_all_pending_consensus_transactions(&self) -> Vec<ConsensusTransaction> {
        self.pending_consensus_transactions
            .unbounded_iter()
            .map(|(_k, v)| v)
            .collect()
    }
    /// Clears the executed-transactions-to-checkpoint mapping so execution
    /// can be replayed from genesis. `unsafe_clear` drops all rows.
    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
        self.executed_transactions_to_checkpoint.unsafe_clear()?;
        Ok(())
    }
    /// Removes a single executed-transaction marker. "Subtle" because callers
    /// must understand the re-execution implications.
    pub fn remove_executed_tx_subtle(&self, digest: &TransactionDigest) -> SuiResult {
        self.executed_transactions_to_checkpoint.remove(digest)?;
        Ok(())
    }
    /// Reads the singleton last-consensus-index row, if ever written.
    pub fn get_last_consensus_index(&self) -> SuiResult<Option<ExecutionIndicesWithHash>> {
        Ok(self.last_consensus_index.get(&LAST_CONSENSUS_STATS_ADDR)?)
    }
    /// Reads the singleton last-consensus-stats row, if ever written.
    pub fn get_last_consensus_stats(&self) -> SuiResult<Option<ExecutionIndicesWithStats>> {
        Ok(self.last_consensus_stats.get(&LAST_CONSENSUS_STATS_ADDR)?)
    }
    /// Iterates checkpoint-signature messages for `checkpoint_seq` starting at
    /// `starting_index`; the (seq, index) key makes this a simple skip-to scan.
    pub fn get_pending_checkpoint_signatures_iter(
        &self,
        checkpoint_seq: CheckpointSequenceNumber,
        starting_index: u64,
    ) -> SuiResult<
        impl Iterator<Item = ((CheckpointSequenceNumber, u64), CheckpointSignatureMessage)> + '_,
    > {
        let key = (checkpoint_seq, starting_index);
        debug!("Scanning pending checkpoint signatures from {:?}", key);
        let iter = self
            .pending_checkpoint_signatures
            .unbounded_iter()
            .skip_to(&key)?;
        Ok::<_, SuiError>(iter)
    }
    /// Returns the lock holder for an owned object ref, migrating the stored
    /// wrapper to the current LockDetails representation.
    pub fn get_locked_transaction(&self, obj_ref: &ObjectRef) -> SuiResult<Option<LockDetails>> {
        Ok(self
            .owned_object_locked_transactions
            .get(obj_ref)?
            .map(|l| l.migrate().into_inner()))
    }
    /// Batched form of `get_locked_transaction`; result order matches input order.
    pub fn multi_get_locked_transactions(
        &self,
        owned_input_objects: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(self
            .owned_object_locked_transactions
            .multi_get(owned_input_objects)?
            .into_iter()
            .map(|l| l.map(|l| l.migrate().into_inner()))
            .collect())
    }
    /// Atomically writes the owned-object locks for `transaction` together
    /// with the signed transaction itself (one batch, so both or neither land).
    pub fn write_transaction_locks(
        &self,
        transaction: VerifiedSignedTransaction,
        locks_to_write: impl Iterator<Item = (ObjectRef, LockDetails)>,
    ) -> SuiResult {
        let mut batch = self.owned_object_locked_transactions.batch();
        batch.insert_batch(
            &self.owned_object_locked_transactions,
            locks_to_write.map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
        )?;
        batch.insert_batch(
            &self.signed_transactions,
            std::iter::once((*transaction.digest(), transaction.serializable_ref())),
        )?;
        batch.write()?;
        Ok(())
    }
}
pub(crate) const MUTEX_TABLE_SIZE: usize = 1024;
impl AuthorityPerEpochStore {
/// Creates the per-epoch store for `committee`'s epoch: opens the epoch DB,
/// rebuilds in-memory aggregates (end-of-publish stake, pending consensus
/// certificates, JWK votes) from persisted rows, and wires up the signature
/// verifier and execution components for the epoch's protocol version.
#[instrument(name = "AuthorityPerEpochStore::new", level = "error", skip_all, fields(epoch = committee.epoch))]
pub fn new(
    name: AuthorityName,
    committee: Arc<Committee>,
    parent_path: &Path,
    db_options: Option<Options>,
    metrics: Arc<EpochMetrics>,
    epoch_start_configuration: EpochStartConfiguration,
    execution_cache: Arc<ExecutionCache>,
    cache_metrics: Arc<ResolverMetrics>,
    signature_verifier_metrics: Arc<SignatureVerifierMetrics>,
    expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    chain_identifier: ChainIdentifier,
) -> Arc<Self> {
    let current_time = Instant::now();
    let epoch_id = committee.epoch;
    let tables = AuthorityEpochTables::open(epoch_id, parent_path, db_options.clone());
    // Rebuild the end-of-publish stake aggregator from the persisted table.
    let end_of_publish =
        StakeAggregator::from_iter(committee.clone(), tables.end_of_publish.unbounded_iter());
    let reconfig_state = tables
        .load_reconfig_state()
        .expect("Load reconfig state at initialization cannot fail");
    let epoch_alive_notify = NotifyOnce::new();
    // Recover user certificates that were submitted to consensus but not yet
    // sequenced when the node last stopped.
    let pending_consensus_transactions = tables.get_all_pending_consensus_transactions();
    let pending_consensus_certificates: HashSet<_> = pending_consensus_transactions
        .iter()
        .filter_map(|transaction| {
            if let ConsensusTransactionKind::UserTransaction(certificate) = &transaction.kind {
                Some(*certificate.digest())
            } else {
                None
            }
        })
        .collect();
    // The epoch-start configuration must describe the same epoch as the committee.
    assert_eq!(
        epoch_start_configuration.epoch_start_state().epoch(),
        epoch_id
    );
    let epoch_start_configuration = Arc::new(epoch_start_configuration);
    metrics.current_epoch.set(epoch_id as i64);
    metrics
        .current_voting_right
        .set(committee.weight(&name) as i64);
    // Protocol config is a function of (protocol version, chain).
    let protocol_version = epoch_start_configuration
        .epoch_start_state()
        .protocol_version();
    let protocol_config =
        ProtocolConfig::get_for_version(protocol_version, chain_identifier.chain());
    let execution_component = ExecutionComponents::new(
        &protocol_config,
        execution_cache.clone(),
        cache_metrics,
        expensive_safety_check_config,
    );
    // zkLogin uses production parameters only on Mainnet/Testnet.
    let zklogin_env = match chain_identifier.chain() {
        Chain::Mainnet | Chain::Testnet => ZkLoginEnv::Prod,
        _ => ZkLoginEnv::Test,
    };
    let supported_providers = protocol_config
        .zklogin_supported_providers()
        .iter()
        .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
        .collect::<Vec<_>>();
    let signature_verifier = SignatureVerifier::new(
        committee.clone(),
        signature_verifier_metrics,
        supported_providers,
        zklogin_env,
        protocol_config.verify_legacy_zklogin_address(),
        protocol_config.accept_zklogin_in_multisig(),
        protocol_config.zklogin_max_epoch_upper_bound_delta(),
    );
    let authenticator_state_exists = epoch_start_configuration
        .authenticator_obj_initial_shared_version()
        .is_some();
    let authenticator_state_enabled =
        authenticator_state_exists && protocol_config.enable_jwk_consensus_updates();
    if authenticator_state_enabled {
        info!("authenticator_state enabled");
        // Seed the verifier with the JWKs already active on-chain. Every
        // active JWK must have been activated in this epoch or earlier.
        let authenticator_state = get_authenticator_state(execution_cache.as_ref())
            .expect("Read cannot fail")
            .expect("Authenticator state must exist");
        for active_jwk in &authenticator_state.active_jwks {
            let ActiveJwk { jwk_id, jwk, epoch } = active_jwk;
            assert!(epoch <= &epoch_id);
            signature_verifier.insert_jwk(jwk_id, jwk);
        }
    } else {
        info!("authenticator_state disabled");
    }
    // Validators are required to run with in-memory checkpoint roots.
    let is_validator = committee.authority_index(&name).is_some();
    if is_validator {
        assert!(epoch_start_configuration
            .flags()
            .contains(&EpochFlag::InMemoryCheckpointRoots));
    }
    // Rebuild the JWK vote aggregator from persisted per-authority votes.
    let mut jwk_aggregator = JwkAggregator::new(committee.clone());
    for ((authority, id, jwk), _) in tables.pending_jwks.unbounded_iter().seek_to_first() {
        jwk_aggregator.insert(authority, (id, jwk));
    }
    let jwk_aggregator = Mutex::new(jwk_aggregator);
    let s = Arc::new(Self {
        name,
        committee,
        protocol_config,
        tables: ArcSwapOption::new(Some(Arc::new(tables))),
        parent_path: parent_path.to_path_buf(),
        db_options,
        reconfig_state_mem: RwLock::new(reconfig_state),
        epoch_alive_notify,
        user_certs_closed_notify: NotifyOnce::new(),
        epoch_alive: tokio::sync::RwLock::new(true),
        consensus_notify_read: NotifyRead::new(),
        signature_verifier,
        checkpoint_state_notify_read: NotifyRead::new(),
        executed_digests_notify_read: NotifyRead::new(),
        end_of_publish: Mutex::new(end_of_publish),
        pending_consensus_certificates: Mutex::new(pending_consensus_certificates),
        mutex_table: MutexTable::new(MUTEX_TABLE_SIZE),
        epoch_open_time: current_time,
        epoch_close_time: Default::default(),
        metrics,
        epoch_start_configuration,
        execution_component,
        chain_identifier,
        jwk_aggregator,
        randomness_manager: OnceCell::new(),
        randomness_reporter: OnceCell::new(),
    });
    s.update_buffer_stake_metric();
    s
}
/// Returns the epoch tables, or `EpochEnded` if `release_db_handles` already
/// swapped them out. Callers get an `Arc`, so a handle obtained before the
/// swap stays valid.
pub fn tables(&self) -> SuiResult<Arc<AuthorityEpochTables>> {
    match self.tables.load_full() {
        Some(tables) => Ok(tables),
        None => Err(SuiError::EpochEnded(self.epoch())),
    }
}
/// Drops this store's reference to the epoch tables so the DB can be closed
/// once all outstanding `Arc`s are released.
pub fn release_db_handles(&self) {
    self.tables.store(None);
}
/// JWK consensus updates are on AND the authenticator object exists.
pub fn authenticator_state_enabled(&self) -> bool {
    self.protocol_config().enable_jwk_consensus_updates() && self.authenticator_state_exists()
}
/// Whether the on-chain authenticator state object existed at epoch start.
pub fn authenticator_state_exists(&self) -> bool {
    self.epoch_start_configuration
        .authenticator_obj_initial_shared_version()
        .is_some()
}
/// Random beacon is on AND the randomness state object exists.
pub fn randomness_state_enabled(&self) -> bool {
    self.protocol_config().random_beacon() && self.randomness_state_exists()
}
/// Whether the on-chain randomness state object existed at epoch start.
pub fn randomness_state_exists(&self) -> bool {
    self.epoch_start_configuration
        .randomness_obj_initial_shared_version()
        .is_some()
}
/// Clone of the reporter, available only after `set_randomness_manager`.
pub fn randomness_reporter(&self) -> Option<RandomnessReporter> {
    self.randomness_reporter.get().cloned()
}
/// Installs the randomness manager (and its reporter) and kicks off DKG.
/// Intended to be called exactly once per epoch; a second call logs an error
/// instead of replacing the already-installed manager.
pub fn set_randomness_manager(
    &self,
    mut randomness_manager: RandomnessManager,
) -> SuiResult<()> {
    let reporter = randomness_manager.reporter();
    // Start DKG before storing the manager; its result is returned below.
    let result = randomness_manager.start_dkg();
    if self
        .randomness_manager
        .set(tokio::sync::Mutex::new(randomness_manager))
        .is_err()
    {
        error!("BUG: `set_randomness_manager` called more than once; this should never happen");
    }
    if self.randomness_reporter.set(reporter).is_err() {
        error!("BUG: `set_randomness_manager` called more than once; this should never happen");
    }
    result
}
/// Whether the on-chain coin deny-list object existed at epoch start.
pub fn coin_deny_list_state_exists(&self) -> bool {
    self.epoch_start_configuration
        .coin_deny_list_obj_initial_shared_version()
        .is_some()
}
/// Deny-list feature is on AND the deny-list object exists.
pub fn coin_deny_list_state_enabled(&self) -> bool {
    self.protocol_config().enable_coin_deny_list() && self.coin_deny_list_state_exists()
}
/// Base directory containing the per-epoch DB subdirectories.
pub fn get_parent_path(&self) -> PathBuf {
    self.parent_path.clone()
}
/// The immutable epoch-start configuration.
pub fn epoch_start_config(&self) -> &Arc<EpochStartConfiguration> {
    &self.epoch_start_configuration
}
/// The system state snapshot taken at epoch start.
pub fn epoch_start_state(&self) -> &EpochStartSystemState {
    self.epoch_start_configuration.epoch_start_state()
}
/// Identifier of the chain this node follows.
pub fn get_chain_identifier(&self) -> ChainIdentifier {
    self.chain_identifier
}
/// Builds the store for the NEXT epoch, carrying over path, DB options,
/// metrics handles and safety-check config from this one. Asserts the new
/// committee is exactly one epoch ahead, and records closing-time metrics
/// for the current epoch before handing off.
pub fn new_at_next_epoch(
    &self,
    name: AuthorityName,
    new_committee: Committee,
    epoch_start_configuration: EpochStartConfiguration,
    cache: Arc<ExecutionCache>,
    expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    chain_identifier: ChainIdentifier,
) -> Arc<Self> {
    assert_eq!(self.epoch() + 1, new_committee.epoch);
    self.record_reconfig_halt_duration_metric();
    self.record_epoch_total_duration_metric();
    Self::new(
        name,
        Arc::new(new_committee),
        &self.parent_path,
        self.db_options.clone(),
        self.metrics.clone(),
        epoch_start_configuration,
        cache,
        self.execution_component.metrics(),
        self.signature_verifier.metrics.clone(),
        expensive_safety_check_config,
        chain_identifier,
    )
}
/// Committee for this epoch.
pub fn committee(&self) -> &Arc<Committee> {
    &self.committee
}
/// Protocol config resolved for this epoch's version and chain.
pub fn protocol_config(&self) -> &ProtocolConfig {
    &self.protocol_config
}
/// The epoch this store belongs to.
pub fn epoch(&self) -> EpochId {
    self.committee.epoch
}
/// State accumulator recorded for `checkpoint`, if any.
pub fn get_state_hash_for_checkpoint(
    &self,
    checkpoint: &CheckpointSequenceNumber,
) -> SuiResult<Option<Accumulator>> {
    Ok(self.tables()?.state_hash_by_checkpoint.get(checkpoint)?)
}
/// Persists the state accumulator for `checkpoint`.
pub fn insert_state_hash_for_checkpoint(
    &self,
    checkpoint: &CheckpointSequenceNumber,
    accumulator: &Accumulator,
) -> SuiResult {
    Ok(self
        .tables()?
        .state_hash_by_checkpoint
        .insert(checkpoint, accumulator)?)
}
/// Reference gas price fixed at epoch start.
pub fn reference_gas_price(&self) -> u64 {
    self.epoch_start_state().reference_gas_price()
}
/// Protocol version fixed at epoch start.
pub fn protocol_version(&self) -> ProtocolVersion {
    self.epoch_start_state().protocol_version()
}
/// Move module cache for this epoch.
pub fn module_cache(&self) -> &Arc<SyncModuleCache<ResolverWrapper<ExecutionCache>>> {
    &self.execution_component.module_cache
}
/// The executor for this epoch's protocol version.
pub fn executor(&self) -> &Arc<dyn Executor + Send + Sync> {
    &self.execution_component.executor
}
/// Acquires the per-transaction lock for `cert` and wraps it in a guard.
/// NOTE(review): always returns `Ok` today; the `SuiResult` leaves room for
/// future failure modes.
pub async fn acquire_tx_guard(
    &self,
    cert: &VerifiedExecutableTransaction,
) -> SuiResult<CertTxGuard> {
    let digest = cert.digest();
    Ok(CertTxGuard(self.acquire_tx_lock(digest).await))
}
/// Acquires the sharded mutex for `digest`; released when the guard drops.
pub async fn acquire_tx_lock(&self, digest: &TransactionDigest) -> CertLockGuard {
    CertLockGuard(self.mutex_table.acquire_lock(*digest).await)
}
/// Persists the reconfiguration state (singleton row) immediately.
pub fn store_reconfig_state(&self, new_state: &ReconfigState) -> SuiResult {
    self.tables()?
        .reconfig_state
        .insert(&RECONFIG_STATE_INDEX, new_state)?;
    Ok(())
}
/// Adds the reconfiguration state write to an existing batch so it commits
/// atomically with the caller's other writes.
fn store_reconfig_state_batch(
    &self,
    new_state: &ReconfigState,
    batch: &mut DBBatch,
) -> SuiResult {
    batch.insert_batch(
        &self.tables()?.reconfig_state,
        [(&RECONFIG_STATE_INDEX, new_state)],
    )?;
    Ok(())
}
/// Persists a transaction this validator has signed.
pub fn insert_signed_transaction(&self, transaction: VerifiedSignedTransaction) -> SuiResult {
    Ok(self
        .tables()?
        .signed_transactions
        .insert(transaction.digest(), transaction.serializable_ref())?)
}
// Test-only: removes a signed transaction to simulate it never being signed.
#[cfg(test)]
pub fn delete_signed_transaction_for_test(&self, transaction: &TransactionDigest) {
    self.tables()
        .expect("test should not cross epoch boundary")
        .signed_transactions
        .remove(transaction)
        .unwrap();
}
// Test-only: clears owned-object locks for the given refs.
#[cfg(test)]
pub fn delete_object_locks_for_test(&self, objects: &[ObjectRef]) {
    for object in objects {
        self.tables()
            .expect("test should not cross epoch boundary")
            .owned_object_locked_transactions
            .remove(object)
            .unwrap();
    }
}
/// Looks up a transaction this validator signed, re-wrapping the trusted
/// stored envelope as a `VerifiedSignedTransaction`.
pub fn get_signed_transaction(
    &self,
    tx_digest: &TransactionDigest,
) -> SuiResult<Option<VerifiedSignedTransaction>> {
    Ok(self
        .tables()?
        .signed_transactions
        .get(tx_digest)?
        .map(|t| t.into()))
}
/// Atomically records, for one transaction: the optional quorum certificate
/// signature, the optional effects signature, and (for non-digest keys such as
/// randomness rounds) the key→digest mapping. Waiters on a non-digest key are
/// notified only after the batch is durably written.
#[instrument(level = "trace", skip_all)]
pub fn insert_tx_cert_and_effects_signature(
    &self,
    tx_key: &TransactionKey,
    tx_digest: &TransactionDigest,
    cert_sig: Option<&AuthorityStrongQuorumSignInfo>,
    effects_signature: Option<&AuthoritySignInfo>,
) -> SuiResult {
    // Resolve the table handle once. The original re-resolved `self.tables()?`
    // for every batch operation, repeating the ArcSwap load and risking a
    // mid-function `EpochEnded` error after the batch had been partly built.
    let tables = self.tables()?;
    let mut batch = tables.effects_signatures.batch();
    if let Some(cert_sig) = cert_sig {
        batch.insert_batch(
            &tables.transaction_cert_signatures,
            [(tx_digest, cert_sig)],
        )?;
    }
    if let Some(effects_signature) = effects_signature {
        batch.insert_batch(&tables.effects_signatures, [(tx_digest, effects_signature)])?;
    }
    // Non-digest keys need the extra mapping so waiters can resolve the digest.
    let has_synthetic_key = !matches!(tx_key, TransactionKey::Digest(_));
    if has_synthetic_key {
        batch.insert_batch(&tables.transaction_key_to_digest, [(tx_key, tx_digest)])?;
    }
    batch.write()?;
    if has_synthetic_key {
        self.executed_digests_notify_read.notify(tx_key, tx_digest);
    }
    Ok(())
}
/// For each digest, whether this validator has an effects signature on file;
/// result order matches the input order.
pub fn effects_signatures_exists<'a>(
    &self,
    digests: impl IntoIterator<Item = &'a TransactionDigest>,
) -> SuiResult<Vec<bool>> {
    Ok(self
        .tables()?
        .effects_signatures
        .multi_contains_keys(digests)?)
}
/// This validator's effects signature for a transaction, if any.
pub fn get_effects_signature(
    &self,
    tx_digest: &TransactionDigest,
) -> SuiResult<Option<AuthoritySignInfo>> {
    Ok(self.tables()?.effects_signatures.get(tx_digest)?)
}
/// The quorum certificate signature for a transaction, if recorded.
pub fn get_transaction_cert_sig(
    &self,
    tx_digest: &TransactionDigest,
) -> SuiResult<Option<AuthorityStrongQuorumSignInfo>> {
    Ok(self.tables()?.transaction_cert_signatures.get(tx_digest)?)
}
/// Resolves a transaction's input object kinds into concrete `InputKey`s.
/// Shared objects are resolved through the assigned shared locks for `key`;
/// the lock table is loaded lazily, only when the first shared input is seen,
/// and the locks are expected to have been assigned already (panics otherwise).
pub(crate) fn get_input_object_keys(
    &self,
    key: &TransactionKey,
    objects: &[InputObjectKind],
) -> BTreeSet<InputKey> {
    let mut shared_locks = HashMap::<ObjectID, SequenceNumber>::new();
    objects
        .iter()
        .map(|kind| {
            match kind {
                InputObjectKind::SharedMoveObject { id, .. } => {
                    // Lazy one-time load: an empty map means we have not
                    // fetched the locks yet for this transaction.
                    if shared_locks.is_empty() {
                        shared_locks = self
                            .get_shared_locks(key)
                            .expect("Read from storage should not fail!")
                            .into_iter()
                            .collect();
                    }
                    // A missing entry means consensus never assigned a version
                    // for this object — that is a fatal invariant violation.
                    let Some(version) = shared_locks.get(id) else {
                        panic!(
                            "Shared object locks should have been set. key: {key:?}, obj \
                            id: {id:?}",
                        )
                    };
                    InputKey::VersionedObject {
                        id: *id,
                        version: *version,
                    }
                }
                // Packages are identified by id alone (no version in the key).
                InputObjectKind::MovePackage(id) => InputKey::Package { id: *id },
                InputObjectKind::ImmOrOwnedMoveObject(objref) => InputKey::VersionedObject {
                    id: objref.0,
                    version: objref.1,
                },
            }
        })
        .collect()
}
/// Last consensus position processed, defaulting to the zero position if
/// nothing was ever recorded.
pub fn get_last_consensus_index(&self) -> SuiResult<ExecutionIndicesWithHash> {
    self.tables()?
        .get_last_consensus_index()
        .map(|x| x.unwrap_or_default())
        .map_err(SuiError::from)
}
/// Last consensus position plus stats. Falls back to the legacy
/// `last_consensus_index` row (with default stats) when the stats row has
/// never been written — e.g. after upgrading from a version that only kept
/// the index.
pub fn get_last_consensus_stats(&self) -> SuiResult<ExecutionIndicesWithStats> {
    match self
        .tables()?
        .get_last_consensus_stats()
        .map_err(SuiError::from)?
    {
        Some(stats) => Ok(stats),
        None => {
            let indices = self
                .tables()?
                .get_last_consensus_index()
                .map(|x| x.unwrap_or_default())
                .map_err(SuiError::from)?;
            Ok(ExecutionIndicesWithStats {
                index: indices.index,
                hash: indices.hash,
                stats: ConsensusStats::default(),
            })
        }
    }
}
/// State accumulators for checkpoints in `[from_checkpoint, to_checkpoint]`
/// (inclusive on both ends), in sequence order.
pub fn get_accumulators_in_checkpoint_range(
    &self,
    from_checkpoint: CheckpointSequenceNumber,
    to_checkpoint: CheckpointSequenceNumber,
) -> SuiResult<Vec<(CheckpointSequenceNumber, Accumulator)>> {
    self.tables()?
        .state_hash_by_checkpoint
        .safe_range_iter(from_checkpoint..=to_checkpoint)
        .collect::<Result<Vec<_>, _>>()
        .map_err(Into::into)
}
/// Waits for the state accumulators of the given checkpoints, returning them
/// in input order. Registration happens BEFORE the DB read so a concurrent
/// insert between read and wait cannot be missed: values already on disk are
/// returned immediately, the rest await notification.
pub async fn notify_read_checkpoint_state_digests(
    &self,
    checkpoints: Vec<CheckpointSequenceNumber>,
) -> SuiResult<Vec<Accumulator>> {
    let registrations = self.checkpoint_state_notify_read.register_all(&checkpoints);
    let accumulators = self
        .tables()?
        .state_hash_by_checkpoint
        .multi_get(checkpoints)?;
    let results =
        accumulators
            .into_iter()
            .zip(registrations.into_iter())
            .map(|(a, r)| match a {
                Some(ready) => Either::Left(futures::future::ready(ready)),
                None => Either::Right(r),
            });
    Ok(join_all(results).await)
}
/// Returns every certificate still awaiting execution in this epoch.
pub fn all_pending_execution(&self) -> SuiResult<Vec<VerifiedExecutableTransaction>> {
    let pending: Vec<VerifiedExecutableTransaction> = self
        .tables()?
        .pending_execution
        .unbounded_iter()
        .map(|(_digest, cert)| cert.into())
        .collect();
    Ok(pending)
}
/// Removes the given digests from the pending-execution table. A no-op when
/// the epoch has already ended (there is nothing left to clean up).
#[instrument(level = "trace", skip_all)]
pub fn multi_remove_pending_execution(&self, digests: &[TransactionDigest]) -> SuiResult<()> {
    let tables = match self.tables() {
        Err(SuiError::EpochEnded(_)) => return Ok(()),
        other => other?,
    };
    let mut batch = tables.pending_execution.batch();
    batch.delete_batch(&tables.pending_execution, digests)?;
    batch.write()?;
    Ok(())
}
/// Returns all consensus transactions submitted but not yet processed.
/// Used during recovery, which never crosses an epoch boundary — hence the
/// expect on the tables handle.
pub fn get_all_pending_consensus_transactions(&self) -> Vec<ConsensusTransaction> {
    let tables = self
        .tables()
        .expect("recovery should not cross epoch boundary");
    tables.get_all_pending_consensus_transactions()
}
#[cfg(test)]
/// Test-only lookup of the next assigned version for a shared object.
pub fn get_next_object_version(&self, obj: &ObjectID) -> Option<SequenceNumber> {
    let tables = self
        .tables()
        .expect("test should not cross epoch boundary");
    tables.next_shared_object_versions.get(obj).unwrap()
}
/// Test helper: writes shared-object version assignments for `tx_digest`,
/// targeting the v2 (`TransactionKey`-keyed) table when randomness is
/// enabled and the legacy digest-keyed table otherwise.
pub fn set_shared_object_versions_for_testing(
    &self,
    tx_digest: &TransactionDigest,
    assigned_versions: &Vec<(ObjectID, SequenceNumber)>,
) -> SuiResult {
    let tables = self.tables()?;
    match self.randomness_state_enabled() {
        true => tables
            .assigned_shared_object_versions_v2
            .insert(&TransactionKey::Digest(*tx_digest), assigned_versions)?,
        false => tables
            .assigned_shared_object_versions
            .insert(tx_digest, assigned_versions)?,
    }
    Ok(())
}
/// Records that `digests` were finalized in checkpoint `sequence`.
///
/// Fix: the original called `self.tables()?` twice — once to create the
/// batch and once as the insert target — which is a redundant fallible
/// lookup and could in principle yield two different table handles. Bind
/// the handle once (same pattern as `multi_remove_pending_execution`).
pub fn insert_finalized_transactions(
    &self,
    digests: &[TransactionDigest],
    sequence: CheckpointSequenceNumber,
) -> SuiResult {
    let tables = self.tables()?;
    let mut batch = tables.executed_transactions_to_checkpoint.batch();
    batch.insert_batch(
        &tables.executed_transactions_to_checkpoint,
        digests.iter().map(|d| (*d, sequence)),
    )?;
    batch.write()?;
    trace!("Transactions {digests:?} finalized at checkpoint {sequence}");
    Ok(())
}
/// True iff `digest` has already been executed in a checkpoint this epoch.
pub fn is_transaction_executed_in_checkpoint(
    &self,
    digest: &TransactionDigest,
) -> SuiResult<bool> {
    let executed = self
        .tables()?
        .executed_transactions_to_checkpoint
        .contains_key(digest)?;
    Ok(executed)
}
/// Returns the checkpoint in which `digest` was executed, if any.
pub fn get_transaction_checkpoint(
    &self,
    digest: &TransactionDigest,
) -> SuiResult<Option<CheckpointSequenceNumber>> {
    let checkpoint = self
        .tables()?
        .executed_transactions_to_checkpoint
        .get(digest)?;
    Ok(checkpoint)
}
/// Batched lookup of the checkpoints in which the given digests executed,
/// returned in request order (`None` for unexecuted digests).
///
/// Fix: `multi_get` already yields `Vec<Option<_>>`; the original's trailing
/// `.into_iter().collect()` rebuilt the same `Vec` for nothing.
pub fn multi_get_transaction_checkpoint(
    &self,
    digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
    Ok(self
        .tables()?
        .executed_transactions_to_checkpoint
        .multi_get(digests)?)
}
/// Whether the `PerEpochFinalizedTransactions` epoch flag is set.
pub fn per_epoch_finalized_txns_enabled(&self) -> bool {
    let flags = self.epoch_start_configuration.flags();
    flags.contains(&EpochFlag::PerEpochFinalizedTransactions)
}
/// Whether object locks live in the split (per-epoch) tables.
pub fn object_lock_split_tables_enabled(&self) -> bool {
    let config = &self.epoch_start_configuration;
    config.object_lock_split_tables_enabled()
}
/// Returns the next shared-object version for each requested object,
/// initializing any missing entries inside a retried DB transaction.
/// For an uninitialized object the starting version is the object's current
/// version if it exists in the cache, else the caller-supplied initial
/// version.
pub(crate) async fn get_or_init_next_object_versions(
&self,
objects_to_init: &[(ObjectID, SequenceNumber)],
cache_reader: &dyn ExecutionCacheRead,
) -> SuiResult<HashMap<ObjectID, SequenceNumber>> {
let mut ret: HashMap<_, _>;
// The whole read-check-write sequence runs inside an optimistic DB
// transaction and is retried on conflict.
retry_transaction_forever!({
let tables = self.tables()?;
let mut db_transaction = tables.next_shared_object_versions.transaction()?;
let ids: Vec<_> = objects_to_init.iter().map(|(id, _)| *id).collect();
let next_versions = db_transaction
.multi_get(&self.tables()?.next_shared_object_versions, ids.clone())?;
let uninitialized_objects: Vec<(ObjectID, SequenceNumber)> = next_versions
.iter()
.zip(objects_to_init)
.filter_map(|(next_version, id_and_version)| match next_version {
None => Some(*id_and_version),
Some(_) => None,
})
.collect();
// Fast path: everything already initialized — nothing to write, so we
// can return without committing the transaction.
if uninitialized_objects.is_empty() {
return Ok(izip!(ids, next_versions.into_iter().map(|v| v.unwrap())).collect());
}
let versions_to_write: Vec<_> = uninitialized_objects
.iter()
.map(|(id, initial_version)| {
match cache_reader.get_object(id).expect("read cannot fail") {
Some(obj) => (*id, obj.version()),
None => (*id, *initial_version),
}
})
.collect();
// Result = already-initialized entries + the ones we are about to write.
ret = izip!(ids.clone(), next_versions.into_iter(),)
.filter_map(|(id, next_version)| next_version.map(|v| (id, v)))
.chain(versions_to_write.iter().cloned())
.collect();
debug!(
?versions_to_write,
"initializing next_shared_object_versions"
);
db_transaction.insert_batch(
&self.tables()?.next_shared_object_versions,
versions_to_write,
)?;
db_transaction.commit()
})?;
Ok(ret)
}
/// Stages the given shared-object version assignments into `db_batch`,
/// targeting the v2 table (keyed by `TransactionKey`) when randomness is
/// enabled, and the legacy digest-keyed table otherwise.
async fn set_assigned_shared_object_versions_with_db_batch(
    &self,
    versions: AssignedTxAndVersions,
    db_batch: &mut DBBatch,
) -> SuiResult {
    debug!("set_assigned_shared_object_versions: {:?}", versions);
    if !self.randomness_state_enabled() {
        let legacy_entries = versions
            .iter()
            .map(|(key, versions)| (key.unwrap_digest(), versions));
        db_batch.insert_batch(
            &self.tables()?.assigned_shared_object_versions,
            legacy_entries,
        )?;
    } else {
        db_batch.insert_batch(
            &self.tables()?.assigned_shared_object_versions_v2,
            versions,
        )?;
    }
    Ok(())
}
/// Assigns shared-object versions for `certificates` from consensus order
/// and persists them. Safe to call more than once for the same certificates
/// (assignments are deterministic, so re-writing them is idempotent).
pub async fn assign_shared_object_versions_idempotent(
&self,
cache_reader: &dyn ExecutionCacheRead,
certificates: &[VerifiedExecutableTransaction],
) -> SuiResult {
let mut db_batch = self.tables()?.assigned_shared_object_versions.batch();
let assigned_versions = SharedObjVerManager::assign_versions_from_consensus(
self,
cache_reader,
certificates,
None,
)
.await?
.assigned_versions;
// Stage into the batch, then commit everything in one write.
self.set_assigned_shared_object_versions_with_db_batch(assigned_versions, &mut db_batch)
.await?;
db_batch.write()?;
Ok(())
}
/// Stages `transactions` under `key` in the deferred-transactions table so
/// they can be reconsidered at a later consensus commit.
fn defer_transactions(
    &self,
    batch: &mut DBBatch,
    key: DeferralKey,
    transactions: Vec<VerifiedSequencedConsensusTransaction>,
) -> SuiResult {
    let entry = std::iter::once((key, transactions));
    batch.insert_batch(&self.tables()?.deferred_transactions, entry)?;
    Ok(())
}
/// Loads (and stages deletion of) all transactions deferred while waiting
/// for randomness to become available.
fn load_deferred_transactions_for_randomness(
    &self,
    batch: &mut DBBatch,
) -> SuiResult<Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>> {
    let (range_start, range_end) = DeferralKey::full_range_for_randomness();
    self.load_deferred_transactions(batch, range_start, range_end)
}
/// Loads randomness-deferred transactions, records their original deferral
/// keys into `previously_deferred_tx_digests`, and appends the transactions
/// to `sequenced_randomness_transactions` for processing in this commit.
fn load_and_process_deferred_transactions_for_randomness(
&self,
batch: &mut DBBatch,
previously_deferred_tx_digests: &mut HashMap<TransactionDigest, DeferralKey>,
sequenced_randomness_transactions: &mut Vec<VerifiedSequencedConsensusTransaction>,
) -> SuiResult {
let deferred_randomness_txs = self.load_deferred_transactions_for_randomness(batch)?;
// Only user certificates can be deferred for randomness; anything else
// in the table is a logic error, hence the panic.
previously_deferred_tx_digests.extend(deferred_randomness_txs.iter().flat_map(
|(deferral_key, txs)| {
txs.iter().map(|tx| match tx.0.transaction.key() {
SequencedConsensusTransactionKey::External(
ConsensusTransactionKey::Certificate(digest),
) => (digest, *deferral_key),
_ => {
panic!("deferred randomness transaction was not a user certificate: {tx:?}")
}
})
},
));
sequenced_randomness_transactions
.extend(deferred_randomness_txs.into_iter().flat_map(|(_, txs)| txs));
Ok(())
}
/// Loads (and stages deletion of) all transactions whose deferral expires at
/// or before `consensus_round`.
fn load_deferred_transactions_for_up_to_consensus_round(
    &self,
    batch: &mut DBBatch,
    consensus_round: u64,
) -> SuiResult<Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>> {
    let (range_start, range_end) =
        DeferralKey::range_for_up_to_consensus_round(consensus_round);
    self.load_deferred_transactions(batch, range_start, range_end)
}
/// Loads all deferred transactions with keys in [min, max) and stages their
/// deletion into `batch`, so they are removed atomically with whatever else
/// the caller commits. Returns the loaded (key, transactions) pairs.
fn load_deferred_transactions(
&self,
batch: &mut DBBatch,
min: DeferralKey,
max: DeferralKey,
) -> SuiResult<Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>> {
debug!("Query epoch store to load deferred txn {:?} {:?}", min, max);
let mut keys = Vec::new();
let mut txns = Vec::new();
self.tables()?
.deferred_transactions
.safe_iter_with_bounds(Some(min), Some(max))
.try_for_each(|result| match result {
Ok((key, txs)) => {
debug!(
"Loaded {:?} deferred txn with deferral key {:?}",
txs.len(),
key
);
keys.push(key);
txns.push((key, txs));
Ok(())
}
Err(err) => Err(err),
})?;
// Debug-build sanity check: a transaction must never appear under two
// deferral keys at once.
#[cfg(debug_assertions)]
{
let mut seen = HashSet::new();
for deferred_txn_batch in &txns {
for txn in &deferred_txn_batch.1 {
assert!(seen.insert(txn.0.key()));
}
}
}
// Deletion is staged, not written: it commits together with the caller's
// batch, so a crash cannot lose deferred transactions.
batch.delete_batch(&self.tables()?.deferred_transactions, keys)?;
Ok(txns)
}
/// Test helper: returns the full contents of the deferred-transactions table.
pub fn get_all_deferred_transactions_for_test(
    &self,
) -> SuiResult<Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>> {
    let mut all = Vec::new();
    for entry in self.tables()?.deferred_transactions.safe_iter() {
        all.push(entry?);
    }
    Ok(all)
}
/// Decides whether `cert` must be deferred instead of executed in this
/// commit, returning the deferral key and reason if so.
///
/// Randomness-readiness takes precedence over congestion control: a
/// randomness-reading transaction is deferred whenever DKG has neither
/// completed nor failed and no randomness is being generated this round.
fn should_defer(
&self,
cert: &VerifiedExecutableTransaction,
commit_round: Round,
dkg_failed: bool,
generating_randomness: bool,
previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
shared_object_congestion_tracker: &SharedObjectCongestionTracker,
) -> Option<(DeferralKey, DeferralReason)> {
if !dkg_failed
&& !generating_randomness
&& self.randomness_state_enabled()
&& cert.is_randomness_reader()
{
return Some((
DeferralKey::new_for_randomness(commit_round),
DeferralReason::RandomnessNotReady,
));
}
// Otherwise defer only if per-object congestion control is on and the
// transaction's shared objects are over budget for this checkpoint.
match self.protocol_config().per_object_congestion_control_mode() {
PerObjectCongestionControlMode::None => None,
PerObjectCongestionControlMode::TotalGasBudget => {
if let Some((deferral_key, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
cert,
self.protocol_config()
.max_accumulated_txn_cost_per_object_in_checkpoint(),
previously_deferred_tx_digests,
commit_round,
)
{
Some((
deferral_key,
DeferralReason::SharedObjectCongestion(congested_objects),
))
} else {
None
}
}
}
}
/// Charges `cert`'s gas budget against each of its shared input objects in
/// the congestion tracker, per the configured congestion-control mode.
fn update_object_execution_cost(
    &self,
    cert: &VerifiedExecutableTransaction,
    shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
) {
    let mode = self.protocol_config().per_object_congestion_control_mode();
    match mode {
        PerObjectCongestionControlMode::None => (),
        PerObjectCongestionControlMode::TotalGasBudget => {
            let shared_inputs: Vec<_> = cert.shared_input_objects().collect();
            shared_object_congestion_tracker
                .bump_object_execution_cost(&shared_inputs, cert.gas_budget());
        }
    }
}
/// Assigns shared-object versions for `certificate` directly from already
/// known `effects` (e.g. when catching up via checkpoint sync) and persists
/// them, bypassing consensus-based assignment.
#[instrument(level = "trace", skip_all)]
pub async fn acquire_shared_locks_from_effects(
&self,
certificate: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
cache_reader: &dyn ExecutionCacheRead,
) -> SuiResult {
let versions = SharedObjVerManager::assign_versions_from_effects(
&[(certificate, effects)],
self,
cache_reader,
)
.await?;
let mut db_batch = self.tables()?.assigned_shared_object_versions.batch();
self.set_assigned_shared_object_versions_with_db_batch(versions, &mut db_batch)
.await?;
db_batch.write()?;
Ok(())
}
/// Records a consensus transaction as pending submission. For user
/// certificates the caller MUST hold the reconfiguration read lock and the
/// epoch must still be accepting user certs; the digest is also added to the
/// in-memory pending set.
pub fn insert_pending_consensus_transactions(
&self,
transaction: &ConsensusTransaction,
lock: Option<&RwLockReadGuard<ReconfigState>>,
) -> SuiResult {
self.tables()?
.pending_consensus_transactions
.insert(&transaction.key(), transaction)?;
if let ConsensusTransactionKind::UserTransaction(cert) = &transaction.kind {
// Holding the guard across the insert prevents racing with
// reconfiguration closing user certs.
let state = lock.expect("Must pass reconfiguration lock when storing certificate");
assert!(
state.should_accept_user_certs(),
"Reconfiguration state should allow accepting user transactions"
);
self.pending_consensus_certificates
.lock()
.insert(*cert.digest());
}
Ok(())
}
/// Removes a processed consensus transaction from the pending table and,
/// for certificates, from the in-memory pending set as well.
pub fn remove_pending_consensus_transaction(&self, key: &ConsensusTransactionKey) -> SuiResult {
    self.tables()?.pending_consensus_transactions.remove(key)?;
    match key {
        ConsensusTransactionKey::Certificate(digest) => {
            self.pending_consensus_certificates.lock().remove(digest);
        }
        _ => {}
    }
    Ok(())
}
/// Number of user certificates currently pending consensus.
pub fn pending_consensus_certificates_count(&self) -> usize {
    let pending = self.pending_consensus_certificates.lock();
    pending.len()
}
/// True iff no user certificates are pending consensus.
pub fn pending_consensus_certificates_empty(&self) -> bool {
    let pending = self.pending_consensus_certificates.lock();
    pending.is_empty()
}
/// Snapshot of the digests of user certificates pending consensus.
pub fn pending_consensus_certificates(&self) -> HashSet<TransactionDigest> {
    let pending = self.pending_consensus_certificates.lock();
    pending.clone()
}
/// True iff no transactions are currently deferred.
pub fn deferred_transactions_empty(&self) -> bool {
    let tables = self
        .tables()
        .expect("deferred transactions should not be read past end of epoch");
    tables.deferred_transactions.is_empty()
}
/// Enqueues `certs` for execution, keyed by transaction digest.
///
/// Fix: the original called `self.tables()?` twice — once to create the
/// batch and once as the insert target — which is a redundant fallible
/// lookup and could in principle yield two different table handles. Bind
/// the handle once (same pattern as `multi_remove_pending_execution`).
pub fn insert_pending_execution(
    &self,
    certs: &[TrustedExecutableTransaction],
) -> SuiResult<()> {
    let tables = self.tables()?;
    let mut batch = tables.pending_execution.batch();
    batch.insert_batch(
        &tables.pending_execution,
        certs
            .iter()
            .map(|cert| (*cert.inner().digest(), cert.clone())),
    )?;
    batch.write()?;
    Ok(())
}
/// Convenience wrapper: processed-status for a certificate, by digest.
pub fn is_tx_cert_consensus_message_processed(
    &self,
    certificate: &CertifiedTransaction,
) -> SuiResult<bool> {
    let key = SequencedConsensusTransactionKey::External(
        ConsensusTransactionKey::Certificate(*certificate.digest()),
    );
    self.is_consensus_message_processed(&key)
}
/// True iff the consensus message identified by `key` has been processed.
pub fn is_consensus_message_processed(
    &self,
    key: &SequencedConsensusTransactionKey,
) -> SuiResult<bool> {
    let processed = self
        .tables()?
        .consensus_message_processed
        .contains_key(key)?;
    Ok(processed)
}
/// Resolves once the consensus message identified by `key` has been
/// processed (immediately if it already has been).
pub async fn consensus_message_processed_notify(
&self,
key: SequencedConsensusTransactionKey,
) -> Result<(), SuiError> {
// Register BEFORE checking the table: checking first could miss a
// notification that fires between the check and the registration.
let registration = self.consensus_notify_read.register_one(&key);
if self.is_consensus_message_processed(&key)? {
return Ok(());
}
registration.await;
Ok(())
}
/// Batched processed-status lookup for a set of consensus message keys,
/// returned in iteration order.
pub fn check_consensus_messages_processed<'a>(
    &self,
    keys: impl Iterator<Item = &'a SequencedConsensusTransactionKey>,
) -> SuiResult<Vec<bool>> {
    let processed = self
        .tables()?
        .consensus_message_processed
        .multi_contains_keys(keys)?;
    Ok(processed)
}
/// Resolves once every consensus message in `keys` has been processed.
pub async fn consensus_messages_processed_notify(
&self,
keys: Vec<SequencedConsensusTransactionKey>,
) -> Result<(), SuiError> {
// Register BEFORE checking, to avoid missing notifications that fire
// between the check and the registration.
let registrations = self.consensus_notify_read.register_all(&keys);
// Only await registrations for keys not yet processed.
let unprocessed_keys_registrations = registrations
.into_iter()
.zip(self.check_consensus_messages_processed(keys.iter())?)
.filter(|(_, processed)| !processed)
.map(|(registration, _)| registration);
join_all(unprocessed_keys_registrations).await;
Ok(())
}
/// Whether we have recorded an EndOfPublish message from `authority`.
/// The map is only ever touched briefly, so `try_lock` is expected to win.
pub fn has_sent_end_of_publish(&self, authority: &AuthorityName) -> SuiResult<bool> {
    let end_of_publish = self
        .end_of_publish
        .try_lock()
        .expect("No contention on end_of_publish lock");
    Ok(end_of_publish.contains_key(authority))
}
/// Resolves each `TransactionKey` to its executed `TransactionDigest`,
/// waiting as needed. Digest keys resolve trivially to themselves; other
/// keys are looked up in `transaction_key_to_digest`, awaiting a
/// notification for any not yet recorded. Results preserve input order.
pub async fn notify_read_executed_digests(
&self,
keys: &[TransactionKey],
) -> SuiResult<Vec<TransactionDigest>> {
let non_digest_keys: Vec<_> = keys
.iter()
.filter_map(|key| {
if matches!(key, TransactionKey::Digest(_)) {
None
} else {
Some(*key)
}
})
.collect();
// Register BEFORE reading the table, so a write landing in between is
// not missed.
let registrations = self
.executed_digests_notify_read
.register_all(&non_digest_keys);
let executed_digests = self
.tables()?
.transaction_key_to_digest
.multi_get(&non_digest_keys)?;
let futures = executed_digests
.into_iter()
.zip(registrations)
.map(|(d, r)| match d {
Some(ready) => Either::Left(futures::future::ready(ready)),
None => Either::Right(r),
});
// Splice the awaited non-digest results back into the original key order.
let mut results = VecDeque::from(join_all(futures).await);
Ok(keys
.iter()
.map(|key| {
if let TransactionKey::Digest(digest) = key {
*digest
} else {
results
.pop_front()
.expect("number of returned results should match number of non-digest keys")
}
})
.collect())
}
/// Returns the user signatures for each transaction being included in a
/// checkpoint, in input order. Signatures are normally read from the
/// per-epoch table; RandomnessStateUpdate transactions are system-generated
/// and never stored there, so their (empty) signatures come from the
/// transaction itself. A missing entry for any other kind is an error.
pub fn user_signatures_for_checkpoint(
&self,
transactions: &[VerifiedTransaction],
digests: &[TransactionDigest],
) -> SuiResult<Vec<Vec<GenericSignature>>> {
assert_eq!(transactions.len(), digests.len());
let signatures = self
.tables()?
.user_signatures_for_checkpoints
.multi_get(digests)?;
let mut result = Vec::with_capacity(digests.len());
for (signatures, transaction) in signatures.into_iter().zip(transactions.iter()) {
let signatures = if let Some(signatures) = signatures {
signatures
} else if matches!(
transaction.inner().transaction_data().kind(),
TransactionKind::RandomnessStateUpdate(_)
) {
transaction.tx_signatures().to_vec()
} else {
return Err(SuiError::from(
format!(
"Can not find user signature for checkpoint for transaction {:?}",
transaction.key()
)
.as_str(),
));
};
result.push(signatures);
}
Ok(result)
}
/// Drops any operator-set override of the protocol-upgrade buffer stake,
/// reverting to the protocol-config default, and refreshes the metric.
pub fn clear_override_protocol_upgrade_buffer_stake(&self) -> SuiResult {
    warn!(
        epoch = ?self.epoch(),
        "clearing buffer_stake_for_protocol_upgrade_bps override"
    );
    let tables = self.tables()?;
    tables
        .override_protocol_upgrade_buffer_stake
        .remove(&OVERRIDE_PROTOCOL_UPGRADE_BUFFER_STAKE_INDEX)?;
    self.update_buffer_stake_metric();
    Ok(())
}
/// Stores an operator override of the protocol-upgrade buffer stake (bps)
/// and refreshes the metric.
pub fn set_override_protocol_upgrade_buffer_stake(&self, new_stake_bps: u64) -> SuiResult {
    warn!(
        ?new_stake_bps,
        epoch = ?self.epoch(),
        "storing buffer_stake_for_protocol_upgrade_bps override"
    );
    let tables = self.tables()?;
    tables.override_protocol_upgrade_buffer_stake.insert(
        &OVERRIDE_PROTOCOL_UPGRADE_BUFFER_STAKE_INDEX,
        &new_stake_bps,
    )?;
    self.update_buffer_stake_metric();
    Ok(())
}
/// Publishes the currently-effective buffer stake to the metrics gauge.
fn update_buffer_stake_metric(&self) {
    let effective = self.get_effective_buffer_stake_bps() as i64;
    self.metrics.effective_buffer_stake.set(effective);
}
/// Returns the effective protocol-upgrade buffer stake in bps: the stored
/// operator override if one exists (logging a warning), otherwise the
/// protocol-config default.
pub fn get_effective_buffer_stake_bps(&self) -> u64 {
    let override_value = self
        .tables()
        .expect("epoch initialization should have finished")
        .override_protocol_upgrade_buffer_stake
        .get(&OVERRIDE_PROTOCOL_UPGRADE_BUFFER_STAKE_INDEX)
        .expect("force_protocol_upgrade read cannot fail");
    match override_value {
        Some(b) => {
            warn!("using overridden buffer stake value of {}", b);
            b
        }
        None => self
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps(),
    }
}
/// Records a validator's capability notification, keeping only the highest
/// generation seen per authority (stale or duplicate generations are
/// ignored with a debug log).
pub fn record_capabilities(&self, capabilities: &AuthorityCapabilities) -> SuiResult {
    info!("received capabilities {:?}", capabilities);
    let authority = &capabilities.authority;
    let tables = self.tables()?;
    if let Some(existing) = tables.authority_capabilities.get(authority)? {
        if existing.generation >= capabilities.generation {
            debug!(
                "ignoring new capabilities {:?} in favor of previous capabilities {:?}",
                capabilities, existing
            );
            return Ok(());
        }
    }
    tables
        .authority_capabilities
        .insert(authority, capabilities)?;
    Ok(())
}
/// Returns the recorded capabilities of all authorities.
pub fn get_capabilities(&self) -> SuiResult<Vec<AuthorityCapabilities>> {
    let mut capabilities = Vec::new();
    for value in self.tables()?.authority_capabilities.values() {
        capabilities.push(value?);
    }
    Ok(capabilities)
}
/// Records one validator's vote for a JWK observed from an OIDC provider.
/// Votes are persisted and aggregated; when an (id, jwk) pair first reaches
/// quorum it is written to `active_jwks` keyed by `round`, exactly once.
///
/// Fix: the "authenticator state not enabled" log message read "object does
/// exist yet" — it should be "does not exist yet".
pub fn record_jwk_vote(
    &self,
    batch: &mut DBBatch,
    round: u64,
    authority: AuthorityName,
    id: &JwkId,
    jwk: &JWK,
) -> SuiResult {
    info!(
        "received jwk vote from {:?} for jwk ({:?}, {:?})",
        authority.concise(),
        id,
        jwk
    );
    if !self.authenticator_state_enabled() {
        info!(
            "ignoring vote because authenticator state object does not exist yet
            (it will be created at the end of this epoch)"
        );
        return Ok(());
    }
    let mut jwk_aggregator = self.jwk_aggregator.lock();
    // Rate-limit: each validator gets a bounded number of votes per epoch.
    let votes = jwk_aggregator.votes_for_authority(authority);
    if votes
        >= self
            .protocol_config()
            .max_jwk_votes_per_validator_per_epoch()
    {
        warn!(
            "validator {:?} has already voted {} times this epoch, ignoring vote",
            authority, votes,
        );
        return Ok(());
    }
    // Persist the raw vote so aggregation state survives restart.
    batch.insert_batch(
        &self.tables()?.pending_jwks,
        std::iter::once(((authority, id.clone(), jwk.clone()), ())),
    )?;
    let key = (id.clone(), jwk.clone());
    // Check quorum before inserting so that only the quorum-crossing vote
    // records the key as active — each key is written exactly once.
    let previously_active = jwk_aggregator.has_quorum_for_key(&key);
    let insert_result = jwk_aggregator.insert(authority, key.clone());
    if !previously_active && insert_result.is_quorum_reached() {
        info!(epoch = ?self.epoch(), ?round, jwk = ?key, "jwk became active");
        batch.insert_batch(
            &self.tables()?.active_jwks,
            std::iter::once(((round, key), ())),
        )?;
    }
    Ok(())
}
/// Returns the JWKs that became active at exactly `round`.
/// The `active_jwks` table is keyed by (round, (jwk_id, jwk)), so the scan
/// is bounded by empty sentinel keys at `round` and `round + 1` — empty
/// strings sort before any real id/jwk, making the bounds cover exactly one
/// round.
pub(crate) fn get_new_jwks(&self, round: u64) -> SuiResult<Vec<ActiveJwk>> {
let epoch = self.epoch();
let empty_jwk_id = JwkId::new(String::new(), String::new());
let empty_jwk = JWK {
kty: String::new(),
e: String::new(),
n: String::new(),
alg: String::new(),
};
let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
let end = (round + 1, (empty_jwk_id, empty_jwk));
Ok(self
.tables()?
.active_jwks
.safe_iter_with_bounds(Some(start), Some(end))
.map_ok(|((r, (jwk_id, jwk)), _)| {
debug_assert!(round == r);
ActiveJwk { jwk_id, jwk, epoch }
})
.collect::<Result<Vec<_>, _>>()?)
}
/// Whether the given (id, jwk) pair has reached quorum support this epoch.
pub fn jwk_active_in_current_epoch(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
    let key = (jwk_id.clone(), jwk.clone());
    self.jwk_aggregator.lock().has_quorum_for_key(&key)
}
/// Stages a "processed" marker for the given consensus message key.
fn record_consensus_message_processed(
    &self,
    batch: &mut DBBatch,
    key: SequencedConsensusTransactionKey,
) -> SuiResult {
    let entry = [(key, true)];
    batch.insert_batch(&self.tables()?.consensus_message_processed, entry)?;
    Ok(())
}
/// Stages the consensus commit position into both the legacy
/// `last_consensus_index` table (index + hash only, for backward
/// compatibility) and the newer `last_consensus_stats` table (full stats).
fn record_consensus_commit_stats(
&self,
batch: &mut DBBatch,
consensus_stats: &ExecutionIndicesWithStats,
) -> SuiResult {
batch.insert_batch(
&self.tables()?.last_consensus_index,
[(
LAST_CONSENSUS_STATS_ADDR,
ExecutionIndicesWithHash {
index: consensus_stats.index,
hash: consensus_stats.hash,
},
)],
)?;
batch.insert_batch(
&self.tables()?.last_consensus_stats,
[(LAST_CONSENSUS_STATS_ADDR, consensus_stats)],
)?;
Ok(())
}
/// Test helper: stores user signatures for `digest`, marks the matching
/// consensus message as processed, and wakes any waiters on it.
pub fn test_insert_user_signature(
    &self,
    digest: TransactionDigest,
    signatures: Vec<GenericSignature>,
) {
    let tables = self
        .tables()
        .expect("test should not cross epoch boundary");
    tables
        .user_signatures_for_checkpoints
        .insert(&digest, &signatures)
        .unwrap();
    let key = SequencedConsensusTransactionKey::External(
        ConsensusTransactionKey::Certificate(digest),
    );
    tables
        .consensus_message_processed
        .insert(&key, &true)
        .unwrap();
    self.consensus_notify_read.notify(&key, &());
}
/// For each sequenced certificate, stages it for execution and records its
/// user signatures for later checkpoint building — all in the caller's
/// batch so the writes commit atomically with the rest of the commit.
pub fn finish_consensus_certificate_process_with_batch(
&self,
batch: &mut DBBatch,
certificates: &[VerifiedExecutableTransaction],
) -> SuiResult {
for certificate in certificates {
batch.insert_batch(
&self.tables()?.pending_execution,
[(*certificate.digest(), certificate.clone().serializable())],
)?;
// Debug-build invariant: a certificate should be processed through
// consensus at most once, so its signatures must not already exist.
debug_assert!(!self
.tables()?
.user_signatures_for_checkpoints
.contains_key(certificate.digest())?);
batch.insert_batch(
&self.tables()?.user_signatures_for_checkpoints,
[(*certificate.digest(), certificate.tx_signatures().to_vec())],
)?;
}
Ok(())
}
/// Acquires a shared read guard over the in-memory reconfiguration state.
pub fn get_reconfig_state_read_lock_guard(&self) -> RwLockReadGuard<ReconfigState> {
    let guard = self.reconfig_state_mem.read();
    guard
}
/// Acquires an exclusive write guard over the in-memory reconfiguration state.
pub fn get_reconfig_state_write_lock_guard(&self) -> RwLockWriteGuard<ReconfigState> {
    let guard = self.reconfig_state_mem.write();
    guard
}
/// Stops accepting user certificates for this epoch: flips the in-memory
/// reconfig state (under the caller's write guard), persists it, records the
/// epoch close time, and fires the one-shot close notification.
pub fn close_user_certs(&self, mut lock_guard: RwLockWriteGuard<'_, ReconfigState>) {
lock_guard.close_user_certs();
self.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
// Guarded so the close time is only recorded — and the one-shot
// notification only fired — on the first call this epoch.
let mut epoch_close_time = self.epoch_close_time.write();
if epoch_close_time.is_none() {
*epoch_close_time = Some(Instant::now());
self.user_certs_closed_notify
.notify()
.expect("user_certs_closed_notify called twice on same epoch store");
}
}
/// Resolves once user certificates have been closed for this epoch.
pub async fn user_certs_closed_notify(&self) {
    let waiter = self.user_certs_closed_notify.wait();
    waiter.await
}
/// Marks this epoch as terminated: fires the one-shot termination
/// notification, then flips `epoch_alive` to false. Acquiring the write
/// lock waits for all in-flight `within_alive_epoch` readers to finish.
pub async fn epoch_terminated(&self) {
self.epoch_alive_notify
.notify()
.expect("epoch_terminated called twice on same epoch store");
debug!("Epoch terminated - waiting for pending tasks to complete");
*self.epoch_alive.write().await = false;
debug!("All pending epoch tasks completed");
}
/// Resolves once `epoch_terminated` has been called for this epoch.
pub async fn wait_epoch_terminated(&self) {
    let waiter = self.epoch_alive_notify.wait();
    waiter.await
}
/// Runs `f` only while this epoch is alive. Returns `Err(())` if the epoch
/// has already terminated or terminates before `f` completes. Holding the
/// `epoch_alive` read guard for the duration lets `epoch_terminated` wait
/// for all such tasks before flipping the flag.
#[allow(clippy::result_unit_err)]
pub async fn within_alive_epoch<F: Future + Send>(&self, f: F) -> Result<F::Output, ()> {
let guard = self.epoch_alive.read().await;
if !*guard {
return Err(());
}
let terminated = self.wait_epoch_terminated().boxed();
let f = f.boxed();
// Race `f` against epoch termination; whichever finishes first wins.
match select(terminated, f).await {
Either::Left((_, _f)) => Err(()),
Either::Right((result, _)) => Ok(result),
}
}
/// Verifies the signatures on `tx` with the epoch's signature verifier and
/// wraps it as a `VerifiedTransaction` on success.
#[instrument(level = "trace", skip_all)]
pub fn verify_transaction(&self, tx: Transaction) -> SuiResult<VerifiedTransaction> {
    self.signature_verifier.verify_tx(tx.data())?;
    Ok(VerifiedTransaction::new_from_verified(tx))
}
/// Filters a sequenced consensus transaction before processing: drops it
/// (returning `None`) if it was already processed, or if its claimed author
/// does not match the authority that actually sequenced it. Each external
/// kind embeds an authority identity that must match `sender_authority()`,
/// preventing one validator from forging messages on behalf of another.
fn verify_consensus_transaction(
&self,
transaction: SequencedConsensusTransaction,
skipped_consensus_txns: &IntCounter,
) -> Option<VerifiedSequencedConsensusTransaction> {
let _scope = monitored_scope("VerifyConsensusTransaction");
// Dedup: consensus can deliver the same message more than once.
if self
.is_consensus_message_processed(&transaction.transaction.key())
.expect("Storage error")
{
debug!(
consensus_index=?transaction.consensus_index.transaction_index,
tracking_id=?transaction.transaction.get_tracking_id(),
"handle_consensus_transaction UserTransaction [skip]",
);
skipped_consensus_txns.inc();
return None;
}
match &transaction.transaction {
// User certificates carry their own signatures; nothing to check here.
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::UserTransaction(_certificate),
..
}) => {}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::CheckpointSignature(data),
..
}) => {
if transaction.sender_authority() != data.summary.auth_sig().authority {
warn!("CheckpointSignature authority {} does not match narwhal certificate source {}", data.summary.auth_sig().authority, transaction.certificate_author_index );
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::EndOfPublish(authority),
..
}) => {
if &transaction.sender_authority() != authority {
warn!(
"EndOfPublish authority {} does not match narwhal certificate source {}",
authority, transaction.certificate_author_index
);
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::CapabilityNotification(capabilities),
..
}) => {
if transaction.sender_authority() != capabilities.authority {
warn!(
"CapabilityNotification authority {} does not match narwhal certificate source {}",
capabilities.authority,
transaction.certificate_author_index
);
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::NewJWKFetched(authority, id, jwk),
..
}) => {
if transaction.sender_authority() != *authority {
warn!(
"NewJWKFetched authority {} does not match narwhal certificate source {}",
authority, transaction.certificate_author_index,
);
return None;
}
// Additionally reject oversized JWKs to bound storage use.
if !check_total_jwk_size(id, jwk) {
warn!(
"{:?} sent jwk that exceeded max size",
transaction.sender_authority().concise()
);
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessStateUpdate(_round, _bytes),
..
}) => {}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessDkgMessage(authority, _bytes),
..
}) => {
if transaction.sender_authority() != *authority {
warn!(
"RandomnessDkgMessage authority {} does not match narwhal certificate source {}",
authority,
transaction.certificate_author_index
);
return None;
}
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessDkgConfirmation(authority, _bytes),
..
}) => {
if transaction.sender_authority() != *authority {
warn!(
"RandomnessDkgConfirmation authority {} does not match narwhal certificate source {}",
authority,
transaction.certificate_author_index
);
return None;
}
}
// System transactions are generated locally and need no author check.
SequencedConsensusTransactionKind::System(_) => {}
}
Some(VerifiedSequencedConsensusTransaction(transaction))
}
/// Creates a fresh write batch rooted at the per-epoch tables.
fn db_batch(&self) -> SuiResult<DBBatch> {
    let tables = self.tables()?;
    Ok(tables.last_consensus_index.batch())
}
#[cfg(test)]
/// Test-only wrapper around `db_batch` that panics past end of epoch.
pub fn db_batch_for_test(&self) -> DBBatch {
    let batch = self.db_batch();
    batch.expect("test should not be write past end of epoch")
}
/// Processes one consensus commit end-to-end: verifies and classifies the
/// sequenced transactions, reloads expired deferrals, handles randomness
/// (DKG state / round reservation), runs the main processing step, writes
/// pending checkpoints, and commits everything in a single DB batch.
/// Returns the transactions to schedule for execution.
///
/// Statement ordering here is load-bearing: everything is staged into one
/// batch and written once, so a crash at any point leaves the commit either
/// fully applied or fully unapplied.
#[instrument(level = "debug", skip_all)]
pub(crate) async fn process_consensus_transactions_and_commit_boundary<
'a,
C: CheckpointServiceNotify,
>(
&self,
transactions: Vec<SequencedConsensusTransaction>,
consensus_stats: &ExecutionIndicesWithStats,
checkpoint_service: &Arc<C>,
cache_reader: &dyn ExecutionCacheRead,
commit_round: Round,
commit_timestamp: TimestampMs,
authority_metrics: &Arc<AuthorityMetrics>,
) -> SuiResult<Vec<VerifiedExecutableTransaction>> {
// Drop already-processed and forged messages up front.
let verified_transactions: Vec<_> = transactions
.into_iter()
.filter_map(|transaction| {
self.verify_consensus_transaction(
transaction,
&authority_metrics.skipped_consensus_txns,
)
})
.collect();
// Partition into: system txs, ordinary user txs, randomness-using user
// txs, and EndOfPublish markers.
let mut system_transactions = Vec::with_capacity(verified_transactions.len());
let mut current_commit_sequenced_consensus_transactions =
Vec::with_capacity(verified_transactions.len());
let mut current_commit_sequenced_randomness_transactions =
Vec::with_capacity(verified_transactions.len());
let mut end_of_publish_transactions = Vec::with_capacity(verified_transactions.len());
for tx in verified_transactions {
if tx.0.is_end_of_publish() {
end_of_publish_transactions.push(tx);
} else if tx.0.is_system() {
system_transactions.push(tx);
} else if tx
.0
.is_user_tx_with_randomness(self.randomness_state_enabled())
{
current_commit_sequenced_randomness_transactions.push(tx);
} else {
current_commit_sequenced_consensus_transactions.push(tx);
}
}
let mut batch = self
.db_batch()
.expect("Failed to create DBBatch for processing consensus transactions");
// Pull back in transactions whose deferral expires at this round; their
// deletion from the deferral table is staged into `batch`.
let deferred_txs: Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)> = self
.load_deferred_transactions_for_up_to_consensus_round(&mut batch, commit_round)?
.into_iter()
.collect();
// Remember each reloaded tx's original deferral key (only user
// certificates can be deferred).
let mut previously_deferred_tx_digests: HashMap<TransactionDigest, DeferralKey> =
deferred_txs
.iter()
.flat_map(|(deferral_key, txs)| {
txs.iter().map(|tx| match tx.0.transaction.key() {
SequencedConsensusTransactionKey::External(
ConsensusTransactionKey::Certificate(digest),
) => (digest, *deferral_key),
_ => panic!("deferred transaction was not a user certificate: {tx:?}"),
})
})
.collect();
let mut sequenced_transactions: Vec<VerifiedSequencedConsensusTransaction> =
Vec::with_capacity(
current_commit_sequenced_consensus_transactions.len()
+ previously_deferred_tx_digests.len(),
);
let mut sequenced_randomness_transactions: Vec<VerifiedSequencedConsensusTransaction> =
Vec::with_capacity(
current_commit_sequenced_randomness_transactions.len()
+ previously_deferred_tx_digests.len(),
);
let mut randomness_manager = match self.randomness_manager.get() {
Some(rm) => Some(rm.lock().await),
None => None,
};
// Determine whether randomness will be generated this round: only when
// DKG succeeded and the epoch still accepts transactions.
let mut dkg_failed = false;
let randomness_round = if self.randomness_state_enabled() {
let randomness_manager = randomness_manager
.as_mut()
.expect("randomness manager should exist if randomness is enabled");
match randomness_manager.dkg_status() {
DkgStatus::Pending => None,
DkgStatus::Failed => {
dkg_failed = true;
None
}
DkgStatus::Successful => {
if self
.get_reconfig_state_read_lock_guard()
.should_accept_tx()
{
randomness_manager.reserve_next_randomness(commit_timestamp, &mut batch)?
} else {
None
}
}
}
} else {
None
};
// Randomness-deferred txs become runnable when randomness is being
// generated — or permanently unblocked when DKG failed.
if dkg_failed || randomness_round.is_some() {
self.load_and_process_deferred_transactions_for_randomness(
&mut batch,
&mut previously_deferred_tx_digests,
&mut sequenced_randomness_transactions,
)?;
}
// Reloaded deferrals run BEFORE this commit's new transactions.
for tx in deferred_txs
.into_iter()
.flat_map(|(_, txs)| txs.into_iter())
{
if tx
.0
.is_user_tx_with_randomness(self.randomness_state_enabled())
{
sequenced_randomness_transactions.push(tx);
} else {
sequenced_transactions.push(tx);
}
}
sequenced_transactions.extend(current_commit_sequenced_consensus_transactions);
sequenced_randomness_transactions.extend(current_commit_sequenced_randomness_transactions);
// Collect checkpoint roots; transactions deferred again below are
// removed from these sets before the pending checkpoint is written.
let mut roots: BTreeSet<_> = system_transactions
.iter()
.chain(sequenced_transactions.iter())
.filter_map(|transaction| {
transaction
.0
.transaction
.executable_transaction_digest()
.map(TransactionKey::Digest)
})
.collect();
let mut randomness_roots: BTreeSet<_> = sequenced_randomness_transactions
.iter()
.filter_map(|transaction| {
transaction
.0
.transaction
.executable_transaction_digest()
.map(TransactionKey::Digest)
})
.collect();
// Apply the protocol-configured post-consensus ordering (e.g. by gas).
PostConsensusTxReorder::reorder(
&mut sequenced_transactions,
self.protocol_config.consensus_transaction_ordering(),
);
PostConsensusTxReorder::reorder(
&mut sequenced_randomness_transactions,
self.protocol_config.consensus_transaction_ordering(),
);
let consensus_transactions: Vec<_> = system_transactions
.into_iter()
.chain(sequenced_transactions)
.chain(sequenced_randomness_transactions)
.collect();
let (transactions_to_schedule, notifications, deferred_tx_roots, lock, final_round) = self
.process_consensus_transactions(
&mut batch,
&consensus_transactions,
&end_of_publish_transactions,
checkpoint_service,
cache_reader,
commit_round,
previously_deferred_tx_digests,
randomness_manager.as_deref_mut(),
dkg_failed,
randomness_round,
authority_metrics,
)
.await?;
self.finish_consensus_certificate_process_with_batch(
&mut batch,
&transactions_to_schedule,
)?;
self.record_consensus_commit_stats(&mut batch, consensus_stats)?;
// Checkpoints are still built for the final round even after user
// certs are closed.
let should_accept_tx = if let Some(lock) = &lock {
lock.should_accept_tx()
} else {
self.get_reconfig_state_read_lock_guard().should_accept_tx()
};
let make_checkpoint = should_accept_tx || final_round;
if make_checkpoint {
// Transactions deferred in this commit must not appear as roots.
for deferred in deferred_tx_roots {
roots.remove(&deferred);
randomness_roots.remove(&deferred);
}
// With randomness enabled, each commit can produce up to two pending
// checkpoints (regular + randomness), so heights advance by two.
let checkpoint_height = if self.randomness_state_enabled() {
commit_round * 2
} else {
commit_round
};
let pending_checkpoint = PendingCheckpointV2::V2(PendingCheckpointV2Contents {
roots: roots.into_iter().collect(),
details: PendingCheckpointInfo {
timestamp_ms: commit_timestamp,
last_of_epoch: final_round && randomness_round.is_none(),
checkpoint_height,
},
});
self.write_pending_checkpoint(&mut batch, &pending_checkpoint)?;
if let Some(randomness_round) = randomness_round {
randomness_roots.insert(TransactionKey::RandomnessRound(
self.epoch(),
randomness_round,
));
let pending_checkpoint = PendingCheckpointV2::V2(PendingCheckpointV2Contents {
roots: randomness_roots.into_iter().collect(),
details: PendingCheckpointInfo {
timestamp_ms: commit_timestamp,
last_of_epoch: final_round,
checkpoint_height: checkpoint_height + 1,
},
});
self.write_pending_checkpoint(&mut batch, &pending_checkpoint)?;
}
}
// Single atomic commit of everything staged above.
batch.write()?;
// Only after the batch is durable: notify the checkpoint service, kick
// off randomness generation, and wake waiting clients.
if make_checkpoint {
debug!(
?commit_round,
"Notifying checkpoint service about new pending checkpoint(s)",
);
checkpoint_service.notify_checkpoint()?;
}
if let Some(randomness_round) = randomness_round {
let epoch = self.epoch();
randomness_manager
.as_ref()
.expect("randomness manager should exist if randomness round is provided")
.generate_randomness(epoch, randomness_round);
}
self.process_notifications(&notifications, &end_of_publish_transactions);
if final_round {
info!(
epoch=?self.epoch(),
lock=?lock.as_ref(),
final_round=?final_round,
"Notified last checkpoint"
);
self.record_end_of_message_quorum_time_metric();
}
Ok(transactions_to_schedule)
}
/// Computes and persists shared-object version assignments for a batch of
/// consensus-sequenced transactions.
///
/// Assignments are derived from consensus order, written to the
/// assigned-versions table, and the per-object next-version counters are
/// advanced — all within the same `db_batch` so they commit atomically.
async fn process_consensus_transaction_shared_object_versions(
    &self,
    cache_reader: &dyn ExecutionCacheRead,
    transactions: &[VerifiedExecutableTransaction],
    randomness_round: Option<RandomnessRound>,
    db_batch: &mut DBBatch,
) -> SuiResult {
    let assignment = SharedObjVerManager::assign_versions_from_consensus(
        self,
        cache_reader,
        transactions,
        randomness_round,
    )
    .await?;

    // Record the versions assigned to each transaction's shared inputs.
    self.set_assigned_shared_object_versions_with_db_batch(assignment.assigned_versions, db_batch)
        .await?;

    // Advance the next-version counter for every shared object touched.
    db_batch.insert_batch(
        &self.tables()?.next_shared_object_versions,
        assignment.shared_input_next_versions,
    )?;
    Ok(())
}
#[cfg(any(test, feature = "test-utils"))]
pub fn get_highest_pending_checkpoint_height(&self) -> CheckpointHeight {
    // Which pending-checkpoint table is in use depends on whether randomness
    // is enabled for this epoch.
    let tables = self
        .tables()
        .expect("test should not cross epoch boundary");
    let highest = if self.randomness_state_enabled() {
        tables
            .pending_checkpoints_v2
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(height, _)| height)
    } else {
        tables
            .pending_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(height, _)| height)
    };
    // Default (zero) when no pending checkpoint has been written yet.
    highest.unwrap_or_default()
}
#[cfg(any(test, feature = "test-utils"))]
pub async fn process_consensus_transactions_for_tests<C: CheckpointServiceNotify>(
    self: &Arc<Self>,
    transactions: Vec<SequencedConsensusTransaction>,
    checkpoint_service: &Arc<C>,
    cache_reader: &dyn ExecutionCacheRead,
    authority_metrics: &Arc<AuthorityMetrics>,
) -> SuiResult<Vec<VerifiedExecutableTransaction>> {
    // Use the next free pending-checkpoint height so repeated calls within a
    // test do not collide with previously written checkpoints.
    let next_height = self.get_highest_pending_checkpoint_height() + 1;
    self.process_consensus_transactions_and_commit_boundary(
        transactions,
        &ExecutionIndicesWithStats::default(),
        checkpoint_service,
        cache_reader,
        next_height,
        0,
        authority_metrics,
    )
    .await
}
/// Wakes up any waiters registered for the given consensus transaction keys,
/// including the keys of processed end-of-publish messages.
fn process_notifications(
    &self,
    notifications: &[SequencedConsensusTransactionKey],
    end_of_publish: &[VerifiedSequencedConsensusTransaction],
) {
    for key in notifications {
        self.consensus_notify_read.notify(key, &());
    }
    for tx in end_of_publish {
        self.consensus_notify_read
            .notify(&tx.0.transaction.key(), &());
    }
}
/// Processes a commit's worth of sequenced consensus transactions (user
/// certificates and consensus messages; end-of-publish messages are provided
/// separately and handled last).
///
/// Returns:
/// - the verified transactions to schedule for execution,
/// - the transaction keys to notify as processed,
/// - roots of transactions deferred in this commit,
/// - the reconfig-state write guard, if the reconfig state was mutated here,
/// - whether this commit is the final round of the epoch.
#[instrument(level = "debug", skip_all)]
#[allow(clippy::type_complexity)]
pub(crate) async fn process_consensus_transactions<C: CheckpointServiceNotify>(
    &self,
    batch: &mut DBBatch,
    transactions: &[VerifiedSequencedConsensusTransaction],
    end_of_publish_transactions: &[VerifiedSequencedConsensusTransaction],
    checkpoint_service: &Arc<C>,
    cache_reader: &dyn ExecutionCacheRead,
    commit_round: Round,
    previously_deferred_tx_digests: HashMap<TransactionDigest, DeferralKey>,
    mut randomness_manager: Option<&mut RandomnessManager>,
    dkg_failed: bool,
    randomness_round: Option<RandomnessRound>,
    authority_metrics: &Arc<AuthorityMetrics>,
) -> SuiResult<(
    Vec<VerifiedExecutableTransaction>,
    Vec<SequencedConsensusTransactionKey>,
    Vec<TransactionKey>,
    Option<RwLockWriteGuard<ReconfigState>>,
    bool,
)> {
    // A randomness round should never be provided when DKG has failed.
    if randomness_round.is_some() {
        assert!(!dkg_failed);
    }
    let mut verified_certificates = Vec::with_capacity(transactions.len());
    let mut notifications = Vec::with_capacity(transactions.len());
    let mut deferred_tx_roots = Vec::with_capacity(transactions.len());
    // Deferred transactions grouped by the key under which they are re-queued.
    let mut deferred_txns: BTreeMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>> =
        BTreeMap::new();
    // Execution-cost congestion is tracked separately for randomness-using and
    // non-randomness transactions.
    let mut shared_object_congestion_tracker: SharedObjectCongestionTracker =
        Default::default();
    let mut shared_object_using_randomness_congestion_tracker: SharedObjectCongestionTracker =
        Default::default();
    let mut randomness_state_updated = false;
    for tx in transactions {
        let key = tx.0.transaction.key();
        let mut ignored = false;
        // Select the congestion tracker matching this transaction's
        // randomness usage.
        let execution_cost = if tx
            .0
            .is_user_tx_with_randomness(self.randomness_state_enabled())
        {
            &mut shared_object_using_randomness_congestion_tracker
        } else {
            &mut shared_object_congestion_tracker
        };
        match self
            .process_consensus_transaction(
                batch,
                tx,
                checkpoint_service,
                commit_round,
                &previously_deferred_tx_digests,
                randomness_manager.as_deref_mut(),
                dkg_failed,
                randomness_round.is_some(),
                execution_cost,
                authority_metrics,
            )
            .await?
        {
            ConsensusCertificateResult::SuiTransaction(cert) => {
                notifications.push(key.clone());
                verified_certificates.push(cert);
            }
            ConsensusCertificateResult::Deferred(deferral_key) => {
                // Queue for a later commit; deferred executable transactions
                // still contribute roots and notifications now.
                deferred_txns
                    .entry(deferral_key)
                    .or_default()
                    .push(tx.clone());
                if let Some(txn_key) =
                    tx.0.transaction
                        .executable_transaction_digest()
                        .map(TransactionKey::Digest)
                {
                    deferred_tx_roots.push(txn_key);
                    notifications.push(key.clone());
                }
            }
            ConsensusCertificateResult::RandomnessConsensusMessage => {
                // DKG state changed; advance_dkg is invoked once, below.
                randomness_state_updated = true;
                notifications.push(key.clone());
            }
            ConsensusCertificateResult::ConsensusMessage => notifications.push(key.clone()),
            ConsensusCertificateResult::IgnoredSystem => (),
            ConsensusCertificateResult::Ignored => ignored = true,
        }
        // Only `Ignored` results skip the processed-marker write;
        // `IgnoredSystem` is still marked as processed.
        if !ignored {
            self.record_consensus_message_processed(batch, key.clone())?;
        }
    }
    let commit_has_deferred_txns = !deferred_txns.is_empty();
    let mut total_deferred_txns = 0;
    for (key, txns) in deferred_txns.into_iter() {
        total_deferred_txns += txns.len();
        self.defer_transactions(batch, key, txns)?;
    }
    authority_metrics
        .consensus_handler_deferred_transactions
        .inc_by(total_deferred_txns as u64);
    // Advance DKG at most once per commit, after all messages are absorbed.
    if randomness_state_updated {
        if let Some(randomness_manager) = randomness_manager.as_mut() {
            randomness_manager.advance_dkg(batch, commit_round).await?;
        }
    }
    // Assign shared-object versions for everything that will be executed.
    self.process_consensus_transaction_shared_object_versions(
        cache_reader,
        &verified_certificates,
        randomness_round,
        batch,
    )
    .await?;
    // End-of-publish handling may close the epoch; deferred transactions from
    // this commit block the final round.
    let (lock, final_round) = self.process_end_of_publish_transactions_and_reconfig(
        batch,
        end_of_publish_transactions,
        commit_has_deferred_txns,
    )?;
    Ok((
        verified_certificates,
        notifications,
        deferred_tx_roots,
        lock,
        final_round,
    ))
}
/// Handles EndOfPublish messages and drives the end-of-epoch reconfig state
/// machine.
///
/// Returns the reconfig-state write guard if the state was mutated here, and
/// `true` exactly when this commit is the epoch's final round (all certs
/// rejected and no deferred transactions remain, in this commit or previous
/// ones).
fn process_end_of_publish_transactions_and_reconfig(
    &self,
    write_batch: &mut DBBatch,
    transactions: &[VerifiedSequencedConsensusTransaction],
    commit_has_deferred_txns: bool,
) -> SuiResult<(
    Option<RwLockWriteGuard<ReconfigState>>,
    bool,
)> {
    let mut lock = None;
    for transaction in transactions {
        let VerifiedSequencedConsensusTransaction(SequencedConsensusTransaction {
            transaction,
            ..
        }) = transaction;
        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
            kind: ConsensusTransactionKind::EndOfPublish(authority),
            ..
        }) = transaction
        {
            debug!(
                "Received EndOfPublish for epoch {} from {:?}",
                self.committee.epoch,
                authority.concise()
            );
            // Count this vote only if quorum was not already reached earlier
            // in this commit (`lock` set) and certs are still being accepted.
            let collected_end_of_publish = if lock.is_none()
                && self
                    .get_reconfig_state_read_lock_guard()
                    .should_accept_consensus_certs()
            {
                write_batch.insert_batch(&self.tables()?.end_of_publish, [(authority, ())])?;
                self.end_of_publish.try_lock()
                    .expect("No contention on Authority::end_of_publish as it is only accessed from consensus handler")
                    .insert_generic(*authority, ()).is_quorum_reached()
            } else {
                debug!("Ignoring end of publish message from validator {:?} as we already collected enough end of publish messages", authority.concise());
                false
            };
            if collected_end_of_publish {
                assert!(lock.is_none());
                debug!(
                    "Collected enough end_of_publish messages for epoch {} with last message from validator {:?}",
                    self.committee.epoch,
                    authority.concise(),
                );
                // Quorum reached: stop accepting new certificates and persist
                // the state change in the same batch.
                let mut l = self.get_reconfig_state_write_lock_guard();
                l.close_all_certs();
                self.store_reconfig_state_batch(&l, write_batch)?;
                // Hold the write guard until the end of the commit so the new
                // state is observed consistently by the caller.
                lock = Some(l);
            };
            self.record_consensus_message_processed(write_batch, transaction.key())?;
        } else {
            // Caller contract: this function receives only EndOfPublish.
            panic!(
                "process_end_of_publish_transactions_and_reconfig called with non-end-of-publish transaction"
            );
        }
    }
    // Decide whether the epoch can close now.
    let is_reject_all_certs = if let Some(lock) = &lock {
        lock.is_reject_all_certs()
    } else {
        self.get_reconfig_state_read_lock_guard()
            .is_reject_all_certs()
    };
    // Deferred transactions (pending from prior commits or created in this
    // one) must drain before the epoch may end.
    if !is_reject_all_certs || !self.deferred_transactions_empty() || commit_has_deferred_txns {
        if is_reject_all_certs {
            debug!(
                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={commit_has_deferred_txns}",
                !self.deferred_transactions_empty(),
            );
        }
        return Ok((lock, false));
    }
    // Final round: reject all transactions from now on.
    let mut lock = lock.unwrap_or_else(|| self.get_reconfig_state_write_lock_guard());
    lock.close_all_tx();
    self.store_reconfig_state_batch(&lock, write_batch)?;
    Ok((Some(lock), true))
}
#[instrument(level = "trace", skip_all)]
async fn process_consensus_transaction<C: CheckpointServiceNotify>(
&self,
batch: &mut DBBatch,
transaction: &VerifiedSequencedConsensusTransaction,
checkpoint_service: &Arc<C>,
commit_round: Round,
previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
mut randomness_manager: Option<&mut RandomnessManager>,
dkg_failed: bool,
generating_randomness: bool,
shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
authority_metrics: &Arc<AuthorityMetrics>,
) -> SuiResult<ConsensusCertificateResult> {
let _scope = monitored_scope("HandleConsensusTransaction");
let VerifiedSequencedConsensusTransaction(SequencedConsensusTransaction {
certificate_author_index: _,
certificate_author,
consensus_index,
transaction,
}) = transaction;
let tracking_id = transaction.get_tracking_id();
match &transaction {
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::UserTransaction(certificate),
..
}) => {
if certificate.epoch() != self.epoch() {
debug!(
"Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
certificate.epoch(),
self.epoch()
);
return Ok(ConsensusCertificateResult::Ignored);
}
if self.has_sent_end_of_publish(certificate_author)?
&& !previously_deferred_tx_digests.contains_key(certificate.digest())
{
warn!("[Byzantine authority] Authority {:?} sent a new, previously unseen certificate {:?} after it sent EndOfPublish message to consensus", certificate_author.concise(), certificate.digest());
return Ok(ConsensusCertificateResult::Ignored);
}
let certificate = VerifiedCertificate::new_unchecked(*certificate.clone());
let certificate = VerifiedExecutableTransaction::new_from_certificate(certificate);
debug!(
?tracking_id,
tx_digest = ?certificate.digest(),
"handle_consensus_transaction UserTransaction",
);
if !self
.get_reconfig_state_read_lock_guard()
.should_accept_consensus_certs()
&& !previously_deferred_tx_digests.contains_key(certificate.digest())
{
debug!("Ignoring consensus certificate for transaction {:?} because of end of epoch",
certificate.digest());
return Ok(ConsensusCertificateResult::Ignored);
}
let deferral_info = self.should_defer(
&certificate,
commit_round,
dkg_failed,
generating_randomness,
previously_deferred_tx_digests,
shared_object_congestion_tracker,
);
if let Some((deferral_key, deferral_reason)) = deferral_info {
debug!(
"Deferring consensus certificate for transaction {:?} until {deferral_key:?}",
certificate.digest(),
);
if let DeferralReason::SharedObjectCongestion(_) = deferral_reason {
authority_metrics
.consensus_handler_congested_transactions
.inc();
}
return Ok(ConsensusCertificateResult::Deferred(deferral_key));
}
if dkg_failed
&& self.randomness_state_enabled()
&& certificate.is_randomness_reader()
{
debug!(
"Ignoring randomness-using certificate for transaction {:?} because DKG failed",
certificate.digest(),
);
return Ok(ConsensusCertificateResult::Ignored);
}
if certificate.contains_shared_object() {
self.update_object_execution_cost(
&certificate,
shared_object_congestion_tracker,
);
}
Ok(ConsensusCertificateResult::SuiTransaction(certificate))
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::CheckpointSignature(info),
..
}) => {
checkpoint_service.notify_checkpoint_signature(self, info)?;
Ok(ConsensusCertificateResult::ConsensusMessage)
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::EndOfPublish(_),
..
}) => {
panic!("process_consensus_transaction called with end-of-publish transaction");
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::CapabilityNotification(capabilities),
..
}) => {
let authority = capabilities.authority;
if self
.get_reconfig_state_read_lock_guard()
.should_accept_consensus_certs()
{
debug!(
"Received CapabilityNotification from {:?}",
authority.concise()
);
self.record_capabilities(capabilities)?;
} else {
debug!(
"Ignoring CapabilityNotification from {:?} because of end of epoch",
authority.concise()
);
}
Ok(ConsensusCertificateResult::ConsensusMessage)
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::NewJWKFetched(authority, jwk_id, jwk),
..
}) => {
if self
.get_reconfig_state_read_lock_guard()
.should_accept_consensus_certs()
{
self.record_jwk_vote(
batch,
consensus_index.last_committed_round,
*authority,
jwk_id,
jwk,
)?;
} else {
debug!(
"Ignoring NewJWKFetched from {:?} because of end of epoch",
authority.concise()
);
}
Ok(ConsensusCertificateResult::ConsensusMessage)
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessStateUpdate(_, _),
..
}) => {
panic!("process_consensus_transaction called with external RandomnessStateUpdate");
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessDkgMessage(authority, bytes),
..
}) => {
if self.get_reconfig_state_read_lock_guard().should_accept_tx() {
if let Some(randomness_manager) = randomness_manager.as_mut() {
debug!(
"Received RandomnessDkgMessage from {:?}",
authority.concise()
);
match bcs::from_bytes(bytes) {
Ok(message) => randomness_manager.add_message(authority, message)?,
Err(e) => {
warn!(
"Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
authority.concise(),
);
}
}
} else {
debug!(
"Ignoring RandomnessDkgMessage from {:?} because randomness is not enabled",
authority.concise()
);
}
} else {
debug!(
"Ignoring RandomnessDkgMessage from {:?} because of end of epoch",
authority.concise()
);
}
Ok(ConsensusCertificateResult::RandomnessConsensusMessage)
}
SequencedConsensusTransactionKind::External(ConsensusTransaction {
kind: ConsensusTransactionKind::RandomnessDkgConfirmation(authority, bytes),
..
}) => {
if self.get_reconfig_state_read_lock_guard().should_accept_tx() {
if let Some(randomness_manager) = randomness_manager.as_mut() {
debug!(
"Received RandomnessDkgConfirmation from {:?}",
authority.concise()
);
match bcs::from_bytes(bytes) {
Ok(confirmation) => randomness_manager.add_confirmation(
batch,
authority,
confirmation,
)?,
Err(e) => {
warn!(
"Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
authority.concise(),
);
}
}
} else {
debug!(
"Ignoring RandomnessDkgMessage from {:?} because randomness is not enabled",
authority.concise()
);
}
} else {
debug!(
"Ignoring RandomnessDkgMessage from {:?} because of end of epoch",
authority.concise()
);
}
Ok(ConsensusCertificateResult::RandomnessConsensusMessage)
}
SequencedConsensusTransactionKind::System(system_transaction) => {
if !self.get_reconfig_state_read_lock_guard().should_accept_tx() {
debug!(
"Ignoring system transaction {:?} because of end of epoch",
system_transaction.digest()
);
return Ok(ConsensusCertificateResult::IgnoredSystem);
}
assert!(system_transaction.contains_shared_object());
Ok(ConsensusCertificateResult::SuiTransaction(
system_transaction.clone(),
))
}
}
}
/// Writes a pending checkpoint at its height into the appropriate table,
/// ignoring exact duplicates and panicking on conflicting re-writes.
pub(crate) fn write_pending_checkpoint(
    &self,
    batch: &mut DBBatch,
    checkpoint: &PendingCheckpointV2,
) -> SuiResult {
    let height = checkpoint.height();
    // A pending checkpoint at this height may already exist; it must agree
    // with what we are about to write.
    if let Some(existing) = self.get_pending_checkpoint(&height)? {
        if existing.roots() != checkpoint.roots() {
            panic!("Received checkpoint at index {} that contradicts previously stored checkpoint. Old roots: {:?}, new roots: {:?}", height, existing.roots(), checkpoint.roots());
        }
        debug!(
            checkpoint_commit_height = height,
            "Ignoring duplicate checkpoint notification",
        );
        return Ok(());
    }
    debug!(
        checkpoint_commit_height = height,
        "Pending checkpoint has {} roots",
        checkpoint.roots().len(),
    );
    trace!(
        checkpoint_commit_height = height,
        "Transaction roots for pending checkpoint: {:?}",
        checkpoint.roots()
    );

    // The V2 table is used when randomness is enabled; otherwise the record is
    // downgraded to the V1 format for the legacy table.
    if self.randomness_state_enabled() {
        batch.insert_batch(
            &self.tables()?.pending_checkpoints_v2,
            [(height, checkpoint)],
        )?;
    } else {
        batch.insert_batch(
            &self.tables()?.pending_checkpoints,
            [(height, checkpoint.clone().expect_v1())],
        )?;
    }
    Ok(())
}
/// Returns all pending checkpoints with height strictly greater than `last`
/// (or all of them when `last` is `None`), in height order.
pub fn get_pending_checkpoints(
    &self,
    last: Option<CheckpointHeight>,
) -> SuiResult<Vec<(CheckpointHeight, PendingCheckpointV2)>> {
    let tables = self.tables()?;
    if self.randomness_state_enabled() {
        let mut iter = tables.pending_checkpoints_v2.unbounded_iter();
        if let Some(height) = last {
            iter = iter.skip_to(&(height + 1))?;
        }
        Ok(iter.collect())
    } else {
        // Legacy table: upgrade each V1 record on the way out.
        let mut iter = tables.pending_checkpoints.unbounded_iter();
        if let Some(height) = last {
            iter = iter.skip_to(&(height + 1))?;
        }
        Ok(iter
            .map(|(height, checkpoint)| (height, checkpoint.into()))
            .collect())
    }
}
/// Looks up a single pending checkpoint by height, upgrading legacy V1
/// records to the V2 representation.
pub fn get_pending_checkpoint(
    &self,
    index: &CheckpointHeight,
) -> SuiResult<Option<PendingCheckpointV2>> {
    let found = if self.randomness_state_enabled() {
        self.tables()?.pending_checkpoints_v2.get(index)?
    } else {
        self.tables()?
            .pending_checkpoints
            .get(index)?
            .map(Into::into)
    };
    Ok(found)
}
/// Records the checkpoints built from one commit: each summary is stored by
/// sequence number, and every included transaction digest is mapped to its
/// checkpoint. All writes go through a single batch.
pub fn process_pending_checkpoint(
    &self,
    commit_height: CheckpointHeight,
    content_info: Vec<(CheckpointSummary, CheckpointContents)>,
) -> SuiResult<()> {
    let mut batch = self.tables()?.pending_checkpoints.batch();
    for (position_in_commit, (summary, transactions)) in content_info.into_iter().enumerate() {
        let sequence_number = summary.sequence_number;
        // Store the built summary together with its origin commit height and
        // position within that commit.
        let builder_summary = BuilderCheckpointSummary {
            summary,
            checkpoint_height: Some(commit_height),
            position_in_commit,
        };
        batch.insert_batch(
            &self.tables()?.builder_checkpoint_summary_v2,
            [(&sequence_number, builder_summary)],
        )?;
        // Map every included transaction digest to this checkpoint.
        batch.insert_batch(
            &self.tables()?.builder_digest_to_checkpoint,
            transactions
                .iter()
                .map(|tx| (tx.transaction, sequence_number)),
        )?;
    }
    Ok(batch.write()?)
}
/// Seeds the checkpoint builder tables with the genesis checkpoint, which is
/// not produced by a consensus commit.
pub fn put_genesis_checkpoint_in_builder(
    &self,
    summary: &CheckpointSummary,
    contents: &CheckpointContents,
) -> SuiResult<()> {
    let sequence = summary.sequence_number;
    // Map each genesis transaction digest to the genesis checkpoint.
    for tx in contents.iter() {
        debug!(
            "Manually inserting genesis transaction in checkpoint DB: {:?}",
            tx.transaction
        );
        self.tables()?
            .builder_digest_to_checkpoint
            .insert(&tx.transaction, &sequence)?;
    }
    // Genesis has no consensus commit, hence no checkpoint height.
    let builder_summary = BuilderCheckpointSummary {
        summary: summary.clone(),
        checkpoint_height: None,
        position_in_commit: 0,
    };
    self.tables()?
        .builder_checkpoint_summary_v2
        .insert(summary.sequence_number(), &builder_summary)?;
    Ok(())
}
/// Commit height of the most recently built checkpoint, if any was built from
/// a consensus commit (genesis records carry no height).
pub fn last_built_checkpoint_commit_height(&self) -> SuiResult<Option<CheckpointHeight>> {
    let last = self
        .tables()?
        .builder_checkpoint_summary_v2
        .unbounded_iter()
        .skip_to_last()
        .next();
    Ok(last.and_then(|(_, builder)| builder.checkpoint_height))
}
/// The highest-sequence checkpoint summary built so far, with its sequence
/// number, if any exists.
pub fn last_built_checkpoint_summary(
    &self,
) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
    let last = self
        .tables()?
        .builder_checkpoint_summary_v2
        .unbounded_iter()
        .skip_to_last()
        .next();
    Ok(last.map(|(seq, builder)| (seq, builder.summary)))
}
/// Looks up a built checkpoint summary by its sequence number.
pub fn get_built_checkpoint_summary(
    &self,
    sequence: CheckpointSequenceNumber,
) -> SuiResult<Option<CheckpointSummary>> {
    let entry = self.tables()?.builder_checkpoint_summary_v2.get(&sequence)?;
    Ok(entry.map(|builder| builder.summary))
}
/// For each digest, reports whether the checkpoint builder has already
/// included it in some checkpoint. Results are in input order.
pub fn builder_included_transactions_in_checkpoint<'a>(
    &self,
    digests: impl Iterator<Item = &'a TransactionDigest>,
) -> SuiResult<Vec<bool>> {
    let included = self
        .tables()?
        .builder_digest_to_checkpoint
        .multi_contains_keys(digests)?;
    Ok(included)
}
/// Highest index among stored pending checkpoint signatures, or 0 when none
/// have been stored.
pub fn get_last_checkpoint_signature_index(&self) -> SuiResult<u64> {
    let last = self
        .tables()?
        .pending_checkpoint_signatures
        .unbounded_iter()
        .skip_to_last()
        .next();
    Ok(last.map(|((_, index), _)| index).unwrap_or_default())
}
/// Stores a checkpoint signature message keyed by (sequence, index).
pub fn insert_checkpoint_signature(
    &self,
    checkpoint_seq: CheckpointSequenceNumber,
    index: u64,
    info: &CheckpointSignatureMessage,
) -> SuiResult<()> {
    let key = (checkpoint_seq, index);
    self.tables()?
        .pending_checkpoint_signatures
        .insert(&key, info)?;
    Ok(())
}
/// Records how long after epoch close pending certificates finished
/// processing. No-op before the epoch close time is set.
pub(crate) fn record_epoch_pending_certs_process_time_metric(&self) {
    if let Some(closed_at) = *self.epoch_close_time.read() {
        let elapsed_ms = closed_at.elapsed().as_millis() as i64;
        self.metrics
            .epoch_pending_certs_processed_time_since_epoch_close_ms
            .set(elapsed_ms);
    }
}
/// Records the delay between epoch close and reaching end-of-publish quorum.
/// No-op before the epoch close time is set.
pub fn record_end_of_message_quorum_time_metric(&self) {
    if let Some(closed_at) = *self.epoch_close_time.read() {
        let elapsed_ms = closed_at.elapsed().as_millis() as i64;
        self.metrics
            .epoch_end_of_publish_quorum_time_since_epoch_close_ms
            .set(elapsed_ms);
    }
}
/// Emits end-of-epoch statistics (checkpoint, transaction, and gas-reward
/// counts) and, when available, the delay between epoch close and the final
/// checkpoint's creation.
pub(crate) fn report_epoch_metrics_at_last_checkpoint(&self, stats: EpochStats) {
    if let Some(closed_at) = *self.epoch_close_time.read() {
        let elapsed_ms = closed_at.elapsed().as_millis() as i64;
        self.metrics
            .epoch_last_checkpoint_created_time_since_epoch_close_ms
            .set(elapsed_ms);
    }
    info!(epoch=?self.epoch(), "Epoch statistics: checkpoint_count={:?}, transaction_count={:?}, total_gas_reward={:?}", stats.checkpoint_count, stats.transaction_count, stats.total_gas_reward);
    let metrics = &self.metrics;
    metrics
        .epoch_checkpoint_count
        .set(stats.checkpoint_count as i64);
    metrics
        .epoch_transaction_count
        .set(stats.transaction_count as i64);
    metrics
        .epoch_total_gas_reward
        .set(stats.total_gas_reward as i64);
}
/// Records the delay between epoch close and the start of reconfiguration.
/// No-op before the epoch close time is set.
pub fn record_epoch_reconfig_start_time_metric(&self) {
    if let Some(closed_at) = *self.epoch_close_time.read() {
        let elapsed_ms = closed_at.elapsed().as_millis() as i64;
        self.metrics
            .epoch_reconfig_start_time_since_epoch_close_ms
            .set(elapsed_ms);
    }
}
/// Records how long the validator was halted since epoch close.
/// No-op before the epoch close time is set.
fn record_reconfig_halt_duration_metric(&self) {
    if let Some(closed_at) = *self.epoch_close_time.read() {
        let elapsed_ms = closed_at.elapsed().as_millis() as i64;
        self.metrics
            .epoch_validator_halt_duration_ms
            .set(elapsed_ms);
    }
}
/// Records time from epoch open until the first checkpoint was ready.
pub(crate) fn record_epoch_first_checkpoint_creation_time_metric(&self) {
    let elapsed_ms = self.epoch_open_time.elapsed().as_millis() as i64;
    self.metrics
        .epoch_first_checkpoint_ready_time_since_epoch_begin_ms
        .set(elapsed_ms);
}
/// Sets the safe-mode gauge to 1 when in safe mode, 0 otherwise.
pub fn record_is_safe_mode_metric(&self, safe_mode: bool) {
    let value = if safe_mode { 1 } else { 0 };
    self.metrics.is_safe_mode.set(value);
}
/// Sets the checkpoint-builder safe-mode gauge; fires a fail point when safe
/// mode is observed so tests can hook the condition.
pub fn record_checkpoint_builder_is_safe_mode_metric(&self, safe_mode: bool) {
    if safe_mode {
        // Test hook: triggered only when safe mode is observed.
        fail_point!("record_checkpoint_builder_is_safe_mode_metric");
    }
    self.metrics
        .checkpoint_builder_advance_epoch_is_safe_mode
        .set(i64::from(safe_mode))
}
/// Refreshes the current-epoch gauge and records the epoch's total wall-clock
/// duration measured from epoch open.
fn record_epoch_total_duration_metric(&self) {
    self.metrics.current_epoch.set(self.epoch() as i64);
    let elapsed_ms = self.epoch_open_time.elapsed().as_millis() as i64;
    self.metrics.epoch_total_duration.set(elapsed_ms);
}
/// Applies an authenticator state update by inserting each newly active JWK
/// into the signature verifier.
pub(crate) fn update_authenticator_state(&self, update: &AuthenticatorStateUpdate) {
    info!("Updating authenticator state: {:?}", update);
    for ActiveJwk { jwk_id, jwk, .. } in &update.new_active_jwks {
        self.signature_verifier.insert_jwk(jwk_id, jwk);
    }
}
/// Delegates to the signature verifier to clear its signature cache.
pub fn clear_signature_cache(&self) {
    self.signature_verifier.clear_signature_cache();
}
}
impl GetSharedLocks for AuthorityPerEpochStore {
    /// Returns the shared-object versions assigned to `key`, or an empty list
    /// when none have been recorded.
    fn get_shared_locks(
        &self,
        key: &TransactionKey,
    ) -> Result<Vec<(ObjectID, SequenceNumber)>, SuiError> {
        let assigned = if self.randomness_state_enabled() {
            // The V2 table is keyed by TransactionKey directly.
            self.tables()?.assigned_shared_object_versions_v2.get(key)?
        } else {
            // The legacy table is keyed by transaction digest only.
            self.tables()?
                .assigned_shared_object_versions
                .get(key.unwrap_digest())?
        };
        Ok(assigned.unwrap_or_default())
    }
}
impl ExecutionComponents {
    /// Builds the execution components: a protocol-configured executor and a
    /// module cache backed by the execution cache.
    fn new(
        protocol_config: &ProtocolConfig,
        store: Arc<ExecutionCache>,
        metrics: Arc<ResolverMetrics>,
        _expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    ) -> Self {
        // Executor runs silently; no extra capabilities are injected.
        let executor = sui_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        let resolver = ResolverWrapper::new(store, metrics.clone());
        let module_cache = Arc::new(SyncModuleCache::new(resolver));
        Self {
            executor,
            module_cache,
            metrics,
        }
    }

    /// Handle to the resolver metrics shared with the module cache.
    pub(crate) fn metrics(&self) -> Arc<ResolverMetrics> {
        Arc::clone(&self.metrics)
    }
}
/// Versioned wrapper for serialized lock details, allowing the stored format
/// to evolve; V1 holds the digest of the locking transaction.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapper {
    V1(TransactionDigest),
}
impl LockDetailsWrapper {
    /// Upgrades to the latest version; with only V1 defined this is the
    /// identity.
    pub fn migrate(self) -> Self {
        self
    }

    /// Borrows the inner lock details; callers must migrate to the latest
    /// version before reading.
    pub fn inner(&self) -> &LockDetails {
        match self {
            Self::V1(details) => details,
            // Placeholder for future versions that must be migrated first.
            #[allow(unreachable_patterns)]
            _ => panic!("lock details should have been migrated to latest version at read time"),
        }
    }

    /// Consumes the wrapper, returning the inner lock details; callers must
    /// migrate to the latest version before reading.
    pub fn into_inner(self) -> LockDetails {
        match self {
            Self::V1(details) => details,
            // Placeholder for future versions that must be migrated first.
            #[allow(unreachable_patterns)]
            _ => panic!("lock details should have been migrated to latest version at read time"),
        }
    }
}
pub type LockDetails = TransactionDigest;
impl From<LockDetails> for LockDetailsWrapper {
fn from(details: LockDetails) -> Self {
LockDetailsWrapper::V1(details)
}
}