use crate::{
authority::{
authority_per_epoch_store::AuthorityPerEpochStore,
shared_object_version_manager::{Schedulable, WithdrawType},
AuthorityMetrics, ExecutionEnv,
},
execution_cache::{ObjectCacheRead, TransactionCacheRead},
execution_scheduler::{
balance_withdraw_scheduler::{
scheduler::BalanceWithdrawScheduler, BalanceSettlement, ScheduleStatus,
TxBalanceWithdraw,
},
ExecutingGuard, PendingCertificateStats,
},
};
use futures::stream::{FuturesUnordered, StreamExt};
use mysten_common::debug_fatal;
use mysten_metrics::spawn_monitored_task;
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
use sui_config::node::AuthorityOverloadConfig;
use sui_types::{
base_types::{FullObjectID, SequenceNumber},
error::SuiResult,
executable_transaction::VerifiedExecutableTransaction,
storage::{ChildObjectResolver, InputKey},
transaction::{SenderSignedData, TransactionDataAPI, TransactionKey},
SUI_ACCUMULATOR_ROOT_OBJECT_ID,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::Instant;
use tracing::{debug, error};
use super::{overload_tracker::OverloadTracker, PendingCertificate};
/// Schedules transactions for execution: each transaction is held until all of
/// its input (and receiving) objects are available in the cache, then handed
/// off over `tx_ready_certificates` for execution.
#[derive(Clone)]
pub struct ExecutionScheduler {
    // Read access to objects; used to check and await input-object availability.
    object_cache_read: Arc<dyn ObjectCacheRead>,
    // Read access to transactions/effects; used to detect already-executed digests.
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    // Tracks in-flight load for execution overload checks.
    overload_tracker: Arc<OverloadTracker>,
    // Channel over which ready-to-execute certificates are delivered.
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    // Present only when the balance accumulator feature is enabled for the epoch.
    balance_withdraw_scheduler: Option<Arc<BalanceWithdrawScheduler>>,
    metrics: Arc<AuthorityMetrics>,
}
/// RAII guard for a transaction waiting on its input objects: while alive, the
/// transaction is counted in the pending-certificates gauge and in the
/// overload tracker (see `new` and the `Drop` impl).
struct PendingGuard<'a> {
    scheduler: &'a ExecutionScheduler,
    cert: &'a VerifiedExecutableTransaction,
}
impl<'a> PendingGuard<'a> {
    /// Registers `cert` as pending: bumps the pending-certificates gauge and
    /// records the transaction in the overload tracker. Both are undone on drop.
    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
        scheduler
            .metrics
            .transaction_manager_num_pending_certificates
            .inc();
        scheduler
            .overload_tracker
            .add_pending_certificate(cert.data());
        Self { scheduler, cert }
    }
}
impl Drop for PendingGuard<'_> {
    // Mirror of `PendingGuard::new`: decrement the gauge and deregister the
    // transaction from the overload tracker when the wait ends (whether the
    // transaction became ready or was found already executed).
    fn drop(&mut self) {
        self.scheduler
            .metrics
            .transaction_manager_num_pending_certificates
            .dec();
        self.scheduler
            .overload_tracker
            .remove_pending_certificate(self.cert.data());
    }
}
impl ExecutionScheduler {
/// Creates a new scheduler. When the balance accumulator feature is enabled
/// for this epoch, also constructs the balance-withdraw scheduler, seeded with
/// the current version of the accumulator root object (which must exist).
pub fn new(
    object_cache_read: Arc<dyn ObjectCacheRead>,
    child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    metrics: Arc<AuthorityMetrics>,
) -> Self {
    tracing::info!("Creating new ExecutionScheduler");
    let balance_accumulator_enabled = epoch_store.accumulators_enabled();
    let balance_withdraw_scheduler = if balance_accumulator_enabled {
        let starting_accumulator_version = object_cache_read
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .expect("Accumulator root object must be present if balance accumulator is enabled")
            .version();
        // NOTE(review): `child_object_resolver` is already an `Arc`, so this
        // produces a nested `Arc<Arc<dyn ...>>` — presumably accepted because
        // the resolver trait is implemented for `Arc<T>`; confirm.
        Some(BalanceWithdrawScheduler::new(
            Arc::new(child_object_resolver),
            starting_accumulator_version,
        ))
    } else {
        None
    };
    Self {
        object_cache_read,
        transaction_cache_read,
        overload_tracker: Arc::new(OverloadTracker::new()),
        tx_ready_certificates,
        balance_withdraw_scheduler,
        metrics,
    }
}
/// Waits until every input and receiving object of `cert` is available, then
/// sends it for execution. Consumes `self` (the scheduler is `Clone`) so each
/// transaction can be awaited in its own spawned task. The wait is raced
/// against the transaction's effects appearing (executed elsewhere), in which
/// case the wait is abandoned.
async fn schedule_transaction(
    self,
    cert: VerifiedExecutableTransaction,
    execution_env: ExecutionEnv,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) {
    let enqueue_time = Instant::now();
    let tx_digest = cert.digest();
    let digests = [*tx_digest];
    let tx_data = cert.transaction_data();
    let input_object_kinds = tx_data
        .input_objects()
        .expect("input_objects() cannot fail");
    // Resolve input kinds to concrete keys, applying the shared-object
    // versions assigned to this transaction by the execution env.
    let input_object_keys: Vec<_> = epoch_store
        .get_input_object_keys(
            &cert.key(),
            &input_object_kinds,
            &execution_env.assigned_versions,
        )
        .into_iter()
        .collect();
    // Receiving objects are waited on at their exact (id, version) pair.
    let receiving_object_keys: HashSet<_> = tx_data
        .receiving_objects()
        .into_iter()
        .map(|entry| {
            InputKey::VersionedObject {
                id: FullObjectID::new(entry.0, None),
                version: entry.1,
            }
        })
        .collect();
    let input_and_receiving_keys = [
        input_object_keys,
        receiving_object_keys.iter().cloned().collect(),
    ]
    .concat();
    let epoch = epoch_store.epoch();
    debug!(
        ?tx_digest,
        "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
    );
    // Fast path: check availability against the cache only, without waiting.
    let availability = self
        .object_cache_read
        .multi_input_objects_available_cache_only(&input_and_receiving_keys);
    let missing_input_keys: Vec<_> = input_and_receiving_keys
        .into_iter()
        .zip(availability)
        .filter_map(|(key, available)| if !available { Some(key) } else { None })
        .collect();
    if missing_input_keys.is_empty() {
        self.metrics
            .transaction_manager_num_enqueued_certificates
            .with_label_values(&["ready"])
            .inc();
        debug!(?tx_digest, "Input objects already available");
        self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
        return;
    }
    // Slow path: count the transaction as pending (guard undoes this when the
    // wait completes either way) and await the missing inputs.
    let _pending_guard = PendingGuard::new(&self, &cert);
    self.metrics
        .transaction_manager_num_enqueued_certificates
        .with_label_values(&["pending"])
        .inc();
    // Race input availability against the transaction's effects appearing.
    // Whichever branch completes first cancels the other.
    tokio::select! {
        _ = self.object_cache_read
            .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
            => {
            self.metrics
                .transaction_manager_transaction_queue_age_s
                .observe(enqueue_time.elapsed().as_secs_f64());
            debug!(?tx_digest, "Input objects available");
            self.send_transaction_for_execution(
                &cert,
                execution_env,
                enqueue_time,
            );
        }
        _ = self.transaction_cache_read.notify_read_executed_effects_digests(
            "ExecutionScheduler::notify_read_executed_effects_digests",
            &digests,
        ) => {
            // Executed elsewhere (e.g. via checkpoint sync); nothing to send.
            debug!(?tx_digest, "Transaction already executed");
        }
    };
}
/// Packages `cert` into a `PendingCertificate` and hands it to the execution
/// channel. `ready_time` is stamped now; the attached `ExecutingGuard`
/// presumably keeps the executing-certificates gauge raised until the receiver
/// drops the certificate.
fn send_transaction_for_execution(
    &self,
    cert: &VerifiedExecutableTransaction,
    execution_env: ExecutionEnv,
    enqueue_time: Instant,
) {
    let pending_cert = PendingCertificate {
        certificate: cert.clone(),
        execution_env,
        stats: PendingCertificateStats {
            enqueue_time,
            ready_time: Some(Instant::now()),
        },
        executing_guard: Some(ExecutingGuard::new(
            self.metrics
                .transaction_manager_num_executing_certificates
                .clone(),
        )),
    };
    // A send error means the receiver side is gone (e.g. shutdown); drop silently.
    let _ = self.tx_ready_certificates.send(pending_cert);
}
fn schedule_balance_withdraws(
&self,
certs: Vec<(VerifiedExecutableTransaction, SequenceNumber, ExecutionEnv)>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
if certs.is_empty() {
return;
}
let scheduler = self
.balance_withdraw_scheduler
.as_ref()
.expect("Balance withdraw scheduler must be enabled if there are withdraws");
let mut withdraws = BTreeMap::new();
let mut prev_version = None;
for (cert, version, _) in &certs {
let tx_withdraws = cert
.transaction_data()
.process_funds_withdrawals()
.expect("Balance withdraws should have already been checked");
assert!(!tx_withdraws.is_empty());
if let Some(prev_version) = prev_version {
assert!(prev_version <= *version);
}
prev_version = Some(*version);
let tx_digest = *cert.digest();
withdraws
.entry(*version)
.or_insert(Vec::new())
.push(TxBalanceWithdraw {
tx_digest,
reservations: tx_withdraws,
});
}
let mut receivers = FuturesUnordered::new();
for (version, tx_withdraws) in withdraws {
receivers.extend(scheduler.schedule_withdraws(version, tx_withdraws));
}
let scheduler = self.clone();
let epoch_store = epoch_store.clone();
spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
let mut cert_map = HashMap::new();
for (cert, _, env) in certs {
cert_map.insert(*cert.digest(), (cert, env));
}
while let Some(result) = receivers.next().await {
match result {
Ok(result) => match result.status {
ScheduleStatus::InsufficientBalance => {
let tx_digest = result.tx_digest;
debug!(
?tx_digest,
"Balance withdraw scheduling result: Insufficient balance"
);
let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
let env = env.with_insufficient_balance();
scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
}
ScheduleStatus::SufficientBalance => {
let tx_digest = result.tx_digest;
debug!(?tx_digest, "Balance withdraw scheduling result: Success");
let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
let env = env.with_sufficient_balance();
scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
}
ScheduleStatus::AlreadyExecuted => {
let tx_digest = result.tx_digest;
debug!(?tx_digest, "Withdraw already executed");
}
},
Err(e) => {
error!("Withdraw scheduler stopped: {:?}", e);
}
}
}
}));
}
/// Schedules accumulator settlement transactions: for each settlement key,
/// waits (in a spawned task) until the corresponding settlement transactions
/// are known, then enqueues them all under that key's execution env.
fn schedule_settlement_transactions(
    &self,
    settlement_txns: Vec<(TransactionKey, ExecutionEnv)>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) {
    // Nothing to schedule.
    if settlement_txns.is_empty() {
        return;
    }
    let scheduler = self.clone();
    let epoch_store = epoch_store.clone();
    spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
        // One future per settlement key, each resolving to the settlement
        // transactions for that key paired with the env recorded for it.
        let mut pending: FuturesUnordered<_> = settlement_txns
            .into_iter()
            .map(|(key, env)| {
                let store = epoch_store.clone();
                async move { (store.wait_for_settlement_transactions(key).await, env) }
            })
            .collect();
        // Enqueue each batch as soon as it becomes available.
        while let Some((txns, env)) = pending.next().await {
            let batch: Vec<_> = txns.into_iter().map(|tx| (tx, env.clone())).collect();
            scheduler.enqueue_transactions(batch, &epoch_store);
        }
    }));
}
/// Schedules transactions known only by `TransactionKey`: in a spawned task,
/// resolves each key to a digest, loads the transaction blocks, wraps them as
/// system transactions for the current epoch, and enqueues them.
fn schedule_tx_keys(
    &self,
    tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) {
    if tx_with_keys.is_empty() {
        return;
    }
    let scheduler = self.clone();
    let epoch_store = epoch_store.clone();
    spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
        // Wait until every key has been mapped to a transaction digest.
        let keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key.clone()).collect();
        let digests = epoch_store
            .notify_read_tx_key_to_digest(&keys)
            .await
            .expect("db error");
        let blocks = scheduler
            .transaction_cache_read
            .multi_get_transaction_blocks(&digests);
        // Pair each loaded transaction with the env supplied for its key.
        let mut transactions = Vec::with_capacity(blocks.len());
        for (block, (_, env)) in blocks.into_iter().zip(tx_with_keys) {
            let tx = block.expect("tx must exist").as_ref().clone();
            transactions.push((
                VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch()),
                env,
            ));
        }
        scheduler.enqueue_transactions(transactions, &epoch_store);
    }));
}
/// Debug-only sanity check: if this certificate already has executed effects,
/// they must be from the current epoch — re-scheduling a transaction executed
/// in a previous epoch indicates a scheduling bug.
#[cfg(debug_assertions)]
fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
    use sui_types::effects::TransactionEffectsAPI;
    let digest = *cert.digest();
    let prior = self
        .transaction_cache_read
        .multi_get_executed_effects(&[digest])
        .pop()
        .unwrap();
    if let Some(effects) = prior {
        assert_eq!(
            effects.executed_epoch(),
            cert.epoch(),
            "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
            digest,
            effects.executed_epoch(),
            cert.epoch()
        );
    }
}
}
impl ExecutionScheduler {
/// Entry point for scheduling a batch of schedulables. Partitions the batch
/// into the four scheduling paths — ordinary transactions, key-only
/// transactions, balance withdraws, and accumulator settlements — and
/// dispatches each partition to its dedicated scheduler.
pub fn enqueue(
    &self,
    certs: Vec<(Schedulable, ExecutionEnv)>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) {
    let mut plain_txns = Vec::with_capacity(certs.len());
    let mut keyed_txns = Vec::new();
    let mut withdraw_txns = Vec::new();
    let mut settlement_txns = Vec::new();
    for (schedulable, env) in certs {
        match schedulable {
            Schedulable::Transaction(tx) => match env.assigned_versions.withdraw_type {
                // Withdraw transactions must clear the balance scheduler first.
                WithdrawType::Withdraw(accumulator_version) => {
                    withdraw_txns.push((tx, accumulator_version, env));
                }
                WithdrawType::NonWithdraw => {
                    plain_txns.push((tx, env));
                }
            },
            // Known only by key until the underlying transaction materializes.
            s @ Schedulable::RandomnessStateUpdate(..) => {
                keyed_txns.push((s.key(), env));
            }
            s @ Schedulable::AccumulatorSettlement(_, _) => {
                settlement_txns.push((s.key(), env));
            }
        }
    }
    self.enqueue_transactions(plain_txns, epoch_store);
    self.schedule_tx_keys(keyed_txns, epoch_store);
    self.schedule_balance_withdraws(withdraw_txns, epoch_store);
    self.schedule_settlement_transactions(settlement_txns, epoch_store);
}
/// Filters out wrong-epoch and already-executed certificates, then spawns one
/// scheduling task per remaining certificate; each task waits for its own
/// input objects before sending the certificate for execution.
pub fn enqueue_transactions(
    &self,
    certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) {
    // Drop certificates signed in a different epoch — enqueuing one here is
    // an upstream logic error, reported via debug_fatal.
    let certs: Vec<_> = certs
        .into_iter()
        .filter_map(|(cert, env)| {
            if cert.epoch() == epoch_store.epoch() {
                #[cfg(debug_assertions)]
                self.assert_cert_not_executed_previous_epochs(&cert);
                Some((cert, env))
            } else {
                // Fixed label: the value printed is the certificate's epoch,
                // not the certificate itself.
                debug_fatal!(
                    "We should never enqueue certificate from wrong epoch. Expected={} CertificateEpoch={:?}",
                    epoch_store.epoch(),
                    cert.epoch()
                );
                None
            }
        })
        .collect();
    // Skip certificates whose effects already exist.
    let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
    let executed = self
        .transaction_cache_read
        .multi_get_executed_effects_digests(&digests);
    let mut already_executed_certs_num = 0;
    let pending_certs =
        certs
            .into_iter()
            .zip(executed)
            .filter_map(|((cert, execution_env), executed)| {
                if executed.is_none() {
                    Some((cert, execution_env))
                } else {
                    already_executed_certs_num += 1;
                    None
                }
            });
    // One independent scheduling task per certificate.
    for (cert, execution_env) in pending_certs {
        let scheduler = self.clone();
        let epoch_store = epoch_store.clone();
        spawn_monitored_task!(
            epoch_store.within_alive_epoch(scheduler.schedule_transaction(
                cert,
                execution_env,
                &epoch_store,
            ))
        );
    }
    self.metrics
        .transaction_manager_num_enqueued_certificates
        .with_label_values(&["already_executed"])
        .inc_by(already_executed_certs_num);
}
/// Forwards a balance settlement to the withdraw scheduler.
/// Panics if the balance accumulator feature is not enabled.
pub fn settle_balances(&self, settlement: BalanceSettlement) {
    let withdraw_scheduler = self
        .balance_withdraw_scheduler
        .as_ref()
        .expect("Balance withdraw scheduler must be enabled if there are settlements");
    withdraw_scheduler.settle_balances(settlement);
}
/// Checks whether the authority is overloaded for execution, based on the
/// configured limits and the current pending + executing queue length.
pub fn check_execution_overload(
    &self,
    overload_config: &AuthorityOverloadConfig,
    tx_data: &SenderSignedData,
) -> SuiResult {
    self.overload_tracker.check_execution_overload(
        overload_config,
        tx_data,
        self.num_pending_certificates(),
    )
}
/// Number of certificates currently in flight: those still waiting on input
/// objects plus those already handed off and executing.
pub fn num_pending_certificates(&self) -> usize {
    let pending = self
        .metrics
        .transaction_manager_num_pending_certificates
        .get();
    let executing = self
        .metrics
        .transaction_manager_num_executing_certificates
        .get();
    (pending + executing) as usize
}
/// Test-only sanity check: asserts that no certificates are pending or executing.
#[cfg(test)]
pub fn check_empty_for_testing(&self) {
    assert_eq!(self.num_pending_certificates(), 0);
}
}
#[cfg(test)]
mod test {
use std::{time::Duration, vec};
use crate::authority::shared_object_version_manager::AssignedVersions;
use sui_test_transaction_builder::TestTransactionBuilder;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::object::Owner;
use sui_types::transaction::VerifiedTransaction;
use sui_types::{
base_types::{ObjectID, SequenceNumber},
crypto::deterministic_random_account_key,
object::Object,
transaction::{CallArg, ObjectArg},
SUI_FRAMEWORK_PACKAGE_ID,
};
use tokio::time::Instant;
use tokio::{
sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
time::sleep,
};
use crate::authority::ExecutionEnv;
use crate::authority::{authority_tests::init_state_with_objects, AuthorityState};
use crate::execution_scheduler::SchedulingSource;
use super::{ExecutionScheduler, PendingCertificate};
/// Builds an `ExecutionScheduler` wired to `state`, returning it together with
/// the receiving end of the ready-certificates channel.
#[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
    state: &AuthorityState,
) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
    let (ready_tx, ready_rx) = unbounded_channel();
    let scheduler = ExecutionScheduler::new(
        state.get_object_cache_reader().clone(),
        state.get_child_object_resolver().clone(),
        state.get_transaction_cache_reader().clone(),
        ready_tx,
        &state.epoch_store_for_testing(),
        state.metrics.clone(),
    );
    (scheduler, ready_rx)
}
/// Builds a `counter::assert_value` Move-call transaction signed by the
/// deterministic test account, wrapped as a system transaction at epoch 0.
fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
    let (sender, keypair) = deterministic_random_account_key();
    let gas_price = 100;
    let signed_tx =
        TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), gas_price)
            .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
            .build_and_sign(&keypair);
    VerifiedExecutableTransaction::new_system(
        VerifiedTransaction::new_unchecked(signed_tx),
        0,
    )
}
// Exercises the basic ready/pending flow: a transaction with available inputs
// is delivered immediately; one with a missing gas object stays pending until
// the object is written, and duplicate enqueues each produce a delivery.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_basics() {
    let (owner, _keypair) = deterministic_random_account_key();
    let gas_objects: Vec<Object> = (0..10)
        .map(|_| {
            let gas_object_id = ObjectID::random();
            Object::with_id_owner_for_testing(gas_object_id, owner)
        })
        .collect();
    let state = init_state_with_objects(gas_objects.clone()).await;
    let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
    // Nothing scheduled yet: channel empty, no pending certificates.
    assert!(rx_ready_certificates
        .try_recv()
        .is_err_and(|err| err == TryRecvError::Empty));
    assert_eq!(execution_scheduler.num_pending_certificates(), 0);
    // Enqueueing an empty batch is a no-op.
    execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
    assert!(rx_ready_certificates
        .try_recv()
        .is_err_and(|err| err == TryRecvError::Empty));
    // A transaction whose gas object is already in state becomes ready at once.
    let transaction = make_transaction(gas_objects[0].clone(), vec![]);
    let tx_start_time = Instant::now();
    execution_scheduler.enqueue_transactions(
        vec![(
            transaction.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    let pending_certificate = rx_ready_certificates.recv().await.unwrap();
    // Timestamps are ordered: enqueue after test start, ready after enqueue.
    assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
    assert!(
        pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
    );
    // The delivered certificate counts as executing until it is dropped.
    assert_eq!(execution_scheduler.num_pending_certificates(), 1);
    drop(pending_certificate);
    execution_scheduler.check_empty_for_testing();
    // A transaction using a gas object that is not yet in state stays pending.
    let gas_object_new = Object::with_id_owner_version_for_testing(
        ObjectID::random(),
        0.into(),
        Owner::AddressOwner(owner),
    );
    let transaction = make_transaction(gas_object_new.clone(), vec![]);
    let tx_start_time = Instant::now();
    execution_scheduler.enqueue_transactions(
        vec![(
            transaction.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates
        .try_recv()
        .is_err_and(|err| err == TryRecvError::Empty));
    assert_eq!(execution_scheduler.num_pending_certificates(), 1);
    // Enqueueing the same transaction again creates a second pending waiter.
    execution_scheduler.enqueue_transactions(
        vec![(
            transaction.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates
        .try_recv()
        .is_err_and(|err| err == TryRecvError::Empty));
    assert_eq!(execution_scheduler.num_pending_certificates(), 2);
    // Writing the missing gas object unblocks both waiters.
    state
        .get_cache_writer()
        .write_object_entry_for_test(gas_object_new);
    let pending_certificate = rx_ready_certificates.recv().await.unwrap();
    let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
    // Both deliveries are for the same transaction.
    assert_eq!(
        pending_certificate.certificate.digest(),
        pending_certificate2.certificate.digest()
    );
    assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
    // Two 1s sleeps (paused clock) elapsed before the object appeared.
    assert!(
        pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
            >= Duration::from_secs(2)
    );
    drop(pending_certificate);
    drop(pending_certificate2);
    execution_scheduler.check_empty_for_testing();
}
// Exercises shared-object version dependencies: transactions are assigned a
// shared-object version ahead of what is in state, so they stay pending until
// the shared object is written at that version; a transaction depending on two
// shared objects only becomes ready once both are written.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_object_dependency() {
    telemetry_subscribers::init_for_testing();
    let (owner, _keypair) = deterministic_random_account_key();
    let gas_objects: Vec<Object> = (0..10)
        .map(|_| {
            let gas_object_id = ObjectID::random();
            Object::with_id_owner_for_testing(gas_object_id, owner)
        })
        .collect();
    let shared_object = Object::shared_for_testing();
    let initial_shared_version = shared_object.owner().start_version().unwrap();
    let shared_object_2 = Object::shared_for_testing();
    let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
    let state = init_state_with_objects(
        [
            gas_objects.clone(),
            vec![shared_object.clone(), shared_object_2.clone()],
        ]
        .concat(),
    )
    .await;
    let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
    assert!(rx_ready_certificates.try_recv().is_err());
    // All transactions below are assigned shared version 1000, which is ahead
    // of the version currently in state, so none can become ready yet.
    let shared_version = 1000.into();
    let shared_object_arg_read = ObjectArg::SharedObject {
        id: shared_object.id(),
        initial_shared_version,
        mutable: false,
    };
    // Two read-only users of the first shared object.
    let transaction_read_0 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(shared_object_arg_read)],
    );
    let transaction_read_1 = make_transaction(
        gas_objects[1].clone(),
        vec![CallArg::Object(shared_object_arg_read)],
    );
    let tx_read_0_assigned_versions = vec![(
        (
            shared_object.id(),
            shared_object.owner().start_version().unwrap(),
        ),
        shared_version,
    )];
    let tx_read_1_assigned_versions = vec![(
        (
            shared_object.id(),
            shared_object.owner().start_version().unwrap(),
        ),
        shared_version,
    )];
    // A mutable user of the first shared object.
    let shared_object_arg_default = ObjectArg::SharedObject {
        id: shared_object.id(),
        initial_shared_version,
        mutable: true,
    };
    let transaction_default = make_transaction(
        gas_objects[2].clone(),
        vec![CallArg::Object(shared_object_arg_default)],
    );
    let tx_default_assigned_versions = vec![(
        (
            shared_object.id(),
            shared_object.owner().start_version().unwrap(),
        ),
        shared_version,
    )];
    let shared_version_2 = 1000.into();
    let shared_object_arg_read_2 = ObjectArg::SharedObject {
        id: shared_object_2.id(),
        initial_shared_version: initial_shared_version_2,
        mutable: false,
    };
    // Depends on BOTH shared objects — readiness requires both versions.
    let transaction_read_2 = make_transaction(
        gas_objects[3].clone(),
        vec![
            CallArg::Object(shared_object_arg_default),
            CallArg::Object(shared_object_arg_read_2),
        ],
    );
    let tx_read_2_assigned_versions = vec![
        (
            (
                shared_object.id(),
                shared_object.owner().start_version().unwrap(),
            ),
            shared_version,
        ),
        (
            (
                shared_object_2.id(),
                shared_object_2.owner().start_version().unwrap(),
            ),
            shared_version_2,
        ),
    ];
    execution_scheduler.enqueue_transactions(
        vec![
            (
                transaction_read_0.clone(),
                ExecutionEnv::new().with_assigned_versions(AssignedVersions::non_withdraw(
                    tx_read_0_assigned_versions,
                )),
            ),
            (
                transaction_read_1.clone(),
                ExecutionEnv::new().with_assigned_versions(AssignedVersions::non_withdraw(
                    tx_read_1_assigned_versions,
                )),
            ),
            (
                transaction_default.clone(),
                ExecutionEnv::new().with_assigned_versions(AssignedVersions::non_withdraw(
                    tx_default_assigned_versions,
                )),
            ),
            (
                transaction_read_2.clone(),
                ExecutionEnv::new().with_assigned_versions(AssignedVersions::non_withdraw(
                    tx_read_2_assigned_versions,
                )),
            ),
        ],
        &state.epoch_store_for_testing(),
    );
    // Nothing is ready: the assigned shared versions are not in state yet.
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 4);
    // Write the first shared object at the assigned version (1000): the three
    // transactions depending only on it become ready.
    let mut new_shared_object = shared_object.clone();
    new_shared_object
        .data
        .try_as_move_mut()
        .unwrap()
        .increment_version_to(shared_version_2);
    state
        .get_cache_writer()
        .write_object_entry_for_test(new_shared_object);
    let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
    let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
    let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
    {
        // Readiness order is unspecified, so compare sorted digest sets.
        let mut want_digests = vec![
            transaction_read_0.digest(),
            transaction_read_1.digest(),
            transaction_default.digest(),
        ];
        want_digests.sort();
        let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
        got_digests.sort();
        assert_eq!(want_digests, got_digests);
    }
    // The two-object transaction still waits on the second shared object.
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 1);
    // Writing the second shared object readies the last transaction.
    let mut new_shared_object_2 = shared_object_2.clone();
    new_shared_object_2
        .data
        .try_as_move_mut()
        .unwrap()
        .increment_version_to(shared_version_2);
    state
        .get_cache_writer()
        .write_object_entry_for_test(new_shared_object_2);
    let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
    assert_eq!(transaction_read_2.digest(), tx_3.digest());
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    execution_scheduler.check_empty_for_testing();
}
// Verifies that both Receiving and ImmOrOwned input kinds are woken up when
// the exact object version they wait on is committed: ten transactions each
// wait on a distinct version of the same object id, and writing each version
// readies exactly one transaction.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_receiving_notify_commit() {
    telemetry_subscribers::init_for_testing();
    let (owner, _keypair) = deterministic_random_account_key();
    let gas_objects: Vec<Object> = (0..10)
        .map(|_| {
            let gas_object_id = ObjectID::random();
            Object::with_id_owner_for_testing(gas_object_id, owner)
        })
        .collect();
    let state = init_state_with_objects(gas_objects.clone()).await;
    let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
    assert!(rx_ready_certificates.try_recv().is_err());
    execution_scheduler.check_empty_for_testing();
    // Ten versions of one (not-yet-written) object id, alternating between
    // Receiving and ImmOrOwned argument kinds (even indices and 3 → Receiving).
    let obj_id = ObjectID::random();
    let object_arguments: Vec<_> = (0..10)
        .map(|i| {
            let object = Object::with_id_owner_version_for_testing(
                obj_id,
                i.into(),
                Owner::AddressOwner(owner),
            );
            let object_arg = if i % 2 == 0 || i == 3 {
                ObjectArg::Receiving(object.compute_object_reference())
            } else {
                ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
            };
            let txn =
                make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
            (object, txn)
        })
        .collect();
    // Each enqueued transaction stays pending: its object version is missing.
    for (i, (_, txn)) in object_arguments.iter().enumerate() {
        execution_scheduler.enqueue_transactions(
            vec![(
                txn.clone(),
                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
            )],
            &state.epoch_store_for_testing(),
        );
        sleep(Duration::from_secs(1)).await;
        assert!(rx_ready_certificates.try_recv().is_err());
        assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
    }
    // Writing each version readies exactly the one transaction waiting on it.
    let len = object_arguments.len();
    for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
        state
            .get_cache_writer()
            .write_object_entry_for_test(object.clone());
        rx_ready_certificates.recv().await.unwrap();
        sleep(Duration::from_secs(1)).await;
        assert!(rx_ready_certificates.try_recv().is_err());
        drop(txn);
        assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
    }
    execution_scheduler.check_empty_for_testing();
}
// Verifies readiness notifications for Receiving arguments: transactions
// waiting on versions 0 and 1 of the same object id become ready as each
// version is written, including a duplicate enqueue of the version-0 waiter.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_receiving_object_ready_notifications() {
    telemetry_subscribers::init_for_testing();
    let (owner, _keypair) = deterministic_random_account_key();
    let gas_objects: Vec<Object> = (0..10)
        .map(|_| {
            let gas_object_id = ObjectID::random();
            Object::with_id_owner_for_testing(gas_object_id, owner)
        })
        .collect();
    let state = init_state_with_objects(gas_objects.clone()).await;
    let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
    assert!(rx_ready_certificates.try_recv().is_err());
    execution_scheduler.check_empty_for_testing();
    // Two versions of the same object id, neither written yet.
    let obj_id = ObjectID::random();
    let receiving_object_new0 =
        Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
    let receiving_object_new1 =
        Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
    let receiving_object_arg0 =
        ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
    let receive_object_transaction0 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(receiving_object_arg0)],
    );
    let receiving_object_arg1 =
        ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
    let receive_object_transaction1 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(receiving_object_arg1)],
    );
    // Enqueue waiter on version 0 — stays pending.
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction0.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 1);
    // Enqueue waiter on version 1 — also pending.
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction1.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 2);
    // Re-enqueue the version-0 waiter — a third independent pending entry.
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction0.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 3);
    // Writing version 0 readies the version-0 waiters; version 1 the other.
    state
        .get_cache_writer()
        .write_object_entry_for_test(receiving_object_new0.clone());
    rx_ready_certificates.recv().await.unwrap();
    state
        .get_cache_writer()
        .write_object_entry_for_test(receiving_object_new1.clone());
    rx_ready_certificates.recv().await.unwrap();
}
// Verifies that multiple distinct transactions waiting on the SAME receiving
// object version are all notified when that version is written, and that a
// Receiving argument for an object already present in state is immediately ready.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
    telemetry_subscribers::init_for_testing();
    let (owner, _keypair) = deterministic_random_account_key();
    let gas_objects: Vec<Object> = (0..10)
        .map(|_| {
            let gas_object_id = ObjectID::random();
            Object::with_id_owner_for_testing(gas_object_id, owner)
        })
        .collect();
    let state = init_state_with_objects(gas_objects.clone()).await;
    let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
    assert!(rx_ready_certificates.try_recv().is_err());
    execution_scheduler.check_empty_for_testing();
    let obj_id = ObjectID::random();
    let receiving_object_new0 =
        Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
    let receiving_object_new1 =
        Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
    let receiving_object_arg0 =
        ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
    // Two different transactions (different gas) receive the same version 0.
    let receive_object_transaction0 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(receiving_object_arg0)],
    );
    let receive_object_transaction01 = make_transaction(
        gas_objects[1].clone(),
        vec![CallArg::Object(receiving_object_arg0)],
    );
    let receiving_object_arg1 =
        ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
    let receive_object_transaction1 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(receiving_object_arg1)],
    );
    // Receives a gas object that already exists in state.
    let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
    let tx1 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(gas_receiving_arg)],
    );
    // All three waiters on the unwritten object id stay pending.
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction0.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 1);
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction1.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 2);
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction01.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    assert!(rx_ready_certificates.try_recv().is_err());
    assert_eq!(execution_scheduler.num_pending_certificates(), 3);
    // Writing version 0 readies BOTH transactions waiting on it.
    state
        .get_cache_writer()
        .write_object_entry_for_test(receiving_object_new0.clone());
    rx_ready_certificates.recv().await.unwrap();
    rx_ready_certificates.recv().await.unwrap();
    assert!(rx_ready_certificates.try_recv().is_err());
    // Receiving an object already in state is ready without any write.
    execution_scheduler.enqueue_transactions(
        vec![(
            tx1.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    sleep(Duration::from_secs(1)).await;
    rx_ready_certificates.recv().await.unwrap();
    // Writing version 1 readies the remaining waiter.
    state
        .get_cache_writer()
        .write_object_entry_for_test(receiving_object_new1.clone());
    rx_ready_certificates.recv().await.unwrap();
}
// Verifies that Receiving arguments referencing versions LOWER than the
// version already in state (here version 10) are treated as available
// immediately: all three transactions become ready without any object write.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
    telemetry_subscribers::init_for_testing();
    let (owner, _keypair) = deterministic_random_account_key();
    let mut gas_objects: Vec<Object> = (0..10)
        .map(|_| {
            let gas_object_id = ObjectID::random();
            Object::with_id_owner_for_testing(gas_object_id, owner)
        })
        .collect();
    // The receiving object already exists in state at version 10.
    let receiving_object = Object::with_id_owner_version_for_testing(
        ObjectID::random(),
        10.into(),
        Owner::AddressOwner(owner),
    );
    gas_objects.push(receiving_object.clone());
    let state = init_state_with_objects(gas_objects.clone()).await;
    let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
    assert!(rx_ready_certificates.try_recv().is_err());
    execution_scheduler.check_empty_for_testing();
    // References to versions 0 and 1 of that same object — both below 10.
    let receiving_object_new0 = Object::with_id_owner_version_for_testing(
        receiving_object.id(),
        0.into(),
        Owner::AddressOwner(owner),
    );
    let receiving_object_new1 = Object::with_id_owner_version_for_testing(
        receiving_object.id(),
        1.into(),
        Owner::AddressOwner(owner),
    );
    let receiving_object_arg0 =
        ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
    let receive_object_transaction0 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(receiving_object_arg0)],
    );
    let receive_object_transaction01 = make_transaction(
        gas_objects[1].clone(),
        vec![CallArg::Object(receiving_object_arg0)],
    );
    let receiving_object_arg1 =
        ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
    let receive_object_transaction1 = make_transaction(
        gas_objects[0].clone(),
        vec![CallArg::Object(receiving_object_arg1)],
    );
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction0.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction01.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    execution_scheduler.enqueue_transactions(
        vec![(
            receive_object_transaction1.clone(),
            ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
        )],
        &state.epoch_store_for_testing(),
    );
    // All three are delivered without any object being written.
    sleep(Duration::from_secs(1)).await;
    rx_ready_certificates.recv().await.unwrap();
    rx_ready_certificates.recv().await.unwrap();
    rx_ready_certificates.recv().await.unwrap();
    assert!(rx_ready_certificates.try_recv().is_err());
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn execution_scheduler_with_cancelled_transactions() {
let (owner, _keypair) = deterministic_random_account_key();
let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
let shared_object_1 = Object::shared_for_testing();
let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
let shared_object_2 = Object::shared_for_testing();
let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
let state = init_state_with_objects(vec![
gas_object.clone(),
shared_object_1.clone(),
shared_object_2.clone(),
owned_object.clone(),
])
.await;
let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
assert!(rx_ready_certificates.try_recv().is_err());
let shared_object_arg_1 = ObjectArg::SharedObject {
id: shared_object_1.id(),
initial_shared_version: initial_shared_version_1,
mutable: true,
};
let shared_object_arg_2 = ObjectArg::SharedObject {
id: shared_object_2.id(),
initial_shared_version: initial_shared_version_2,
mutable: true,
};
let owned_version = 2000.into();
let mut owned_ref = owned_object.compute_object_reference();
owned_ref.1 = owned_version;
let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
let cancelled_transaction = make_transaction(
gas_object.clone(),
vec![
CallArg::Object(shared_object_arg_1),
CallArg::Object(shared_object_arg_2),
CallArg::Object(owned_object_arg),
],
);
let assigned_versions = vec![
(
(
shared_object_1.id(),
shared_object_1.owner().start_version().unwrap(),
),
SequenceNumber::CANCELLED_READ,
),
(
(
shared_object_2.id(),
shared_object_2.owner().start_version().unwrap(),
),
SequenceNumber::CONGESTED,
),
];
execution_scheduler.enqueue_transactions(
vec![(
cancelled_transaction.clone(),
ExecutionEnv::new()
.with_assigned_versions(AssignedVersions::non_withdraw(assigned_versions)),
)],
&state.epoch_store_for_testing(),
);
sleep(Duration::from_secs(1)).await;
assert!(rx_ready_certificates.try_recv().is_err());
assert_eq!(execution_scheduler.num_pending_certificates(), 1);
let mut new_owned_object = owned_object.clone();
new_owned_object
.data
.try_as_move_mut()
.unwrap()
.increment_version_to(owned_version);
state
.get_cache_writer()
.write_object_entry_for_test(new_owned_object);
let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
assert_eq!(available_txn.digest(), cancelled_transaction.digest());
sleep(Duration::from_secs(1)).await;
assert!(rx_ready_certificates.try_recv().is_err());
execution_scheduler.check_empty_for_testing();
}
}