1use crate::{
5 authority::{
6 AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
7 shared_object_version_manager::Schedulable,
8 },
9 execution_cache::{ObjectCacheRead, TransactionCacheRead},
10 execution_scheduler::{
11 ExecutingGuard, PendingCertificateStats,
12 balance_withdraw_scheduler::{
13 BalanceSettlement, ScheduleStatus, TxBalanceWithdraw,
14 scheduler::BalanceWithdrawScheduler,
15 },
16 },
17};
18use futures::stream::{FuturesUnordered, StreamExt};
19use mysten_common::debug_fatal;
20use mysten_metrics::spawn_monitored_task;
21use parking_lot::Mutex;
22use std::{
23 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
24 sync::Arc,
25};
26use sui_config::node::AuthorityOverloadConfig;
27use sui_types::{
28 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
29 base_types::{FullObjectID, ObjectID},
30 digests::TransactionDigest,
31 error::SuiResult,
32 executable_transaction::VerifiedExecutableTransaction,
33 storage::{ChildObjectResolver, InputKey},
34 transaction::{
35 SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
36 TransactionDataAPI, TransactionKey,
37 },
38};
39use tokio::sync::mpsc::UnboundedSender;
40use tokio::time::Instant;
41use tracing::{debug, error, instrument};
42
43use super::{PendingCertificate, overload_tracker::OverloadTracker};
44
45pub(crate) struct BarrierDependencyBuilder {
47 dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
48}
49
50impl BarrierDependencyBuilder {
51 pub fn new() -> Self {
52 Self {
53 dep_state: Default::default(),
54 }
55 }
56
57 pub fn process_tx(
63 &mut self,
64 tx_digest: TransactionDigest,
65 tx: &TransactionData,
66 ) -> BTreeSet<TransactionDigest> {
67 let mut barrier_deps = BTreeSet::new();
68 for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
69 match mutability {
70 SharedObjectMutability::NonExclusiveWrite => {
71 self.dep_state.entry(id).or_default().insert(tx_digest);
72 }
73 SharedObjectMutability::Mutable => {
74 if let Some(deps) = self.dep_state.remove(&id) {
77 barrier_deps.extend(deps);
78 }
79 }
80 SharedObjectMutability::Immutable => (),
81 }
82 }
83 barrier_deps
84 }
85}
86
87#[derive(Clone)]
88pub struct ExecutionScheduler {
89 object_cache_read: Arc<dyn ObjectCacheRead>,
90 transaction_cache_read: Arc<dyn TransactionCacheRead>,
91 overload_tracker: Arc<OverloadTracker>,
92 tx_ready_certificates: UnboundedSender<PendingCertificate>,
93 balance_withdraw_scheduler: Arc<Mutex<Option<BalanceWithdrawScheduler>>>,
94 metrics: Arc<AuthorityMetrics>,
95}
96
97struct PendingGuard<'a> {
98 scheduler: &'a ExecutionScheduler,
99 cert: &'a VerifiedExecutableTransaction,
100}
101
102impl<'a> PendingGuard<'a> {
103 pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
104 scheduler
105 .metrics
106 .transaction_manager_num_pending_certificates
107 .inc();
108 scheduler
109 .overload_tracker
110 .add_pending_certificate(cert.data());
111 Self { scheduler, cert }
112 }
113}
114
115impl Drop for PendingGuard<'_> {
116 fn drop(&mut self) {
117 self.scheduler
118 .metrics
119 .transaction_manager_num_pending_certificates
120 .dec();
121 self.scheduler
122 .overload_tracker
123 .remove_pending_certificate(self.cert.data());
124 }
125}
126
127impl ExecutionScheduler {
128 pub fn new(
129 object_cache_read: Arc<dyn ObjectCacheRead>,
130 child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
131 transaction_cache_read: Arc<dyn TransactionCacheRead>,
132 tx_ready_certificates: UnboundedSender<PendingCertificate>,
133 epoch_store: &Arc<AuthorityPerEpochStore>,
134 metrics: Arc<AuthorityMetrics>,
135 ) -> Self {
136 tracing::info!("Creating new ExecutionScheduler");
137 let balance_withdraw_scheduler =
138 Arc::new(Mutex::new(Self::initialize_balance_withdraw_scheduler(
139 epoch_store,
140 &object_cache_read,
141 child_object_resolver,
142 )));
143 Self {
144 object_cache_read,
145 transaction_cache_read,
146 overload_tracker: Arc::new(OverloadTracker::new()),
147 tx_ready_certificates,
148 balance_withdraw_scheduler,
149 metrics,
150 }
151 }
152
153 fn initialize_balance_withdraw_scheduler(
154 epoch_store: &Arc<AuthorityPerEpochStore>,
155 object_cache_read: &Arc<dyn ObjectCacheRead>,
156 child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
157 ) -> Option<BalanceWithdrawScheduler> {
158 let withdraw_scheduler_enabled =
159 epoch_store.is_validator() && epoch_store.accumulators_enabled();
160 if !withdraw_scheduler_enabled {
161 return None;
162 }
163 let starting_accumulator_version = object_cache_read
164 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
165 .expect("Accumulator root object must be present if balance accumulator is enabled")
166 .version();
167 Some(BalanceWithdrawScheduler::new(
168 Arc::new(child_object_resolver),
169 starting_accumulator_version,
170 ))
171 }
172
173 #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
174 async fn schedule_transaction(
175 self,
176 cert: VerifiedExecutableTransaction,
177 execution_env: ExecutionEnv,
178 epoch_store: &Arc<AuthorityPerEpochStore>,
179 ) {
180 let enqueue_time = Instant::now();
181 let tx_digest = cert.digest();
182 let digests = [*tx_digest];
183
184 let tx_data = cert.transaction_data();
185 let input_object_kinds = tx_data
186 .input_objects()
187 .expect("input_objects() cannot fail");
188 let input_object_keys: Vec<_> = epoch_store
189 .get_input_object_keys(
190 &cert.key(),
191 &input_object_kinds,
192 &execution_env.assigned_versions,
193 )
194 .into_iter()
195 .collect();
196
197 let receiving_object_keys: HashSet<_> = tx_data
198 .receiving_objects()
199 .into_iter()
200 .map(|entry| {
201 InputKey::VersionedObject {
202 id: FullObjectID::new(entry.0, None),
204 version: entry.1,
205 }
206 })
207 .collect();
208 let input_and_receiving_keys = [
209 input_object_keys,
210 receiving_object_keys.iter().cloned().collect(),
211 ]
212 .concat();
213
214 let epoch = epoch_store.epoch();
215 debug!(
216 ?tx_digest,
217 "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
218 );
219
220 let availability = self
221 .object_cache_read
222 .multi_input_objects_available_cache_only(&input_and_receiving_keys);
223 let missing_input_keys: Vec<_> = input_and_receiving_keys
227 .into_iter()
228 .zip(availability)
229 .filter_map(|(key, available)| if !available { Some(key) } else { None })
230 .collect();
231
232 let has_missing_barrier_dependencies = self
233 .transaction_cache_read
234 .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
235 .into_iter()
236 .any(|r| r.is_none());
237
238 if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
239 self.metrics
240 .transaction_manager_num_enqueued_certificates
241 .with_label_values(&["ready"])
242 .inc();
243 debug!(?tx_digest, "Input objects already available");
244 self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
245 return;
246 }
247
248 let _pending_guard = PendingGuard::new(&self, &cert);
249 self.metrics
250 .transaction_manager_num_enqueued_certificates
251 .with_label_values(&["pending"])
252 .inc();
253
254 if !execution_env.barrier_dependencies.is_empty() {
255 debug!(
256 "waiting for barrier dependencies to be executed: {:?}",
257 execution_env.barrier_dependencies
258 );
259 self.transaction_cache_read
260 .notify_read_executed_effects_digests(
261 "wait_for_barrier_dependencies",
262 &execution_env.barrier_dependencies,
263 )
264 .await;
265 }
266
267 tokio::select! {
268 _ = self.object_cache_read
269 .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
270 => {
271 self.metrics
272 .transaction_manager_transaction_queue_age_s
273 .observe(enqueue_time.elapsed().as_secs_f64());
274 debug!(?tx_digest, "Input objects available");
275 self.send_transaction_for_execution(
277 &cert,
278 execution_env,
279 enqueue_time,
280 );
281 }
282 _ = self.transaction_cache_read.notify_read_executed_effects_digests(
283 "ExecutionScheduler::notify_read_executed_effects_digests",
284 &digests,
285 ) => {
286 debug!(?tx_digest, "Transaction already executed");
287 }
288 };
289 }
290
291 fn send_transaction_for_execution(
292 &self,
293 cert: &VerifiedExecutableTransaction,
294 execution_env: ExecutionEnv,
295 enqueue_time: Instant,
296 ) {
297 let pending_cert = PendingCertificate {
298 certificate: cert.clone(),
299 execution_env,
300 stats: PendingCertificateStats {
301 enqueue_time,
302 ready_time: Some(Instant::now()),
303 },
304 executing_guard: Some(ExecutingGuard::new(
305 self.metrics
306 .transaction_manager_num_executing_certificates
307 .clone(),
308 )),
309 };
310 let _ = self.tx_ready_certificates.send(pending_cert);
311 }
312
313 fn schedule_balance_withdraws(
314 &self,
315 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
316 epoch_store: &Arc<AuthorityPerEpochStore>,
317 ) {
318 if certs.is_empty() {
319 return;
320 }
321 let mut withdraws = BTreeMap::new();
322 let mut prev_version = None;
323 for (cert, env) in &certs {
324 let tx_withdraws = cert
325 .transaction_data()
326 .process_funds_withdrawals_for_execution();
327 assert!(!tx_withdraws.is_empty());
328 let accumulator_version = env
329 .assigned_versions
330 .accumulator_version
331 .expect("accumulator_version must be set when there are withdraws");
332 if let Some(prev_version) = prev_version {
333 assert!(prev_version <= accumulator_version);
335 }
336 prev_version = Some(accumulator_version);
337 let tx_digest = *cert.digest();
338 withdraws
339 .entry(accumulator_version)
340 .or_insert(Vec::new())
341 .push(TxBalanceWithdraw {
342 tx_digest,
343 reservations: tx_withdraws,
344 });
345 }
346 let mut receivers = FuturesUnordered::new();
347 {
348 let guard = self.balance_withdraw_scheduler.lock();
349 let withdraw_scheduler = guard
350 .as_ref()
351 .expect("Balance withdraw scheduler must be enabled if there are withdraws");
352 for (version, tx_withdraws) in withdraws {
353 receivers.extend(withdraw_scheduler.schedule_withdraws(version, tx_withdraws));
354 }
355 }
357 let scheduler = self.clone();
358 let epoch_store = epoch_store.clone();
359 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
360 let mut cert_map = HashMap::new();
361 for (cert, env) in certs {
362 cert_map.insert(*cert.digest(), (cert, env));
363 }
364 while let Some(result) = receivers.next().await {
365 match result {
366 Ok(result) => match result.status {
367 ScheduleStatus::InsufficientBalance => {
368 let tx_digest = result.tx_digest;
369 debug!(
370 ?tx_digest,
371 "Balance withdraw scheduling result: Insufficient balance"
372 );
373 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
374 let env = env.with_insufficient_balance();
375 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
376 }
377 ScheduleStatus::SufficientBalance => {
378 let tx_digest = result.tx_digest;
379 debug!(?tx_digest, "Balance withdraw scheduling result: Success");
380 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
381 let env = env.with_sufficient_balance();
382 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
383 }
384 ScheduleStatus::SkipSchedule => {
385 let tx_digest = result.tx_digest;
386 debug!(?tx_digest, "Skip scheduling balance withdraw");
387 }
388 },
389 Err(e) => {
390 error!("Withdraw scheduler stopped: {:?}", e);
391 }
392 }
393 }
394 }));
395 }
396
397 fn schedule_settlement_transactions(
398 &self,
399 settlement_txns: Vec<(TransactionKey, ExecutionEnv)>,
400 epoch_store: &Arc<AuthorityPerEpochStore>,
401 ) {
402 if !settlement_txns.is_empty() {
403 let scheduler = self.clone();
404 let epoch_store = epoch_store.clone();
405
406 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
407 let mut futures: FuturesUnordered<_> =
408 settlement_txns
409 .into_iter()
410 .map(|(key, env)| {
411 let epoch_store = epoch_store.clone();
412 async move {
413 (epoch_store.wait_for_settlement_transactions(key).await, env)
414 }
415 })
416 .collect();
417
418 while let Some((txns, env)) = futures.next().await {
419 let mut barrier_deps = BarrierDependencyBuilder::new();
420 let txns = txns
421 .into_iter()
422 .map(|tx| {
423 let deps = barrier_deps.process_tx(*tx.digest(), tx.transaction_data());
424 let env = env.clone().with_barrier_dependencies(deps);
425 (tx, env)
426 })
427 .collect::<Vec<_>>();
428
429 scheduler.enqueue_transactions(txns, &epoch_store);
430 }
431 }));
432 }
433 }
434
435 fn schedule_tx_keys(
436 &self,
437 tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
438 epoch_store: &Arc<AuthorityPerEpochStore>,
439 ) {
440 if tx_with_keys.is_empty() {
441 return;
442 }
443
444 let scheduler = self.clone();
445 let epoch_store = epoch_store.clone();
446 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
447 let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
448 let digests = epoch_store
449 .notify_read_tx_key_to_digest(&tx_keys)
450 .await
451 .expect("db error");
452 let transactions = scheduler
453 .transaction_cache_read
454 .multi_get_transaction_blocks(&digests)
455 .into_iter()
456 .map(|tx| {
457 let tx = tx.expect("tx must exist").as_ref().clone();
458 VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
459 })
460 .zip(tx_with_keys.into_iter().map(|(_, env)| env))
461 .collect::<Vec<_>>();
462 scheduler.enqueue_transactions(transactions, &epoch_store);
463 }));
464 }
465
466 #[cfg(debug_assertions)]
469 fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
470 let epoch = cert.epoch();
471 let digest = *cert.digest();
472 let digests = [digest];
473 let executed = self
474 .transaction_cache_read
475 .multi_get_executed_effects(&digests)
476 .pop()
477 .unwrap();
478 if let Some(executed) = executed {
481 use sui_types::effects::TransactionEffectsAPI;
482
483 assert_eq!(
484 executed.executed_epoch(),
485 epoch,
486 "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
487 digest,
488 executed.executed_epoch(),
489 epoch
490 );
491 }
492 }
493}
494
495impl ExecutionScheduler {
496 pub fn enqueue(
497 &self,
498 certs: Vec<(Schedulable, ExecutionEnv)>,
499 epoch_store: &Arc<AuthorityPerEpochStore>,
500 ) {
501 let mut ordinary_txns = Vec::with_capacity(certs.len());
503 let mut tx_with_keys = Vec::new();
504 let mut tx_with_withdraws = Vec::new();
505 let mut settlement_txns = Vec::new();
506
507 for (schedulable, env) in certs {
508 match schedulable {
509 Schedulable::Transaction(tx) => {
510 if tx.transaction_data().has_funds_withdrawals() {
511 tx_with_withdraws.push((tx, env));
512 } else {
513 ordinary_txns.push((tx, env));
514 }
515 }
516 s @ Schedulable::RandomnessStateUpdate(..) => {
517 tx_with_keys.push((s.key(), env));
518 }
519 Schedulable::AccumulatorSettlement(_, _) => {
520 settlement_txns.push((schedulable.key(), env));
521 }
522 Schedulable::ConsensusCommitPrologue(_, _, _) => {
523 unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
527 }
528 }
529 }
530
531 self.enqueue_transactions(ordinary_txns, epoch_store);
532 self.schedule_tx_keys(tx_with_keys, epoch_store);
533 self.schedule_balance_withdraws(tx_with_withdraws, epoch_store);
534 self.schedule_settlement_transactions(settlement_txns, epoch_store);
535 }
536
537 pub fn enqueue_transactions(
538 &self,
539 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
540 epoch_store: &Arc<AuthorityPerEpochStore>,
541 ) {
542 let certs: Vec<_> = certs
544 .into_iter()
545 .filter_map(|cert| {
546 if cert.0.epoch() == epoch_store.epoch() {
547 #[cfg(debug_assertions)]
548 self.assert_cert_not_executed_previous_epochs(&cert.0);
549
550 Some(cert)
551 } else {
552 debug_fatal!(
553 "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
554 epoch_store.epoch(),
555 cert.0.epoch()
556 );
557 None
558 }
559 })
560 .collect();
561 let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
562 let executed = self
563 .transaction_cache_read
564 .multi_get_executed_effects_digests(&digests);
565 let mut already_executed_certs_num = 0;
566 let pending_certs =
567 certs
568 .into_iter()
569 .zip(executed)
570 .filter_map(|((cert, execution_env), executed)| {
571 if executed.is_none() {
572 Some((cert, execution_env))
573 } else {
574 already_executed_certs_num += 1;
575 None
576 }
577 });
578
579 for (cert, execution_env) in pending_certs {
580 let scheduler = self.clone();
581 let epoch_store = epoch_store.clone();
582 spawn_monitored_task!(
583 epoch_store.within_alive_epoch(scheduler.schedule_transaction(
584 cert,
585 execution_env,
586 &epoch_store,
587 ))
588 );
589 }
590
591 self.metrics
592 .transaction_manager_num_enqueued_certificates
593 .with_label_values(&["already_executed"])
594 .inc_by(already_executed_certs_num);
595 }
596
597 pub fn settle_balances(&self, settlement: BalanceSettlement) {
598 self.balance_withdraw_scheduler
599 .lock()
600 .as_ref()
601 .expect("Balance withdraw scheduler must be enabled if there are settlements")
602 .settle_balances(settlement);
603 }
604
605 pub fn reconfigure(
608 &self,
609 new_epoch_store: &Arc<AuthorityPerEpochStore>,
610 child_object_resolver: &Arc<dyn ChildObjectResolver + Send + Sync>,
611 ) {
612 let scheduler = Self::initialize_balance_withdraw_scheduler(
613 new_epoch_store,
614 &self.object_cache_read,
615 child_object_resolver.clone(),
616 );
617 let mut guard = self.balance_withdraw_scheduler.lock();
618 if let Some(old_scheduler) = guard.as_ref() {
619 old_scheduler.close_epoch();
620 }
621 *guard = scheduler;
622 }
623
624 pub fn check_execution_overload(
625 &self,
626 overload_config: &AuthorityOverloadConfig,
627 tx_data: &SenderSignedData,
628 ) -> SuiResult {
629 let inflight_queue_len = self.num_pending_certificates();
630 self.overload_tracker
631 .check_execution_overload(overload_config, tx_data, inflight_queue_len)
632 }
633
634 pub fn num_pending_certificates(&self) -> usize {
635 (self
636 .metrics
637 .transaction_manager_num_pending_certificates
638 .get()
639 + self
640 .metrics
641 .transaction_manager_num_executing_certificates
642 .get()) as usize
643 }
644
645 #[cfg(test)]
646 pub fn check_empty_for_testing(&self) {
647 assert_eq!(self.num_pending_certificates(), 0);
648 }
649}
650
651#[cfg(test)]
652mod test {
653 use super::{BarrierDependencyBuilder, ExecutionScheduler, PendingCertificate};
654 use crate::authority::ExecutionEnv;
655 use crate::authority::shared_object_version_manager::AssignedVersions;
656 use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
657 use crate::execution_scheduler::SchedulingSource;
658 use std::collections::BTreeSet;
659 use std::{time::Duration, vec};
660 use sui_test_transaction_builder::TestTransactionBuilder;
661 use sui_types::base_types::{SuiAddress, random_object_ref};
662 use sui_types::executable_transaction::VerifiedExecutableTransaction;
663 use sui_types::object::Owner;
664 use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
665 use sui_types::transaction::{
666 SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
667 };
668 use sui_types::{
669 SUI_FRAMEWORK_PACKAGE_ID,
670 base_types::{ObjectID, SequenceNumber},
671 crypto::deterministic_random_account_key,
672 object::Object,
673 transaction::{CallArg, ObjectArg},
674 };
675 use tokio::time::Instant;
676 use tokio::{
677 sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
678 time::sleep,
679 };
680
681 #[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
683 state: &AuthorityState,
684 ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
685 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
688 let execution_scheduler = ExecutionScheduler::new(
689 state.get_object_cache_reader().clone(),
690 state.get_child_object_resolver().clone(),
691 state.get_transaction_cache_reader().clone(),
692 tx_ready_certificates,
693 &state.epoch_store_for_testing(),
694 state.metrics.clone(),
695 );
696
697 (execution_scheduler, rx_ready_certificates)
698 }
699
700 fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
701 let rgp = 100;
704 let (sender, keypair) = deterministic_random_account_key();
705 let transaction =
706 TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
707 .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
708 .build_and_sign(&keypair);
709 VerifiedExecutableTransaction::new_system(
710 VerifiedTransaction::new_unchecked(transaction),
711 0,
712 )
713 }
714
715 #[tokio::test(flavor = "current_thread", start_paused = true)]
716 async fn execution_scheduler_basics() {
717 let (owner, _keypair) = deterministic_random_account_key();
719 let gas_objects: Vec<Object> = (0..10)
720 .map(|_| {
721 let gas_object_id = ObjectID::random();
722 Object::with_id_owner_for_testing(gas_object_id, owner)
723 })
724 .collect();
725 let state = init_state_with_objects(gas_objects.clone()).await;
726
727 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
730 assert!(
732 rx_ready_certificates
733 .try_recv()
734 .is_err_and(|err| err == TryRecvError::Empty)
735 );
736 assert_eq!(execution_scheduler.num_pending_certificates(), 0);
738
739 execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
741 assert!(
743 rx_ready_certificates
744 .try_recv()
745 .is_err_and(|err| err == TryRecvError::Empty)
746 );
747
748 let transaction = make_transaction(gas_objects[0].clone(), vec![]);
750 let tx_start_time = Instant::now();
751 execution_scheduler.enqueue_transactions(
752 vec![(
753 transaction.clone(),
754 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
755 )],
756 &state.epoch_store_for_testing(),
757 );
758 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
760
761 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
763 assert!(
764 pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
765 );
766
767 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
768
769 drop(pending_certificate);
771
772 execution_scheduler.check_empty_for_testing();
774
775 let gas_object_new = Object::with_id_owner_version_for_testing(
777 ObjectID::random(),
778 0.into(),
779 Owner::AddressOwner(owner),
780 );
781 let transaction = make_transaction(gas_object_new.clone(), vec![]);
782 let tx_start_time = Instant::now();
783 execution_scheduler.enqueue_transactions(
784 vec![(
785 transaction.clone(),
786 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
787 )],
788 &state.epoch_store_for_testing(),
789 );
790 sleep(Duration::from_secs(1)).await;
792 assert!(
793 rx_ready_certificates
794 .try_recv()
795 .is_err_and(|err| err == TryRecvError::Empty)
796 );
797
798 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
799
800 execution_scheduler.enqueue_transactions(
802 vec![(
803 transaction.clone(),
804 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
805 )],
806 &state.epoch_store_for_testing(),
807 );
808 sleep(Duration::from_secs(1)).await;
809 assert!(
810 rx_ready_certificates
811 .try_recv()
812 .is_err_and(|err| err == TryRecvError::Empty)
813 );
814
815 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
816
817 state
819 .get_cache_writer()
820 .write_object_entry_for_test(gas_object_new);
821 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
824 let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
825 assert_eq!(
826 pending_certificate.certificate.digest(),
827 pending_certificate2.certificate.digest()
828 );
829
830 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
833 assert!(
834 pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
835 >= Duration::from_secs(2)
836 );
837
838 drop(pending_certificate);
840 drop(pending_certificate2);
841
842 execution_scheduler.check_empty_for_testing();
844 }
845
846 #[tokio::test(flavor = "current_thread", start_paused = true)]
855 async fn execution_scheduler_object_dependency() {
856 telemetry_subscribers::init_for_testing();
857 let (owner, _keypair) = deterministic_random_account_key();
859 let gas_objects: Vec<Object> = (0..10)
860 .map(|_| {
861 let gas_object_id = ObjectID::random();
862 Object::with_id_owner_for_testing(gas_object_id, owner)
863 })
864 .collect();
865 let shared_object = Object::shared_for_testing();
866 let initial_shared_version = shared_object.owner().start_version().unwrap();
867 let shared_object_2 = Object::shared_for_testing();
868 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
869
870 let state = init_state_with_objects(
871 [
872 gas_objects.clone(),
873 vec![shared_object.clone(), shared_object_2.clone()],
874 ]
875 .concat(),
876 )
877 .await;
878
879 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
882 assert!(rx_ready_certificates.try_recv().is_err());
884
885 let shared_version = 1000.into();
887 let shared_object_arg_read = ObjectArg::SharedObject {
888 id: shared_object.id(),
889 initial_shared_version,
890 mutability: SharedObjectMutability::Immutable,
891 };
892 let transaction_read_0 = make_transaction(
893 gas_objects[0].clone(),
894 vec![CallArg::Object(shared_object_arg_read)],
895 );
896 let transaction_read_1 = make_transaction(
897 gas_objects[1].clone(),
898 vec![CallArg::Object(shared_object_arg_read)],
899 );
900 let tx_read_0_assigned_versions = vec![(
901 (
902 shared_object.id(),
903 shared_object.owner().start_version().unwrap(),
904 ),
905 shared_version,
906 )];
907 let tx_read_1_assigned_versions = vec![(
908 (
909 shared_object.id(),
910 shared_object.owner().start_version().unwrap(),
911 ),
912 shared_version,
913 )];
914
915 let shared_object_arg_default = ObjectArg::SharedObject {
917 id: shared_object.id(),
918 initial_shared_version,
919 mutability: SharedObjectMutability::Mutable,
920 };
921 let transaction_default = make_transaction(
922 gas_objects[2].clone(),
923 vec![CallArg::Object(shared_object_arg_default)],
924 );
925 let tx_default_assigned_versions = vec![(
926 (
927 shared_object.id(),
928 shared_object.owner().start_version().unwrap(),
929 ),
930 shared_version,
931 )];
932
933 let shared_version_2 = 1000.into();
935 let shared_object_arg_read_2 = ObjectArg::SharedObject {
936 id: shared_object_2.id(),
937 initial_shared_version: initial_shared_version_2,
938 mutability: SharedObjectMutability::Immutable,
939 };
940 let transaction_read_2 = make_transaction(
941 gas_objects[3].clone(),
942 vec![
943 CallArg::Object(shared_object_arg_default),
944 CallArg::Object(shared_object_arg_read_2),
945 ],
946 );
947 let tx_read_2_assigned_versions = vec![
948 (
949 (
950 shared_object.id(),
951 shared_object.owner().start_version().unwrap(),
952 ),
953 shared_version,
954 ),
955 (
956 (
957 shared_object_2.id(),
958 shared_object_2.owner().start_version().unwrap(),
959 ),
960 shared_version_2,
961 ),
962 ];
963
964 execution_scheduler.enqueue_transactions(
965 vec![
966 (
967 transaction_read_0.clone(),
968 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
969 tx_read_0_assigned_versions,
970 None,
971 )),
972 ),
973 (
974 transaction_read_1.clone(),
975 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
976 tx_read_1_assigned_versions,
977 None,
978 )),
979 ),
980 (
981 transaction_default.clone(),
982 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
983 tx_default_assigned_versions,
984 None,
985 )),
986 ),
987 (
988 transaction_read_2.clone(),
989 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
990 tx_read_2_assigned_versions,
991 None,
992 )),
993 ),
994 ],
995 &state.epoch_store_for_testing(),
996 );
997
998 sleep(Duration::from_secs(1)).await;
1000 assert!(rx_ready_certificates.try_recv().is_err());
1001
1002 assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1003
1004 let mut new_shared_object = shared_object.clone();
1006 new_shared_object
1007 .data
1008 .try_as_move_mut()
1009 .unwrap()
1010 .increment_version_to(shared_version_2);
1011 state
1012 .get_cache_writer()
1013 .write_object_entry_for_test(new_shared_object);
1014
1015 let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1017 let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1018 let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1019 {
1020 let mut want_digests = vec![
1021 transaction_read_0.digest(),
1022 transaction_read_1.digest(),
1023 transaction_default.digest(),
1024 ];
1025 want_digests.sort();
1026 let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1027 got_digests.sort();
1028 assert_eq!(want_digests, got_digests);
1029 }
1030
1031 sleep(Duration::from_secs(1)).await;
1032 assert!(rx_ready_certificates.try_recv().is_err());
1033
1034 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1035
1036 let mut new_shared_object_2 = shared_object_2.clone();
1038 new_shared_object_2
1039 .data
1040 .try_as_move_mut()
1041 .unwrap()
1042 .increment_version_to(shared_version_2);
1043 state
1044 .get_cache_writer()
1045 .write_object_entry_for_test(new_shared_object_2);
1046
1047 let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1049 assert_eq!(transaction_read_2.digest(), tx_3.digest());
1050
1051 sleep(Duration::from_secs(1)).await;
1052 assert!(rx_ready_certificates.try_recv().is_err());
1053
1054 execution_scheduler.check_empty_for_testing();
1055 }
1056
1057 #[tokio::test(flavor = "current_thread", start_paused = true)]
1058 async fn execution_scheduler_receiving_notify_commit() {
1059 telemetry_subscribers::init_for_testing();
1060 let (owner, _keypair) = deterministic_random_account_key();
1062 let gas_objects: Vec<Object> = (0..10)
1063 .map(|_| {
1064 let gas_object_id = ObjectID::random();
1065 Object::with_id_owner_for_testing(gas_object_id, owner)
1066 })
1067 .collect();
1068 let state = init_state_with_objects(gas_objects.clone()).await;
1069
1070 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1073 assert!(rx_ready_certificates.try_recv().is_err());
1075 execution_scheduler.check_empty_for_testing();
1077
1078 let obj_id = ObjectID::random();
1079 let object_arguments: Vec<_> = (0..10)
1080 .map(|i| {
1081 let object = Object::with_id_owner_version_for_testing(
1082 obj_id,
1083 i.into(),
1084 Owner::AddressOwner(owner),
1085 );
1086 let object_arg = if i % 2 == 0 || i == 3 {
1093 ObjectArg::Receiving(object.compute_object_reference())
1094 } else {
1095 ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1096 };
1097 let txn =
1098 make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1099 (object, txn)
1100 })
1101 .collect();
1102
1103 for (i, (_, txn)) in object_arguments.iter().enumerate() {
1104 execution_scheduler.enqueue_transactions(
1107 vec![(
1108 txn.clone(),
1109 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1110 )],
1111 &state.epoch_store_for_testing(),
1112 );
1113 sleep(Duration::from_secs(1)).await;
1114 assert!(rx_ready_certificates.try_recv().is_err());
1115 assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1116 }
1117
1118 let len = object_arguments.len();
1121 for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1122 state
1125 .get_cache_writer()
1126 .write_object_entry_for_test(object.clone());
1127
1128 rx_ready_certificates.recv().await.unwrap();
1131
1132 sleep(Duration::from_secs(1)).await;
1135 assert!(rx_ready_certificates.try_recv().is_err());
1136
1137 drop(txn);
1139
1140 assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1143 }
1144
1145 execution_scheduler.check_empty_for_testing();
1147 }
1148
1149 #[tokio::test(flavor = "current_thread", start_paused = true)]
1150 async fn execution_scheduler_receiving_object_ready_notifications() {
1151 telemetry_subscribers::init_for_testing();
1152 let (owner, _keypair) = deterministic_random_account_key();
1154 let gas_objects: Vec<Object> = (0..10)
1155 .map(|_| {
1156 let gas_object_id = ObjectID::random();
1157 Object::with_id_owner_for_testing(gas_object_id, owner)
1158 })
1159 .collect();
1160 let state = init_state_with_objects(gas_objects.clone()).await;
1161
1162 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1165 assert!(rx_ready_certificates.try_recv().is_err());
1167 execution_scheduler.check_empty_for_testing();
1169
1170 let obj_id = ObjectID::random();
1171 let receiving_object_new0 =
1172 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1173 let receiving_object_new1 =
1174 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1175 let receiving_object_arg0 =
1176 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1177 let receive_object_transaction0 = make_transaction(
1178 gas_objects[0].clone(),
1179 vec![CallArg::Object(receiving_object_arg0)],
1180 );
1181
1182 let receiving_object_arg1 =
1183 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1184 let receive_object_transaction1 = make_transaction(
1185 gas_objects[0].clone(),
1186 vec![CallArg::Object(receiving_object_arg1)],
1187 );
1188
1189 execution_scheduler.enqueue_transactions(
1191 vec![(
1192 receive_object_transaction0.clone(),
1193 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1194 )],
1195 &state.epoch_store_for_testing(),
1196 );
1197 sleep(Duration::from_secs(1)).await;
1198 assert!(rx_ready_certificates.try_recv().is_err());
1199 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1200
1201 execution_scheduler.enqueue_transactions(
1203 vec![(
1204 receive_object_transaction1.clone(),
1205 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1206 )],
1207 &state.epoch_store_for_testing(),
1208 );
1209 sleep(Duration::from_secs(1)).await;
1210 assert!(rx_ready_certificates.try_recv().is_err());
1211 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1212
1213 execution_scheduler.enqueue_transactions(
1215 vec![(
1216 receive_object_transaction0.clone(),
1217 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1218 )],
1219 &state.epoch_store_for_testing(),
1220 );
1221 sleep(Duration::from_secs(1)).await;
1222 assert!(rx_ready_certificates.try_recv().is_err());
1223 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1224
1225 state
1227 .get_cache_writer()
1228 .write_object_entry_for_test(receiving_object_new0.clone());
1229
1230 rx_ready_certificates.recv().await.unwrap();
1233
1234 state
1236 .get_cache_writer()
1237 .write_object_entry_for_test(receiving_object_new1.clone());
1238
1239 rx_ready_certificates.recv().await.unwrap();
1242 }
1243
1244 #[tokio::test(flavor = "current_thread", start_paused = true)]
1245 async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1246 telemetry_subscribers::init_for_testing();
1247 let (owner, _keypair) = deterministic_random_account_key();
1249 let gas_objects: Vec<Object> = (0..10)
1250 .map(|_| {
1251 let gas_object_id = ObjectID::random();
1252 Object::with_id_owner_for_testing(gas_object_id, owner)
1253 })
1254 .collect();
1255 let state = init_state_with_objects(gas_objects.clone()).await;
1256
1257 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1260 assert!(rx_ready_certificates.try_recv().is_err());
1262 execution_scheduler.check_empty_for_testing();
1264
1265 let obj_id = ObjectID::random();
1266 let receiving_object_new0 =
1267 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1268 let receiving_object_new1 =
1269 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1270 let receiving_object_arg0 =
1271 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1272 let receive_object_transaction0 = make_transaction(
1273 gas_objects[0].clone(),
1274 vec![CallArg::Object(receiving_object_arg0)],
1275 );
1276
1277 let receive_object_transaction01 = make_transaction(
1278 gas_objects[1].clone(),
1279 vec![CallArg::Object(receiving_object_arg0)],
1280 );
1281
1282 let receiving_object_arg1 =
1283 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1284 let receive_object_transaction1 = make_transaction(
1285 gas_objects[0].clone(),
1286 vec![CallArg::Object(receiving_object_arg1)],
1287 );
1288
1289 let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1292 let tx1 = make_transaction(
1293 gas_objects[0].clone(),
1294 vec![CallArg::Object(gas_receiving_arg)],
1295 );
1296
1297 execution_scheduler.enqueue_transactions(
1299 vec![(
1300 receive_object_transaction0.clone(),
1301 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1302 )],
1303 &state.epoch_store_for_testing(),
1304 );
1305 sleep(Duration::from_secs(1)).await;
1306 assert!(rx_ready_certificates.try_recv().is_err());
1307 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1308
1309 execution_scheduler.enqueue_transactions(
1311 vec![(
1312 receive_object_transaction1.clone(),
1313 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1314 )],
1315 &state.epoch_store_for_testing(),
1316 );
1317 sleep(Duration::from_secs(1)).await;
1318 assert!(rx_ready_certificates.try_recv().is_err());
1319 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1320
1321 execution_scheduler.enqueue_transactions(
1324 vec![(
1325 receive_object_transaction01.clone(),
1326 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1327 )],
1328 &state.epoch_store_for_testing(),
1329 );
1330 sleep(Duration::from_secs(1)).await;
1331 assert!(rx_ready_certificates.try_recv().is_err());
1332 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1333
1334 state
1336 .get_cache_writer()
1337 .write_object_entry_for_test(receiving_object_new0.clone());
1338
1339 rx_ready_certificates.recv().await.unwrap();
1342
1343 rx_ready_certificates.recv().await.unwrap();
1344
1345 assert!(rx_ready_certificates.try_recv().is_err());
1347
1348 execution_scheduler.enqueue_transactions(
1351 vec![(
1352 tx1.clone(),
1353 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1354 )],
1355 &state.epoch_store_for_testing(),
1356 );
1357 sleep(Duration::from_secs(1)).await;
1358 rx_ready_certificates.recv().await.unwrap();
1359
1360 state
1362 .get_cache_writer()
1363 .write_object_entry_for_test(receiving_object_new1.clone());
1364
1365 rx_ready_certificates.recv().await.unwrap();
1368 }
1369
1370 #[tokio::test(flavor = "current_thread", start_paused = true)]
1371 async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1372 telemetry_subscribers::init_for_testing();
1373 let (owner, _keypair) = deterministic_random_account_key();
1375 let mut gas_objects: Vec<Object> = (0..10)
1376 .map(|_| {
1377 let gas_object_id = ObjectID::random();
1378 Object::with_id_owner_for_testing(gas_object_id, owner)
1379 })
1380 .collect();
1381 let receiving_object = Object::with_id_owner_version_for_testing(
1382 ObjectID::random(),
1383 10.into(),
1384 Owner::AddressOwner(owner),
1385 );
1386 gas_objects.push(receiving_object.clone());
1387 let state = init_state_with_objects(gas_objects.clone()).await;
1388
1389 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1392 assert!(rx_ready_certificates.try_recv().is_err());
1394 execution_scheduler.check_empty_for_testing();
1396
1397 let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1398 receiving_object.id(),
1399 0.into(),
1400 Owner::AddressOwner(owner),
1401 );
1402 let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1403 receiving_object.id(),
1404 1.into(),
1405 Owner::AddressOwner(owner),
1406 );
1407 let receiving_object_arg0 =
1408 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1409 let receive_object_transaction0 = make_transaction(
1410 gas_objects[0].clone(),
1411 vec![CallArg::Object(receiving_object_arg0)],
1412 );
1413
1414 let receive_object_transaction01 = make_transaction(
1415 gas_objects[1].clone(),
1416 vec![CallArg::Object(receiving_object_arg0)],
1417 );
1418
1419 let receiving_object_arg1 =
1420 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1421 let receive_object_transaction1 = make_transaction(
1422 gas_objects[0].clone(),
1423 vec![CallArg::Object(receiving_object_arg1)],
1424 );
1425
1426 execution_scheduler.enqueue_transactions(
1428 vec![(
1429 receive_object_transaction0.clone(),
1430 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1431 )],
1432 &state.epoch_store_for_testing(),
1433 );
1434 execution_scheduler.enqueue_transactions(
1435 vec![(
1436 receive_object_transaction01.clone(),
1437 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1438 )],
1439 &state.epoch_store_for_testing(),
1440 );
1441 execution_scheduler.enqueue_transactions(
1442 vec![(
1443 receive_object_transaction1.clone(),
1444 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1445 )],
1446 &state.epoch_store_for_testing(),
1447 );
1448 sleep(Duration::from_secs(1)).await;
1449 rx_ready_certificates.recv().await.unwrap();
1450 rx_ready_certificates.recv().await.unwrap();
1451 rx_ready_certificates.recv().await.unwrap();
1452 assert!(rx_ready_certificates.try_recv().is_err());
1453 }
1454
1455 #[tokio::test(flavor = "current_thread", start_paused = true)]
1458 async fn execution_scheduler_with_cancelled_transactions() {
1459 let (owner, _keypair) = deterministic_random_account_key();
1461 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1462 let shared_object_1 = Object::shared_for_testing();
1463 let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1464 let shared_object_2 = Object::shared_for_testing();
1465 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1466 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1467
1468 let state = init_state_with_objects(vec![
1469 gas_object.clone(),
1470 shared_object_1.clone(),
1471 shared_object_2.clone(),
1472 owned_object.clone(),
1473 ])
1474 .await;
1475
1476 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1479 assert!(rx_ready_certificates.try_recv().is_err());
1481
1482 let shared_object_arg_1 = ObjectArg::SharedObject {
1484 id: shared_object_1.id(),
1485 initial_shared_version: initial_shared_version_1,
1486 mutability: SharedObjectMutability::Mutable,
1487 };
1488 let shared_object_arg_2 = ObjectArg::SharedObject {
1489 id: shared_object_2.id(),
1490 initial_shared_version: initial_shared_version_2,
1491 mutability: SharedObjectMutability::Mutable,
1492 };
1493
1494 let owned_version = 2000.into();
1496 let mut owned_ref = owned_object.compute_object_reference();
1497 owned_ref.1 = owned_version;
1498 let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1499
1500 let cancelled_transaction = make_transaction(
1501 gas_object.clone(),
1502 vec![
1503 CallArg::Object(shared_object_arg_1),
1504 CallArg::Object(shared_object_arg_2),
1505 CallArg::Object(owned_object_arg),
1506 ],
1507 );
1508 let assigned_versions = vec![
1509 (
1510 (
1511 shared_object_1.id(),
1512 shared_object_1.owner().start_version().unwrap(),
1513 ),
1514 SequenceNumber::CANCELLED_READ,
1515 ),
1516 (
1517 (
1518 shared_object_2.id(),
1519 shared_object_2.owner().start_version().unwrap(),
1520 ),
1521 SequenceNumber::CONGESTED,
1522 ),
1523 ];
1524
1525 execution_scheduler.enqueue_transactions(
1526 vec![(
1527 cancelled_transaction.clone(),
1528 ExecutionEnv::new()
1529 .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1530 )],
1531 &state.epoch_store_for_testing(),
1532 );
1533
1534 sleep(Duration::from_secs(1)).await;
1536 assert!(rx_ready_certificates.try_recv().is_err());
1537
1538 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1539
1540 let mut new_owned_object = owned_object.clone();
1542 new_owned_object
1543 .data
1544 .try_as_move_mut()
1545 .unwrap()
1546 .increment_version_to(owned_version);
1547 state
1548 .get_cache_writer()
1549 .write_object_entry_for_test(new_owned_object);
1550
1551 let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1553 assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1554
1555 sleep(Duration::from_secs(1)).await;
1556 assert!(rx_ready_certificates.try_recv().is_err());
1557
1558 execution_scheduler.check_empty_for_testing();
1559 }
1560
// Exercises `BarrierDependencyBuilder::process_tx` on four small scenarios.
// A transaction taking an object as `Mutable` is expected to depend on every
// earlier transaction that took the same object as `NonExclusiveWrite`, and
// each such set of earlier writers is handed out only once.
#[test]
fn test_barrier_dependency_builder() {
    // Adds one shared input with the given mutability to a programmable
    // transaction builder. Object ids are derived from a single byte.
    let add_shared_input = |ptb: &mut ProgrammableTransactionBuilder,
                            raw_id: u32,
                            mutability: SharedObjectMutability| {
        ptb.obj(ObjectArg::SharedObject {
            id: ObjectID::from_single_byte(raw_id as u8),
            initial_shared_version: SequenceNumber::new(),
            mutability,
        })
        .unwrap();
    };

    // Builds a transaction whose shared inputs are the given ids; the two id
    // lists must be disjoint.
    let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
        assert!(
            !non_exclusive_writes
                .iter()
                .any(|id| exclusive_writes.contains(id))
        );
        assert!(
            !exclusive_writes
                .iter()
                .any(|id| non_exclusive_writes.contains(id))
        );

        let mut ptb = ProgrammableTransactionBuilder::new();
        for raw_id in non_exclusive_writes {
            add_shared_input(&mut ptb, raw_id, SharedObjectMutability::NonExclusiveWrite);
        }
        for raw_id in exclusive_writes {
            add_shared_input(&mut ptb, raw_id, SharedObjectMutability::Mutable);
        }

        let kind = TransactionKind::ProgrammableTransaction(ptb.finish());
        let data = TransactionData::new(kind, SuiAddress::default(), random_object_ref(), 1, 1);
        Transaction::from_data_and_signer(data, vec![])
    };

    // Feeds one transaction to the builder and returns its barrier dependencies.
    let deps_of = |builder: &mut BarrierDependencyBuilder, tx: &Transaction| {
        builder.process_tx(*tx.digest(), tx.transaction_data())
    };

    // Scenario 1: one non-exclusive writer followed by an exclusive writer of
    // the same object.
    {
        let mut builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1], vec![]);
        let tx2 = make_transaction(vec![], vec![1]);

        let tx1_deps = deps_of(&mut builder, &tx1);
        let tx2_deps = deps_of(&mut builder, &tx2);

        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
    }

    // Scenario 2: one transaction writes two objects non-exclusively; each
    // later exclusive writer of either object depends on it.
    {
        let mut builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1, 2], vec![]);
        let tx2 = make_transaction(vec![], vec![1]);
        let tx3 = make_transaction(vec![], vec![2]);

        let tx1_deps = deps_of(&mut builder, &tx1);
        let tx2_deps = deps_of(&mut builder, &tx2);
        let tx3_deps = deps_of(&mut builder, &tx3);

        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
        assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest()]));
    }

    // Scenario 3: two non-exclusive writers of different objects; a single
    // exclusive writer of both objects depends on both of them.
    {
        let mut builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1], vec![]);
        let tx2 = make_transaction(vec![2], vec![]);
        let tx3 = make_transaction(vec![], vec![1, 2]);

        let tx1_deps = deps_of(&mut builder, &tx1);
        let tx2_deps = deps_of(&mut builder, &tx2);
        let tx3_deps = deps_of(&mut builder, &tx3);

        assert!(tx1_deps.is_empty());
        assert!(tx2_deps.is_empty());
        assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
    }

    // Scenario 4: two exclusive writers in a row; only the first one picks up
    // the earlier non-exclusive writer, the second gets no dependencies.
    {
        let mut builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1], vec![]);
        let tx2 = make_transaction(vec![], vec![1]);
        let tx3 = make_transaction(vec![], vec![1]);

        let tx1_deps = deps_of(&mut builder, &tx1);
        let tx2_deps = deps_of(&mut builder, &tx2);
        let tx3_deps = deps_of(&mut builder, &tx3);

        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
        assert!(tx3_deps.is_empty());
    }
}
1677}