use super::execution_time_estimator::ExecutionTimeEstimator;
use crate::authority::transaction_deferral::DeferralKey;
use crate::consensus_handler::ConsensusCommitInfo;
use mysten_common::fatal;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
use sui_types::base_types::{ObjectID, TransactionDigest};
use sui_types::error::SuiResult;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_consensus::Round;
use sui_types::transaction::{Argument, SharedInputObject, TransactionDataAPI};
use tracing::trace;
/// Congestion-control settings that `SharedObjectCongestionTracker` consults when
/// computing budgets, limits and per-transaction cost caps.
#[derive(PartialEq, Eq, Clone, Debug)]
struct Params {
/// Active congestion control mode; every accessor on `Params` matches on it.
mode: PerObjectCongestionControlMode,
/// When set, `commit_budget` and `max_burst` additionally scale their
/// `ExecutionTimeEstimate` results by `randomness_scalar` percent.
for_randomness: bool,
/// Commit budget returned by `commit_budget` for every mode other than
/// `ExecutionTimeEstimate`.
max_accumulated_txn_cost_per_object_in_commit: u64,
/// Cap factor; read only through `gas_budget_based_txn_cost_cap_factor()`, which
/// requires `TotalGasBudgetWithCap` mode.
gas_budget_based_txn_cost_cap_factor: Option<u64>,
/// Optional absolute cost cap; read only through
/// `gas_budget_based_txn_cost_absolute_cap()`, which requires `TotalGasBudgetWithCap` mode.
gas_budget_based_txn_cost_absolute_cap: Option<u64>,
/// Value returned by `max_overage` for every mode other than `ExecutionTimeEstimate`.
max_txn_cost_overage_per_object_in_commit: u64,
/// Value returned by `max_burst` for every mode other than `ExecutionTimeEstimate`.
allowed_txn_cost_overage_burst_per_object_in_commit: u64,
}
impl Params {
    /// Returns `percent` percent of `value` using integer division.
    ///
    /// An overflowing multiplication saturates at `u64::MAX` before the division
    /// by 100, so the result never wraps around.
    fn percent_of(value: u64, percent: u64) -> u64 {
        value.saturating_mul(percent) / 100
    }

    /// Per-object cost budget for the commit described by `commit_info`.
    ///
    /// In `ExecutionTimeEstimate` mode the budget is `target_utilization` percent of
    /// the estimated commit period in microseconds, scaled once more by
    /// `randomness_scalar` percent when `for_randomness` is set. Every other mode
    /// returns `max_accumulated_txn_cost_per_object_in_commit`.
    pub fn commit_budget(&self, commit_info: &ConsensusCommitInfo) -> u64 {
        match self.mode {
            PerObjectCongestionControlMode::ExecutionTimeEstimate(params) => {
                // `as_micros` is wider than u64: clamp rather than silently truncate,
                // matching the conversion used for transaction cost estimates.
                let commit_period_micros =
                    u64::try_from(commit_info.estimated_commit_period().as_micros())
                        .unwrap_or(u64::MAX);
                let budget = Self::percent_of(commit_period_micros, params.target_utilization);
                if self.for_randomness {
                    Self::percent_of(budget, params.randomness_scalar)
                } else {
                    budget
                }
            }
            _ => self.max_accumulated_txn_cost_per_object_in_commit,
        }
    }

    /// Burst allowance that is added on top of the commit budget.
    ///
    /// In `ExecutionTimeEstimate` mode this is
    /// `allowed_txn_cost_overage_burst_limit_us`, scaled by `randomness_scalar`
    /// percent when `for_randomness` is set. Every other mode returns
    /// `allowed_txn_cost_overage_burst_per_object_in_commit`.
    pub fn max_burst(&self) -> u64 {
        match self.mode {
            PerObjectCongestionControlMode::ExecutionTimeEstimate(params) => {
                let burst = params.allowed_txn_cost_overage_burst_limit_us;
                if self.for_randomness {
                    Self::percent_of(burst, params.randomness_scalar)
                } else {
                    burst
                }
            }
            _ => self.allowed_txn_cost_overage_burst_per_object_in_commit,
        }
    }

    /// Maximum overage allowed beyond the commit budget: `u64::MAX` in
    /// `ExecutionTimeEstimate` mode, otherwise
    /// `max_txn_cost_overage_per_object_in_commit`.
    pub fn max_overage(&self) -> u64 {
        match self.mode {
            PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => u64::MAX,
            _ => self.max_txn_cost_overage_per_object_in_commit,
        }
    }

    /// Cap factor used to bound gas-budget-based transaction cost.
    ///
    /// # Panics
    ///
    /// Panics if the mode is `TotalGasBudgetWithCap` but no factor was configured,
    /// and invokes `fatal!` when called in any other mode.
    pub fn gas_budget_based_txn_cost_cap_factor(&self) -> u64 {
        match self.mode {
            PerObjectCongestionControlMode::TotalGasBudgetWithCap => self
                .gas_budget_based_txn_cost_cap_factor
                .expect("cap factor must be set if TotalGasBudgetWithCap mode is used."),
            _ => fatal!(
                "gas_budget_based_txn_cost_cap_factor is only used in TotalGasBudgetWithCap mode."
            ),
        }
    }

    /// Optional absolute cap on gas-budget-based transaction cost.
    ///
    /// Invokes `fatal!` when called in any mode other than `TotalGasBudgetWithCap`.
    pub fn gas_budget_based_txn_cost_absolute_cap(&self) -> Option<u64> {
        match self.mode {
            PerObjectCongestionControlMode::TotalGasBudgetWithCap => {
                self.gas_budget_based_txn_cost_absolute_cap
            }
            _ => fatal!(
                "gas_budget_based_txn_cost_absolute_cap is only used in TotalGasBudgetWithCap mode."
            ),
        }
    }
}
/// Tracks the accumulated execution cost of each shared object and decides, from
/// the configured `Params`, whether a transaction should be deferred.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct SharedObjectCongestionTracker {
/// Accumulated cost per object: seeded from the initial debts passed to `new` and
/// raised by `bump_object_execution_cost`. Objects without an entry count as cost 0.
object_execution_cost: HashMap<ObjectID, u64>,
/// Mode and limits used for every budget and cost computation.
params: Params,
}
impl SharedObjectCongestionTracker {
/// Builds a tracker seeded with `initial_object_debts`.
///
/// `max_accumulated_txn_cost_per_object_in_commit` is ignored (treated as 0) when
/// `mode` is `None`. The absolute gas-budget cap is derived as
/// `gas_budget_based_txn_cost_absolute_cap_commit_count` times the per-commit
/// budget, saturating at `u64::MAX` instead of overflowing.
///
/// # Panics
///
/// - if `allowed_txn_cost_overage_burst_per_object_in_commit` exceeds
///   `max_txn_cost_overage_per_object_in_commit`;
/// - if `mode` is not `None` and `max_accumulated_txn_cost_per_object_in_commit`
///   is `None`.
pub fn new(
    initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
    mode: PerObjectCongestionControlMode,
    for_randomness: bool,
    max_accumulated_txn_cost_per_object_in_commit: Option<u64>,
    gas_budget_based_txn_cost_cap_factor: Option<u64>,
    gas_budget_based_txn_cost_absolute_cap_commit_count: Option<u64>,
    max_txn_cost_overage_per_object_in_commit: u64,
    allowed_txn_cost_overage_burst_per_object_in_commit: u64,
) -> Self {
    assert!(
        allowed_txn_cost_overage_burst_per_object_in_commit <= max_txn_cost_overage_per_object_in_commit,
        "burst limit must be <= absolute limit; allowed_txn_cost_overage_burst_per_object_in_commit = {allowed_txn_cost_overage_burst_per_object_in_commit}, max_txn_cost_overage_per_object_in_commit = {max_txn_cost_overage_per_object_in_commit}"
    );

    let object_execution_cost: HashMap<ObjectID, u64> =
        initial_object_debts.into_iter().collect();

    let max_accumulated_txn_cost_per_object_in_commit =
        if mode == PerObjectCongestionControlMode::None {
            0
        } else {
            max_accumulated_txn_cost_per_object_in_commit.expect(
                "max_accumulated_txn_cost_per_object_in_commit must be set if mode is not None",
            )
        };

    // Saturate rather than overflow: a wrapped product would silently turn a huge
    // cap into a tiny one (or panic in debug builds).
    let gas_budget_based_txn_cost_absolute_cap =
        gas_budget_based_txn_cost_absolute_cap_commit_count
            .map(|m| m.saturating_mul(max_accumulated_txn_cost_per_object_in_commit));

    trace!(
        "created SharedObjectCongestionTracker with
{} initial object debts,
mode: {mode:?},
max_accumulated_txn_cost_per_object_in_commit: {max_accumulated_txn_cost_per_object_in_commit:?},
gas_budget_based_txn_cost_cap_factor: {gas_budget_based_txn_cost_cap_factor:?},
gas_budget_based_txn_cost_absolute_cap: {gas_budget_based_txn_cost_absolute_cap:?},
max_txn_cost_overage_per_object_in_commit: {max_txn_cost_overage_per_object_in_commit:?}",
        object_execution_cost.len(),
    );

    Self {
        object_execution_cost,
        params: Params {
            mode,
            for_randomness,
            max_accumulated_txn_cost_per_object_in_commit,
            gas_budget_based_txn_cost_cap_factor,
            gas_budget_based_txn_cost_absolute_cap,
            max_txn_cost_overage_per_object_in_commit,
            allowed_txn_cost_overage_burst_per_object_in_commit,
        },
    }
}
/// Builds a tracker whose settings are read from `protocol_config`.
///
/// When `for_randomness` is set, the randomness-specific per-commit budget is
/// preferred and the regular budget is used as a fallback. Unset overage and
/// burst limits default to 0. The result is always `Ok`.
pub fn from_protocol_config(
    initial_object_debts: impl IntoIterator<Item = (ObjectID, u64)>,
    protocol_config: &ProtocolConfig,
    for_randomness: bool,
) -> SuiResult<Self> {
    let regular_budget =
        protocol_config.max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option();
    // Only consult the randomness budget when asked to; fall back to the regular one.
    let commit_budget = for_randomness
        .then(|| {
            protocol_config
                .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
        })
        .flatten()
        .or(regular_budget);

    let overage_limit = protocol_config
        .max_txn_cost_overage_per_object_in_commit_as_option()
        .unwrap_or(0);
    let burst_limit = protocol_config
        .allowed_txn_cost_overage_burst_per_object_in_commit_as_option()
        .unwrap_or(0);

    let tracker = Self::new(
        initial_object_debts,
        protocol_config.per_object_congestion_control_mode(),
        for_randomness,
        commit_budget,
        protocol_config.gas_budget_based_txn_cost_cap_factor_as_option(),
        protocol_config.gas_budget_based_txn_cost_absolute_cap_commit_count_as_option(),
        overage_limit,
        burst_limit,
    );
    Ok(tracker)
}
/// Returns the highest accumulated cost among `shared_input_objects`; objects the
/// tracker has no entry for count as 0.
///
/// # Panics
///
/// Panics if `shared_input_objects` is empty.
pub fn compute_tx_start_at_cost(&self, shared_input_objects: &[SharedInputObject]) -> u64 {
    let highest_cost = shared_input_objects
        .iter()
        .map(|input| self.object_execution_cost.get(&input.id).copied().unwrap_or(0))
        .max();
    highest_cost.expect("There must be at least one object in shared_input_objects.")
}
/// Computes the congestion cost of `cert` under the configured mode.
///
/// Returns `None` when congestion control is disabled (`None` mode). In
/// `ExecutionTimeEstimate` mode the cost is the estimator's estimate in
/// microseconds, clamped to `u64::MAX`.
///
/// # Panics
///
/// Panics in `ExecutionTimeEstimate` mode if `execution_time_estimator` is `None`.
pub fn get_tx_cost(
    &self,
    execution_time_estimator: Option<&ExecutionTimeEstimator>,
    cert: &VerifiedExecutableTransaction,
) -> Option<u64> {
    let cost = match &self.params.mode {
        PerObjectCongestionControlMode::None => return None,
        PerObjectCongestionControlMode::TotalGasBudget => cert.gas_budget(),
        PerObjectCongestionControlMode::TotalTxCount => 1,
        PerObjectCongestionControlMode::TotalGasBudgetWithCap => {
            let gas_budget = cert.gas_budget();
            let cap = self.get_tx_cost_cap(cert);
            std::cmp::min(gas_budget, cap)
        }
        PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => {
            let estimator = execution_time_estimator
                .expect("`execution_time_estimator` must be set for PerObjectCongestionControlMode::ExecutionTimeEstimate");
            let estimate_micros = estimator.get_estimate(cert.transaction_data()).as_micros();
            u64::try_from(estimate_micros).unwrap_or(u64::MAX)
        }
    };
    Some(cost)
}
/// Decides whether `cert` must be deferred because of shared-object congestion.
///
/// Returns `None` when the transaction has no cost (congestion control disabled),
/// touches no shared objects, or fits within the limits: its start cost must not
/// exceed budget + burst and its end cost must not exceed budget + overage.
/// Otherwise returns the deferral key for the next consensus round together with
/// the input objects whose accumulated cost equals the start cost. A transaction
/// that was deferred before keeps its original `deferred_from_round`.
pub fn should_defer_due_to_object_congestion(
    &self,
    execution_time_estimator: Option<&ExecutionTimeEstimator>,
    cert: &VerifiedExecutableTransaction,
    previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
    commit_info: &ConsensusCommitInfo,
) -> Option<(DeferralKey, Vec<ObjectID>)> {
    let current_round = commit_info.round;
    let tx_cost = self.get_tx_cost(execution_time_estimator, cert)?;

    let shared_inputs: Vec<_> = cert.shared_input_objects().collect();
    if shared_inputs.is_empty() {
        return None;
    }

    let start_cost = self.compute_tx_start_at_cost(&shared_inputs);
    let end_cost = start_cost.saturating_add(tx_cost);

    let budget = self.params.commit_budget(commit_info);
    let burst_limit = budget.saturating_add(self.params.max_burst());
    let absolute_limit = budget.saturating_add(self.params.max_overage());

    let over_limit = start_cost > burst_limit || end_cost > absolute_limit;
    if !over_limit {
        return None;
    }

    // The congested objects are exactly those that determine the start cost.
    let congested_objects: Vec<ObjectID> = shared_inputs
        .into_iter()
        .filter(|input| {
            self.object_execution_cost.get(&input.id).copied().unwrap_or(0) == start_cost
        })
        .map(|input| input.id)
        .collect();
    assert!(!congested_objects.is_empty());

    let deferred_from_round = previously_deferred_tx_digests
        .get(cert.digest())
        .map_or(current_round, |previous_key| previous_key.deferred_from_round());
    let deferral_key =
        DeferralKey::new_for_consensus_round(current_round + 1, deferred_from_round);

    Some((deferral_key, congested_objects))
}
/// Records the execution of `cert` by raising the accumulated cost of every
/// mutable shared input object to the transaction's end cost (start cost plus
/// transaction cost, saturating).
///
/// Does nothing when the transaction has no cost (congestion control disabled) or
/// when it has no shared input objects.
///
/// # Panics
///
/// Panics if an object's previously recorded cost is greater than the new end cost.
pub fn bump_object_execution_cost(
    &mut self,
    execution_time_estimator: Option<&ExecutionTimeEstimator>,
    cert: &VerifiedExecutableTransaction,
) {
    let Some(tx_cost) = self.get_tx_cost(execution_time_estimator, cert) else {
        return;
    };

    let shared_input_objects: Vec<_> = cert.shared_input_objects().collect();
    // Mirror the deferral check: without shared inputs there is nothing to bump,
    // and the start cost is undefined for an empty input set.
    if shared_input_objects.is_empty() {
        return;
    }

    let start_cost = self.compute_tx_start_at_cost(&shared_input_objects);
    let end_cost = start_cost.saturating_add(tx_cost);

    for obj in shared_input_objects {
        if !obj.mutable {
            continue;
        }
        if let Some(old_end_cost) = self.object_execution_cost.insert(obj.id, end_cost) {
            // end_cost is derived from the maximum over all inputs, so it can never
            // be lower than any single object's previous cost.
            assert!(old_end_cost <= end_cost);
        }
    }
}
/// Consumes the tracker and returns, for each object, the part of its accumulated
/// cost that exceeds the commit budget for `commit_info`.
///
/// Objects at or below the budget are omitted. Returns an empty list when no
/// overage is allowed (`max_overage() == 0`).
pub fn accumulated_debts(self, commit_info: &ConsensusCommitInfo) -> Vec<(ObjectID, u64)> {
    if self.params.max_overage() == 0 {
        return vec![];
    }

    // The budget does not depend on the object, so compute it once.
    let budget = self.params.commit_budget(commit_info);
    self.object_execution_cost
        .into_iter()
        .filter_map(|(obj_id, cost)| {
            let remaining_cost = cost.saturating_sub(budget);
            (remaining_cost > 0).then_some((obj_id, remaining_cost))
        })
        .collect()
}
/// Returns the largest accumulated cost over all tracked objects, or 0 when no
/// object is tracked.
pub fn max_cost(&self) -> u64 {
    self.object_execution_cost
        .values()
        .fold(0, |highest, &cost| highest.max(cost))
}
/// Computes the cost cap for `cert` in `TotalGasBudgetWithCap` mode.
///
/// The cap is (number of Move calls + number of `Argument::Input` arguments passed
/// to those calls) multiplied by the configured cap factor, bounded by the absolute
/// cap when one is configured. All arithmetic saturates at `u64::MAX` instead of
/// overflowing.
fn get_tx_cost_cap(&self, cert: &VerifiedExecutableTransaction) -> u64 {
    let mut number_of_move_call: u64 = 0;
    let mut number_of_move_input: u64 = 0;
    for command in cert.transaction_data().kind().iter_commands() {
        if let sui_types::transaction::Command::MoveCall(move_call) = command {
            number_of_move_call = number_of_move_call.saturating_add(1);
            let input_arguments = move_call
                .arguments
                .iter()
                .filter(|arg| matches!(arg, Argument::Input(_)))
                .count();
            number_of_move_input = number_of_move_input
                .saturating_add(u64::try_from(input_arguments).unwrap_or(u64::MAX));
        }
    }

    let cap = number_of_move_call
        .saturating_add(number_of_move_input)
        .saturating_mul(self.params.gas_budget_based_txn_cost_cap_factor());

    // Apply the absolute cap, if one is configured.
    std::cmp::min(
        cap,
        self.params
            .gas_budget_based_txn_cost_absolute_cap()
            .unwrap_or(u64::MAX),
    )
}
}
/// Serializable, versioned record of a congestion debt.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum CongestionPerObjectDebt {
/// Version 1: a `Round` paired with a `u64` debt amount.
V1(Round, u64),
}
impl CongestionPerObjectDebt {
pub fn new(round: Round, debt: u64) -> Self {
Self::V1(round, debt)
}
pub fn into_v1(self) -> (Round, u64) {
match self {
Self::V1(round, debt) => (round, debt),
}
}
}
#[cfg(test)]
mod object_cost_tests {
use super::*;
use rstest::rstest;
use std::time::Duration;
use sui_protocol_config::ExecutionTimeEstimateParams;
use sui_test_transaction_builder::TestTransactionBuilder;
use sui_types::base_types::{random_object_ref, SequenceNumber};
use sui_types::crypto::{get_key_pair, AccountKeyPair};
use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
use sui_types::transaction::{CallArg, ObjectArg, VerifiedTransaction};
use sui_types::Identifier;
// Turns `(object id, mutable)` pairs into `SharedInputObject`s with a default
// initial shared version.
fn construct_shared_input_objects(objects: &[(ObjectID, bool)]) -> Vec<SharedInputObject> {
    let mut inputs = Vec::with_capacity(objects.len());
    for &(id, mutable) in objects {
        inputs.push(SharedInputObject {
            id,
            initial_shared_version: SequenceNumber::new(),
            mutable,
        });
    }
    inputs
}
#[test]
fn test_compute_tx_start_at_cost() {
    let object_id_0 = ObjectID::random();
    let object_id_1 = ObjectID::random();
    let object_id_2 = ObjectID::random();

    let tracker = SharedObjectCongestionTracker::new(
        [(object_id_0, 5), (object_id_1, 10)],
        PerObjectCongestionControlMode::TotalGasBudget,
        false,
        Some(0),
        None,
        None,
        0,
        0,
    );

    // Start cost of a transaction touching exactly the given objects.
    let start_cost = |objects: &[(ObjectID, bool)]| {
        tracker.compute_tx_start_at_cost(&construct_shared_input_objects(objects))
    };

    // Single tracked object, read-only or mutable.
    assert_eq!(start_cost(&[(object_id_0, false)]), 5);
    assert_eq!(start_cost(&[(object_id_1, true)]), 10);

    // Two tracked objects: the larger cost wins regardless of mutability.
    assert_eq!(start_cost(&[(object_id_0, false), (object_id_1, false)]), 10);
    assert_eq!(start_cost(&[(object_id_0, true), (object_id_1, true)]), 10);

    // An untracked object starts at 0.
    assert_eq!(start_cost(&[(object_id_2, true)]), 0);
}
// Builds a system transaction with a single Move call that takes the given shared
// objects as arguments and carries the given gas budget.
fn build_transaction(
    objects: &[(ObjectID, bool)],
    gas_budget: u64,
) -> VerifiedExecutableTransaction {
    let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
    let gas_object = random_object_ref();

    let mut call_args = Vec::with_capacity(objects.len());
    for &(id, mutable) in objects {
        call_args.push(CallArg::Object(ObjectArg::SharedObject {
            id,
            initial_shared_version: SequenceNumber::new(),
            mutable,
        }));
    }

    let signed_tx = TestTransactionBuilder::new(sender, gas_object, 1000)
        .with_gas_budget(gas_budget)
        .move_call(
            ObjectID::random(),
            "unimportant_module",
            "unimportant_function",
            call_args,
        )
        .build_and_sign(&keypair);

    VerifiedExecutableTransaction::new_system(VerifiedTransaction::new_unchecked(signed_tx), 0)
}
// Builds a system transaction made of `number_of_commands` Move calls, each taking
// all of the given shared objects as arguments.
fn build_programmable_transaction(
    objects: &[(ObjectID, bool)],
    number_of_commands: u64,
    gas_budget: u64,
) -> VerifiedExecutableTransaction {
    let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
    let gas_object = random_object_ref();
    let package_id = ObjectID::random();

    let mut pt_builder = ProgrammableTransactionBuilder::new();
    let arguments: Vec<_> = objects
        .iter()
        .map(|&(id, mutable)| {
            pt_builder
                .obj(ObjectArg::SharedObject {
                    id,
                    initial_shared_version: SequenceNumber::new(),
                    mutable,
                })
                .unwrap()
        })
        .collect();

    let mut remaining_commands = number_of_commands;
    while remaining_commands > 0 {
        pt_builder.programmable_move_call(
            package_id,
            Identifier::new("unimportant_module").unwrap(),
            Identifier::new("unimportant_function").unwrap(),
            vec![],
            arguments.clone(),
        );
        remaining_commands -= 1;
    }

    let signed_tx = TestTransactionBuilder::new(sender, gas_object, 1000)
        .with_gas_budget(gas_budget)
        .programmable(pt_builder.finish())
        .build_and_sign(&keypair);

    VerifiedExecutableTransaction::new_system(VerifiedTransaction::new_unchecked(signed_tx), 0)
}
#[rstest]
fn test_should_defer_return_correct_congested_objects(
#[values(
PerObjectCongestionControlMode::TotalGasBudget,
PerObjectCongestionControlMode::TotalTxCount,
PerObjectCongestionControlMode::TotalGasBudgetWithCap
)]
mode: PerObjectCongestionControlMode,
) {
let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
let shared_obj_0 = ObjectID::random();
let shared_obj_1 = ObjectID::random();
let tx_gas_budget = 100;
let max_accumulated_txn_cost_per_object_in_commit = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => tx_gas_budget + 1,
PerObjectCongestionControlMode::TotalTxCount => 2,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => tx_gas_budget - 1,
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 0, };
let shared_object_congestion_tracker = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 10), (shared_obj_1, 1)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
0,
0,
)
}
PerObjectCongestionControlMode::TotalTxCount => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 2), (shared_obj_1, 1)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
0,
0,
)
}
PerObjectCongestionControlMode::TotalGasBudgetWithCap => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 10), (shared_obj_1, 1)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
Some(45), None,
0,
0,
)
}
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 750), (shared_obj_1, 0)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
0,
0,
)
}
};
for mutable in [true, false].iter() {
let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
if let Some((_, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(1_500),
),
)
{
assert_eq!(congested_objects.len(), 1);
assert_eq!(congested_objects[0], shared_obj_0);
} else {
panic!("should defer");
}
}
for mutable in [true, false].iter() {
let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
assert!(shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(1_500),
),
)
.is_none());
}
for mutable_0 in [true, false].iter() {
for mutable_1 in [true, false].iter() {
let tx = build_transaction(
&[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
tx_gas_budget,
);
if let Some((_, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(1_500),
),
)
{
assert_eq!(congested_objects.len(), 1);
assert_eq!(congested_objects[0], shared_obj_0);
} else {
panic!("should defer");
}
}
}
}
#[rstest]
fn test_should_defer_return_correct_deferral_key(
#[values(
PerObjectCongestionControlMode::TotalGasBudget,
PerObjectCongestionControlMode::TotalTxCount,
PerObjectCongestionControlMode::TotalGasBudgetWithCap,
PerObjectCongestionControlMode::ExecutionTimeEstimate(ExecutionTimeEstimateParams {
target_utilization: 0,
allowed_txn_cost_overage_burst_limit_us: 0,
max_estimate_us: u64::MAX,
randomness_scalar: 0,
}),
)]
mode: PerObjectCongestionControlMode,
) {
let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
let shared_obj_0 = ObjectID::random();
let tx = build_transaction(&[(shared_obj_0, true)], 100);
let shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
[(shared_obj_0, 1)], mode,
false,
Some(0), Some(2),
None,
0,
0,
);
let mut previously_deferred_tx_digests = HashMap::new();
previously_deferred_tx_digests.insert(
TransactionDigest::random(),
DeferralKey::ConsensusRound {
future_round: 10,
deferred_from_round: 5,
},
);
if let Some((
DeferralKey::ConsensusRound {
future_round,
deferred_from_round,
},
_,
)) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&previously_deferred_tx_digests,
&ConsensusCommitInfo::new_for_congestion_test(
10,
10,
Duration::from_micros(10_000_000),
),
) {
assert_eq!(future_round, 11);
assert_eq!(deferred_from_round, 10);
} else {
panic!("should defer");
}
previously_deferred_tx_digests.insert(
*tx.digest(),
DeferralKey::Randomness {
deferred_from_round: 4,
},
);
if let Some((
DeferralKey::ConsensusRound {
future_round,
deferred_from_round,
},
_,
)) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&previously_deferred_tx_digests,
&ConsensusCommitInfo::new_for_congestion_test(
10,
10,
Duration::from_micros(10_000_000),
),
) {
assert_eq!(future_round, 11);
assert_eq!(deferred_from_round, 4);
} else {
panic!("should defer");
}
previously_deferred_tx_digests.insert(
*tx.digest(),
DeferralKey::ConsensusRound {
future_round: 10,
deferred_from_round: 5,
},
);
if let Some((
DeferralKey::ConsensusRound {
future_round,
deferred_from_round,
},
_,
)) = shared_object_congestion_tracker.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&previously_deferred_tx_digests,
&ConsensusCommitInfo::new_for_congestion_test(
10,
10,
Duration::from_micros(10_000_000),
),
) {
assert_eq!(future_round, 11);
assert_eq!(deferred_from_round, 5);
} else {
panic!("should defer");
}
}
#[rstest]
fn test_should_defer_allow_overage(
#[values(
PerObjectCongestionControlMode::TotalGasBudget,
PerObjectCongestionControlMode::TotalTxCount,
PerObjectCongestionControlMode::TotalGasBudgetWithCap,
PerObjectCongestionControlMode::ExecutionTimeEstimate(ExecutionTimeEstimateParams {
target_utilization: 16,
allowed_txn_cost_overage_burst_limit_us: 0,
randomness_scalar: 0,
max_estimate_us: u64::MAX,
}),
)]
mode: PerObjectCongestionControlMode,
) {
telemetry_subscribers::init_for_testing();
let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
let shared_obj_0 = ObjectID::random();
let shared_obj_1 = ObjectID::random();
let tx_gas_budget = 100;
let max_accumulated_txn_cost_per_object_in_commit = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => tx_gas_budget + 1,
PerObjectCongestionControlMode::TotalTxCount => 2,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => tx_gas_budget - 1,
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 0, };
let shared_object_congestion_tracker = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 102), (shared_obj_1, 90)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
max_accumulated_txn_cost_per_object_in_commit * 10,
0,
)
}
PerObjectCongestionControlMode::TotalTxCount => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 3), (shared_obj_1, 2)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
max_accumulated_txn_cost_per_object_in_commit * 10,
0,
)
}
PerObjectCongestionControlMode::TotalGasBudgetWithCap => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 100), (shared_obj_1, 90)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
Some(45), None,
max_accumulated_txn_cost_per_object_in_commit * 10,
0,
)
}
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 1_700_000), (shared_obj_1, 300_000)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
max_accumulated_txn_cost_per_object_in_commit * 10,
0,
)
}
};
for mutable in [true, false].iter() {
let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
if let Some((_, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(10_000_000),
),
)
{
assert_eq!(congested_objects.len(), 1);
assert_eq!(congested_objects[0], shared_obj_0);
} else {
panic!("should defer");
}
}
for mutable in [true, false].iter() {
let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
assert!(shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(10_000_000)
),
)
.is_none());
}
for mutable_0 in [true, false].iter() {
for mutable_1 in [true, false].iter() {
let tx = build_transaction(
&[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
tx_gas_budget,
);
if let Some((_, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(10_000_000),
),
)
{
assert_eq!(congested_objects.len(), 1);
assert_eq!(congested_objects[0], shared_obj_0);
} else {
panic!("should defer");
}
}
}
}
#[rstest]
fn test_should_defer_allow_overage_with_burst(
#[values(
PerObjectCongestionControlMode::TotalGasBudget,
PerObjectCongestionControlMode::TotalTxCount,
PerObjectCongestionControlMode::TotalGasBudgetWithCap,
PerObjectCongestionControlMode::ExecutionTimeEstimate(ExecutionTimeEstimateParams {
target_utilization: 16,
allowed_txn_cost_overage_burst_limit_us: 1_500_000,
randomness_scalar: 0,
max_estimate_us: u64::MAX,
}),
)]
mode: PerObjectCongestionControlMode,
) {
telemetry_subscribers::init_for_testing();
let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
let shared_obj_0 = ObjectID::random();
let shared_obj_1 = ObjectID::random();
let tx_gas_budget = 100;
let max_accumulated_txn_cost_per_object_in_commit = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => tx_gas_budget,
PerObjectCongestionControlMode::TotalTxCount => 2,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => tx_gas_budget,
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 0, };
let allowed_txn_cost_overage_burst_per_object_in_commit = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => tx_gas_budget * 2,
PerObjectCongestionControlMode::TotalTxCount => 2,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => tx_gas_budget * 2,
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 0, };
let shared_object_congestion_tracker = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 301), (shared_obj_1, 199)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
max_accumulated_txn_cost_per_object_in_commit * 10,
allowed_txn_cost_overage_burst_per_object_in_commit,
)
}
PerObjectCongestionControlMode::TotalTxCount => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 5), (shared_obj_1, 4)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
max_accumulated_txn_cost_per_object_in_commit * 10,
allowed_txn_cost_overage_burst_per_object_in_commit,
)
}
PerObjectCongestionControlMode::TotalGasBudgetWithCap => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 301), (shared_obj_1, 250)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
Some(45), None,
max_accumulated_txn_cost_per_object_in_commit * 10,
allowed_txn_cost_overage_burst_per_object_in_commit,
)
}
PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => {
SharedObjectCongestionTracker::new(
[(shared_obj_0, 4_000_000), (shared_obj_1, 2_000_000)],
mode,
false,
Some(max_accumulated_txn_cost_per_object_in_commit),
None,
None,
max_accumulated_txn_cost_per_object_in_commit * 10,
allowed_txn_cost_overage_burst_per_object_in_commit,
)
}
};
for mutable in [true, false].iter() {
let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
if let Some((_, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(10_000_000),
),
)
{
assert_eq!(congested_objects.len(), 1);
assert_eq!(congested_objects[0], shared_obj_0);
} else {
panic!("should defer");
}
}
for mutable in [true, false].iter() {
let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
assert!(shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(10_000_000)
),
)
.is_none());
}
for mutable_0 in [true, false].iter() {
for mutable_1 in [true, false].iter() {
let tx = build_transaction(
&[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
tx_gas_budget,
);
if let Some((_, congested_objects)) = shared_object_congestion_tracker
.should_defer_due_to_object_congestion(
Some(&execution_time_estimator),
&tx,
&HashMap::new(),
&ConsensusCommitInfo::new_for_congestion_test(
0,
0,
Duration::from_micros(10_000_000),
),
)
{
assert_eq!(congested_objects.len(), 1);
assert_eq!(congested_objects[0], shared_obj_0);
} else {
panic!("should defer");
}
}
}
}
#[rstest]
fn test_bump_object_execution_cost(
#[values(
PerObjectCongestionControlMode::TotalGasBudget,
PerObjectCongestionControlMode::TotalTxCount,
PerObjectCongestionControlMode::TotalGasBudgetWithCap,
PerObjectCongestionControlMode::ExecutionTimeEstimate(ExecutionTimeEstimateParams {
target_utilization: 0,
allowed_txn_cost_overage_burst_limit_us: 0,
randomness_scalar: 0,
max_estimate_us: u64::MAX,
}),
)]
mode: PerObjectCongestionControlMode,
) {
telemetry_subscribers::init_for_testing();
let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();
let object_id_0 = ObjectID::random();
let object_id_1 = ObjectID::random();
let object_id_2 = ObjectID::random();
let cap_factor = Some(1);
let mut shared_object_congestion_tracker = SharedObjectCongestionTracker::new(
[(object_id_0, 5), (object_id_1, 10)],
mode,
false,
Some(0), cap_factor,
None,
0,
0,
);
assert_eq!(shared_object_congestion_tracker.max_cost(), 10);
let cert = build_transaction(&[(object_id_0, false), (object_id_1, false)], 10);
shared_object_congestion_tracker
.bump_object_execution_cost(Some(&execution_time_estimator), &cert);
assert_eq!(
shared_object_congestion_tracker,
SharedObjectCongestionTracker::new(
[(object_id_0, 5), (object_id_1, 10)],
mode,
false,
Some(0), cap_factor,
None,
0,
0,
)
);
assert_eq!(shared_object_congestion_tracker.max_cost(), 10);
let cert = build_transaction(&[(object_id_0, true), (object_id_1, false)], 10);
shared_object_congestion_tracker
.bump_object_execution_cost(Some(&execution_time_estimator), &cert);
let expected_object_0_cost = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => 20,
PerObjectCongestionControlMode::TotalTxCount => 11,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => 13, PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 1_010,
};
assert_eq!(
shared_object_congestion_tracker,
SharedObjectCongestionTracker::new(
[(object_id_0, expected_object_0_cost), (object_id_1, 10)],
mode,
false,
Some(0), cap_factor,
None,
0,
0,
)
);
assert_eq!(
shared_object_congestion_tracker.max_cost(),
expected_object_0_cost
);
let cert = build_transaction(
&[
(object_id_0, true),
(object_id_1, true),
(object_id_2, true),
],
10,
);
let expected_object_cost = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => 30,
PerObjectCongestionControlMode::TotalTxCount => 12,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => 17, PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 2_010,
};
shared_object_congestion_tracker
.bump_object_execution_cost(Some(&execution_time_estimator), &cert);
assert_eq!(
shared_object_congestion_tracker,
SharedObjectCongestionTracker::new(
[
(object_id_0, expected_object_cost),
(object_id_1, expected_object_cost),
(object_id_2, expected_object_cost)
],
mode,
false,
Some(0), cap_factor,
None,
0,
0,
)
);
assert_eq!(
shared_object_congestion_tracker.max_cost(),
expected_object_cost
);
let cert = build_programmable_transaction(
&[
(object_id_0, true),
(object_id_1, true),
(object_id_2, true),
],
7,
30,
);
let expected_object_cost = match mode {
PerObjectCongestionControlMode::None => unreachable!(),
PerObjectCongestionControlMode::TotalGasBudget => 60,
PerObjectCongestionControlMode::TotalTxCount => 13,
PerObjectCongestionControlMode::TotalGasBudgetWithCap => 45, PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => 9_010,
};
shared_object_congestion_tracker
.bump_object_execution_cost(Some(&execution_time_estimator), &cert);
assert_eq!(
shared_object_congestion_tracker,
SharedObjectCongestionTracker::new(
[
(object_id_0, expected_object_cost),
(object_id_1, expected_object_cost),
(object_id_2, expected_object_cost)
],
mode,
false,
Some(0), cap_factor,
None,
0,
0,
)
);
assert_eq!(
shared_object_congestion_tracker.max_cost(),
expected_object_cost
);
}
// Runs once per congestion control mode (via rstest `#[values]`) and checks the debt that
// `accumulated_debts` reports for a shared object after transactions have been charged to it.
#[rstest]
fn test_accumulated_debts(
    #[values(
        PerObjectCongestionControlMode::TotalGasBudget,
        PerObjectCongestionControlMode::TotalTxCount,
        PerObjectCongestionControlMode::TotalGasBudgetWithCap,
        PerObjectCongestionControlMode::ExecutionTimeEstimate(ExecutionTimeEstimateParams {
            target_utilization: 100,
            // Non-zero burst limit. The expected debt of 700 below is consistent with the
            // burst limit playing no part in the debt calculation (presumably the intent).
            allowed_txn_cost_overage_burst_limit_us: 1_600 * 5,
            randomness_scalar: 0,
            max_estimate_us: u64::MAX,
        }),
    )]
    mode: PerObjectCongestionControlMode,
) {
    telemetry_subscribers::init_for_testing();
    let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();

    // Two shared objects are tracked; only the first one is touched by transactions.
    let shared_obj_0 = ObjectID::random();
    let shared_obj_1 = ObjectID::random();
    let tx_gas_budget = 100;

    // Per-mode parameters:
    //   (per-commit cost budget, initial cost of each object, cap factor, expected debt).
    //
    // The expected debts are consistent with `initial + charged cost - budget`
    // (NOTE(review): confirm against `accumulated_debts`):
    //   TotalGasBudget:        80 + 100 - 90 = 90
    //   TotalGasBudgetWithCap: 80 + 90 - 90 = 80 (presumably the cost is capped at 90)
    //   TotalTxCount:          2 + 1 - 2 = 1
    //   ExecutionTimeEstimate: 500 + 1000 - 800 = 700 (presumably the testing estimator
    //                          yields 1000; the 800 budget is the 800us commit period scaled
    //                          by `target_utilization` = 100%)
    //
    // In `ExecutionTimeEstimate` mode the commit budget is derived from the commit period
    // rather than from this constant, hence the 0.
    let (max_accumulated_txn_cost_per_object_in_commit, initial_cost, cap_factor, expected_debt) =
        match mode {
            PerObjectCongestionControlMode::None => unreachable!(),
            PerObjectCongestionControlMode::TotalGasBudget => (90, 80, None, 90),
            PerObjectCongestionControlMode::TotalGasBudgetWithCap => (90, 80, Some(45), 80),
            PerObjectCongestionControlMode::TotalTxCount => (2, 2, None, 1),
            PerObjectCongestionControlMode::ExecutionTimeEstimate(_) => (0, 500, None, 700),
        };

    let mut tracker = SharedObjectCongestionTracker::new(
        [(shared_obj_0, initial_cost), (shared_obj_1, initial_cost)],
        mode,
        false,
        Some(max_accumulated_txn_cost_per_object_in_commit),
        cap_factor,
        None,
        max_accumulated_txn_cost_per_object_in_commit * 10,
        max_accumulated_txn_cost_per_object_in_commit * 5,
    );

    // Charge one transaction that takes object 0 mutably and one that takes it immutably.
    // The expected debts above correspond to a single transaction's cost being counted.
    for mutable in [true, false] {
        let tx = build_transaction(&[(shared_obj_0, mutable)], tx_gas_budget);
        tracker.bump_object_execution_cost(Some(&execution_time_estimator), &tx);
    }

    // Only object 0 should be reported, with the mode-specific debt.
    let commit_info =
        ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::from_micros(800));
    let debts = tracker.accumulated_debts(&commit_info);
    assert_eq!(debts.len(), 1);
    assert_eq!(debts[0], (shared_obj_0, expected_debt));
}
// Verifies that `accumulated_debts` returns nothing when no transaction has been charged and
// every initial cost (5, 10, 100) is at or below the `Some(100)` budget handed to the tracker
// (presumably the per-commit cost budget in `TotalGasBudget` mode).
#[test]
fn test_accumulated_debts_empty() {
    let (first, second, third) = (ObjectID::random(), ObjectID::random(), ObjectID::random());
    let initial_costs = [(first, 5), (second, 10), (third, 100)];

    let tracker = SharedObjectCongestionTracker::new(
        initial_costs,
        PerObjectCongestionControlMode::TotalGasBudget,
        false,
        Some(100),
        None,
        None,
        0,
        0,
    );

    let commit_info = ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::ZERO);
    let debts = tracker.accumulated_debts(&commit_info);
    assert!(debts.is_empty());
}
// Exercises `TotalGasBudgetWithCap` with a transaction whose gas budget (2000) is far above
// the configured limits, and checks the resulting maximum object cost and accumulated debt.
//
// The expected values are consistent with the transaction cost being clamped to 200
// (presumably `Some(2)` commits x the per-commit budget of 100 -- TODO confirm):
//   object 2 cost: 100 (initial) + 200 = 300
//   object 2 debt: 300 - 100 (budget) = 200
#[test]
fn test_tx_cost_absolute_cap() {
    let execution_time_estimator = ExecutionTimeEstimator::new_for_testing();

    let object_id_0 = ObjectID::random();
    let object_id_1 = ObjectID::random();
    let object_id_2 = ObjectID::random();
    let tx_gas_budget = 2000;

    let mut tracker = SharedObjectCongestionTracker::new(
        [(object_id_0, 5), (object_id_1, 10), (object_id_2, 100)],
        PerObjectCongestionControlMode::TotalGasBudgetWithCap,
        false,
        Some(100),
        Some(1000),
        Some(2),
        1000,
        0,
    );

    // Objects 0 and 1 are taken immutably; only object 2 is taken mutably.
    let shared_inputs = [
        (object_id_0, false),
        (object_id_1, false),
        (object_id_2, true),
    ];
    let tx = build_transaction(&shared_inputs, tx_gas_budget);

    // The same commit info is used for the deferral check and the debt query.
    let commit_info = ConsensusCommitInfo::new_for_congestion_test(0, 0, Duration::ZERO);

    // The transaction must not be deferred despite its large gas budget.
    let deferral = tracker.should_defer_due_to_object_congestion(
        Some(&execution_time_estimator),
        &tx,
        &HashMap::new(),
        &commit_info,
    );
    assert!(deferral.is_none());

    // Charge the transaction and check the highest per-object cost.
    tracker.bump_object_execution_cost(Some(&execution_time_estimator), &tx);
    assert_eq!(300, tracker.max_cost());

    // Only the mutably accessed object ends up with a debt.
    let debts = tracker.accumulated_debts(&commit_info);
    assert_eq!(debts.len(), 1);
    assert_eq!(debts[0], (object_id_2, 200));
}
}