1use crate::{
5 accumulators::funds_read::AccountFundsRead,
6 authority::{
7 AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8 shared_object_version_manager::Schedulable,
9 },
10 execution_cache::{ObjectCacheRead, TransactionCacheRead},
11 execution_scheduler::{
12 ExecutingGuard, PendingCertificateStats,
13 funds_withdraw_scheduler::{
14 AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
15 WithdrawReservations, scheduler::FundsWithdrawScheduler,
16 },
17 },
18};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::ZipDebugEqIteratorExt;
21use mysten_common::{assert_reachable, debug_fatal};
22use mysten_metrics::spawn_monitored_task;
23use parking_lot::Mutex;
24use std::{
25 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26 sync::Arc,
27};
28use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
29use sui_types::{
30 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31 base_types::{FullObjectID, ObjectID},
32 digests::TransactionDigest,
33 error::SuiResult,
34 executable_transaction::VerifiedExecutableTransaction,
35 storage::InputKey,
36 transaction::{
37 SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
38 TransactionDataAPI, TransactionKey,
39 },
40};
41use tokio::sync::mpsc::UnboundedSender;
42use tokio::time::Instant;
43use tracing::{debug, error, instrument};
44
45use super::{PendingCertificate, overload_tracker::OverloadTracker};
46
47pub(crate) struct BarrierDependencyBuilder {
49 dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
50}
51
52impl BarrierDependencyBuilder {
53 pub fn new() -> Self {
54 Self {
55 dep_state: Default::default(),
56 }
57 }
58
59 pub fn process_tx(
65 &mut self,
66 tx_digest: TransactionDigest,
67 tx: &TransactionData,
68 ) -> BTreeSet<TransactionDigest> {
69 let mut barrier_deps = BTreeSet::new();
70 for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
71 match mutability {
72 SharedObjectMutability::NonExclusiveWrite => {
73 self.dep_state.entry(id).or_default().insert(tx_digest);
74 }
75 SharedObjectMutability::Mutable => {
76 if let Some(deps) = self.dep_state.remove(&id) {
79 barrier_deps.extend(deps);
80 }
81 }
82 SharedObjectMutability::Immutable => (),
83 }
84 }
85 barrier_deps
86 }
87}
88
89#[derive(Clone)]
90pub struct ExecutionScheduler {
91 object_cache_read: Arc<dyn ObjectCacheRead>,
92 transaction_cache_read: Arc<dyn TransactionCacheRead>,
93 overload_tracker: Arc<OverloadTracker>,
94 tx_ready_certificates: UnboundedSender<PendingCertificate>,
95 address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
96 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
97 metrics: Arc<AuthorityMetrics>,
98 address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
99}
100
101struct PendingGuard<'a> {
102 scheduler: &'a ExecutionScheduler,
103 cert: &'a VerifiedExecutableTransaction,
104}
105
106impl<'a> PendingGuard<'a> {
107 pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
108 scheduler
109 .metrics
110 .transaction_manager_num_pending_certificates
111 .inc();
112 scheduler
113 .overload_tracker
114 .add_pending_certificate(cert.data());
115 Self { scheduler, cert }
116 }
117}
118
119impl Drop for PendingGuard<'_> {
120 fn drop(&mut self) {
121 self.scheduler
122 .metrics
123 .transaction_manager_num_pending_certificates
124 .dec();
125 self.scheduler
126 .overload_tracker
127 .remove_pending_certificate(self.cert.data());
128 }
129}
130
131impl ExecutionScheduler {
132 pub fn new(
133 object_cache_read: Arc<dyn ObjectCacheRead>,
134 account_funds_read: Arc<dyn AccountFundsRead>,
135 transaction_cache_read: Arc<dyn TransactionCacheRead>,
136 tx_ready_certificates: UnboundedSender<PendingCertificate>,
137 epoch_store: &Arc<AuthorityPerEpochStore>,
138 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
139 metrics: Arc<AuthorityMetrics>,
140 prometheus_registry: &prometheus::Registry,
141 ) -> Self {
142 tracing::info!(
143 ?funds_withdraw_scheduler_type,
144 "Creating new ExecutionScheduler"
145 );
146 let address_funds_scheduler_metrics =
147 Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
148 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
149 epoch_store,
150 &object_cache_read,
151 account_funds_read,
152 funds_withdraw_scheduler_type,
153 &address_funds_scheduler_metrics,
154 );
155 Self {
156 object_cache_read,
157 transaction_cache_read,
158 overload_tracker: Arc::new(OverloadTracker::new()),
159 tx_ready_certificates,
160 address_funds_withdraw_scheduler: Arc::new(Mutex::new(
161 address_funds_withdraw_scheduler,
162 )),
163 funds_withdraw_scheduler_type,
164 metrics,
165 address_funds_scheduler_metrics,
166 }
167 }
168
169 fn initialize_funds_withdraw_scheduler(
170 epoch_store: &Arc<AuthorityPerEpochStore>,
171 object_cache_read: &Arc<dyn ObjectCacheRead>,
172 account_funds_read: Arc<dyn AccountFundsRead>,
173 scheduler_type: FundsWithdrawSchedulerType,
174 address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
175 ) -> Option<FundsWithdrawScheduler> {
176 let withdraw_scheduler_enabled =
177 epoch_store.node_role().runs_consensus() && epoch_store.accumulators_enabled();
178 if !withdraw_scheduler_enabled {
179 return None;
180 }
181 let starting_accumulator_version = object_cache_read
182 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
183 .expect("Accumulator root object must be present if funds accumulator is enabled")
184 .version();
185 let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
186 account_funds_read.clone(),
187 starting_accumulator_version,
188 scheduler_type,
189 address_funds_scheduler_metrics.clone(),
190 );
191
192 Some(address_funds_withdraw_scheduler)
193 }
194
195 #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
196 async fn schedule_transaction(
197 self,
198 cert: VerifiedExecutableTransaction,
199 execution_env: ExecutionEnv,
200 epoch_store: &Arc<AuthorityPerEpochStore>,
201 ) {
202 let enqueue_time = Instant::now();
203 let tx_digest = cert.digest();
204 let digests = [*tx_digest];
205
206 let tx_data = cert.transaction_data();
207 let input_object_kinds = tx_data
208 .input_objects()
209 .expect("input_objects() cannot fail");
210 let mut input_object_keys: Vec<_> = epoch_store
211 .get_input_object_keys(
212 &cert.key(),
213 &input_object_kinds,
214 &execution_env.assigned_versions,
215 )
216 .into_iter()
217 .collect();
218
219 if tx_data.kind().has_coin_reservations()
224 && let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version
225 && let Some(initial_shared_version) =
226 (**epoch_store.epoch_start_config()).accumulator_root_obj_initial_shared_version()
227 {
228 input_object_keys.push(InputKey::VersionedObject {
229 id: FullObjectID::new(SUI_ACCUMULATOR_ROOT_OBJECT_ID, Some(initial_shared_version)),
230 version: accumulator_version,
231 });
232 }
233
234 let receiving_object_keys: HashSet<_> = tx_data
235 .receiving_objects()
236 .into_iter()
237 .map(|entry| {
238 InputKey::VersionedObject {
239 id: FullObjectID::new(entry.0, None),
241 version: entry.1,
242 }
243 })
244 .collect();
245 let input_and_receiving_keys = [
246 input_object_keys,
247 receiving_object_keys.iter().cloned().collect(),
248 ]
249 .concat();
250
251 let epoch = epoch_store.epoch();
252 debug!(
253 ?tx_digest,
254 "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
255 );
256
257 let availability = self
258 .object_cache_read
259 .multi_input_objects_available_cache_only(&input_and_receiving_keys);
260 let missing_input_keys: Vec<_> = input_and_receiving_keys
264 .into_iter()
265 .zip_debug_eq(availability)
266 .filter_map(|(key, available)| if !available { Some(key) } else { None })
267 .collect();
268
269 let has_missing_barrier_dependencies = self
270 .transaction_cache_read
271 .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
272 .into_iter()
273 .any(|r| r.is_none());
274
275 if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
276 self.metrics
277 .transaction_manager_num_enqueued_certificates
278 .with_label_values(&["ready"])
279 .inc();
280 debug!(?tx_digest, "Input objects already available");
281 self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
282 return;
283 }
284
285 let _pending_guard = PendingGuard::new(&self, &cert);
286 self.metrics
287 .transaction_manager_num_enqueued_certificates
288 .with_label_values(&["pending"])
289 .inc();
290
291 if !execution_env.barrier_dependencies.is_empty() {
292 debug!(
293 "waiting for barrier dependencies to be executed: {:?}",
294 execution_env.barrier_dependencies
295 );
296 self.transaction_cache_read
297 .notify_read_executed_effects_digests(
298 "wait_for_barrier_dependencies",
299 &execution_env.barrier_dependencies,
300 )
301 .await;
302 }
303
304 tokio::select! {
305 _ = self.object_cache_read
306 .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
307 => {
308 self.metrics
309 .transaction_manager_transaction_queue_age_s
310 .observe(enqueue_time.elapsed().as_secs_f64());
311 debug!(?tx_digest, "Input objects available");
312 self.send_transaction_for_execution(
314 &cert,
315 execution_env,
316 enqueue_time,
317 );
318 }
319 _ = self.transaction_cache_read.notify_read_executed_effects_digests(
320 "ExecutionScheduler::notify_read_executed_effects_digests",
321 &digests,
322 ) => {
323 debug!(?tx_digest, "Transaction already executed");
324 }
325 };
326 }
327
328 pub fn send_transaction_for_execution(
329 &self,
330 cert: &VerifiedExecutableTransaction,
331 execution_env: ExecutionEnv,
332 enqueue_time: Instant,
333 ) {
334 let pending_cert = PendingCertificate {
335 certificate: cert.clone(),
336 execution_env,
337 stats: PendingCertificateStats {
338 enqueue_time,
339 ready_time: Some(Instant::now()),
340 },
341 executing_guard: Some(ExecutingGuard::new(
342 self.metrics
343 .transaction_manager_num_executing_certificates
344 .clone(),
345 )),
346 };
347 let _ = self.tx_ready_certificates.send(pending_cert);
348 }
349
350 fn schedule_funds_withdraws(
351 &self,
352 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
353 epoch_store: &Arc<AuthorityPerEpochStore>,
354 ) {
355 if certs.is_empty() {
356 return;
357 }
358 let mut withdraws = BTreeMap::new();
359 let mut prev_version = None;
360 for (cert, env) in &certs {
361 let tx_withdraws = cert
362 .transaction_data()
363 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
364 assert!(!tx_withdraws.is_empty());
365 let accumulator_version = env
366 .assigned_versions
367 .accumulator_version
368 .expect("accumulator_version must be set when there are withdraws");
369 if let Some(prev_version) = prev_version {
370 assert!(prev_version <= accumulator_version);
372 }
373 prev_version = Some(accumulator_version);
374 let tx_digest = *cert.digest();
375 withdraws
376 .entry(accumulator_version)
377 .or_insert(Vec::new())
378 .push(TxFundsWithdraw {
379 tx_digest,
380 reservations: tx_withdraws,
381 });
382 }
383 let mut receivers = FuturesUnordered::new();
384 {
385 let guard = self.address_funds_withdraw_scheduler.lock();
386 let withdraw_scheduler = guard
387 .as_ref()
388 .expect("Funds withdraw scheduler must be enabled if there are withdraws");
389 for (version, tx_withdraws) in withdraws {
390 receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
391 accumulator_version: version,
392 withdraws: tx_withdraws,
393 }));
394 }
395 }
397 let scheduler = self.clone();
398 let epoch_store = epoch_store.clone();
399 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
400 let mut cert_map = HashMap::new();
401 for (cert, env) in certs {
402 cert_map.insert(*cert.digest(), (cert, env));
403 }
404 while let Some(result) = receivers.next().await {
405 match result {
406 Ok((tx_digest, status)) => match status {
407 ScheduleStatus::InsufficientFunds => {
408 assert_reachable!("tx cancelled, insufficient funds");
409 debug!(
410 ?tx_digest,
411 "Funds withdraw scheduling result: Insufficient funds"
412 );
413 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
414 let env = env.with_insufficient_funds();
415 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
416 }
417 ScheduleStatus::SufficientFunds => {
418 assert_reachable!("tx scheduled, sufficient funds");
419 debug!(?tx_digest, "Funds withdraw scheduling result: Success");
420 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
421 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
422 }
423 ScheduleStatus::SkipSchedule => {
424 assert_reachable!("tx withdrawal scheduling skipped");
425 debug!(?tx_digest, "Skip scheduling funds withdraw");
426 }
427 },
428 Err(e) => {
429 error!("Withdraw scheduler stopped: {:?}", e);
430 }
431 }
432 }
433 }));
434 }
435
436 fn schedule_tx_keys(
437 &self,
438 tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
439 epoch_store: &Arc<AuthorityPerEpochStore>,
440 ) {
441 if tx_with_keys.is_empty() {
442 return;
443 }
444
445 let scheduler = self.clone();
446 let epoch_store = epoch_store.clone();
447 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
448 let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
449 let digests = epoch_store
450 .notify_read_tx_key_to_digest(&tx_keys)
451 .await
452 .expect("db error");
453 let transactions = scheduler
454 .transaction_cache_read
455 .multi_get_transaction_blocks(&digests)
456 .into_iter()
457 .map(|tx| {
458 let tx = tx.expect("tx must exist").as_ref().clone();
459 VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
460 })
461 .zip_debug_eq(tx_with_keys.into_iter().map(|(_, env)| env))
462 .collect::<Vec<_>>();
463 scheduler.enqueue_transactions(transactions, &epoch_store);
464 }));
465 }
466
467 #[cfg(debug_assertions)]
470 fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
471 let epoch = cert.epoch();
472 let digest = *cert.digest();
473 let digests = [digest];
474 let executed = self
475 .transaction_cache_read
476 .multi_get_executed_effects(&digests)
477 .pop()
478 .unwrap();
479 if let Some(executed) = executed {
482 use sui_types::effects::TransactionEffectsAPI;
483
484 assert_eq!(
485 executed.executed_epoch(),
486 epoch,
487 "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
488 digest,
489 executed.executed_epoch(),
490 epoch
491 );
492 }
493 }
494}
495
496impl ExecutionScheduler {
497 pub fn enqueue(
498 &self,
499 certs: Vec<(Schedulable, ExecutionEnv)>,
500 epoch_store: &Arc<AuthorityPerEpochStore>,
501 ) {
502 let mut ordinary_txns = Vec::with_capacity(certs.len());
504 let mut tx_with_keys = Vec::new();
505 let mut tx_with_withdraws = Vec::new();
506
507 for (schedulable, env) in certs {
508 match schedulable {
509 Schedulable::Transaction(tx) => {
510 if tx.transaction_data().has_funds_withdrawals() {
511 tx_with_withdraws.push((tx, env));
512 } else {
513 ordinary_txns.push((tx, env));
514 }
515 }
516 s @ Schedulable::RandomnessStateUpdate(..) => {
517 tx_with_keys.push((s.key(), env));
518 }
519 Schedulable::AccumulatorSettlement(_, _) => {
520 unreachable!("handled by SettlementScheduler");
521 }
522 Schedulable::ConsensusCommitPrologue(_, _, _) => {
523 unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
527 }
528 }
529 }
530
531 self.enqueue_transactions(ordinary_txns, epoch_store);
532 self.schedule_tx_keys(tx_with_keys, epoch_store);
533 self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
534 }
535
536 pub fn enqueue_transactions(
537 &self,
538 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
539 epoch_store: &Arc<AuthorityPerEpochStore>,
540 ) {
541 let certs: Vec<_> = certs
543 .into_iter()
544 .filter_map(|cert| {
545 if cert.0.epoch() == epoch_store.epoch() {
546 #[cfg(debug_assertions)]
547 self.assert_cert_not_executed_previous_epochs(&cert.0);
548
549 Some(cert)
550 } else {
551 debug_fatal!(
552 "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
553 epoch_store.epoch(),
554 cert.0.epoch()
555 );
556 None
557 }
558 })
559 .collect();
560 let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
561 let executed = self
562 .transaction_cache_read
563 .multi_get_executed_effects_digests(&digests);
564 let mut already_executed_certs_num = 0;
565 let pending_certs = certs.into_iter().zip_debug_eq(executed).filter_map(
566 |((cert, execution_env), executed)| {
567 if executed.is_none() {
568 Some((cert, execution_env))
569 } else {
570 already_executed_certs_num += 1;
571 None
572 }
573 },
574 );
575
576 for (cert, execution_env) in pending_certs {
577 let scheduler = self.clone();
578 let epoch_store = epoch_store.clone();
579 spawn_monitored_task!(
580 epoch_store.within_alive_epoch(scheduler.schedule_transaction(
581 cert,
582 execution_env,
583 &epoch_store,
584 ))
585 );
586 }
587
588 self.metrics
589 .transaction_manager_num_enqueued_certificates
590 .with_label_values(&["already_executed"])
591 .inc_by(already_executed_certs_num);
592 }
593
594 pub fn settle_address_funds(&self, settlement: FundsSettlement) {
595 self.address_funds_withdraw_scheduler
596 .lock()
597 .as_ref()
598 .expect("Funds withdraw scheduler must be enabled if there are settlements")
599 .settle_funds(settlement);
600 }
601
602 pub fn reconfigure(
605 &self,
606 new_epoch_store: &Arc<AuthorityPerEpochStore>,
607 account_funds_read: &Arc<dyn AccountFundsRead>,
608 ) {
609 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
610 new_epoch_store,
611 &self.object_cache_read,
612 account_funds_read.clone(),
613 self.funds_withdraw_scheduler_type,
614 &self.address_funds_scheduler_metrics,
615 );
616 let mut guard = self.address_funds_withdraw_scheduler.lock();
617 if let Some(old_scheduler) = guard.as_ref() {
618 old_scheduler.close_epoch();
619 }
620 *guard = address_funds_withdraw_scheduler;
621 drop(guard);
622 }
623
624 pub fn check_execution_overload(
625 &self,
626 overload_config: &AuthorityOverloadConfig,
627 tx_data: &SenderSignedData,
628 ) -> SuiResult {
629 let inflight_queue_len = self.num_pending_certificates();
630 self.overload_tracker
631 .check_execution_overload(overload_config, tx_data, inflight_queue_len)
632 }
633
634 pub fn num_pending_certificates(&self) -> usize {
635 (self
636 .metrics
637 .transaction_manager_num_pending_certificates
638 .get()
639 + self
640 .metrics
641 .transaction_manager_num_executing_certificates
642 .get()) as usize
643 }
644
645 #[cfg(test)]
646 pub async fn check_empty_for_testing(&self) {
647 for _ in 0..10 {
648 if self.num_pending_certificates() == 0 {
649 return;
650 }
651 tokio::task::yield_now().await;
652 }
653 assert_eq!(self.num_pending_certificates(), 0);
654 }
655}
656
657#[cfg(test)]
658mod test {
659 use super::{
660 BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
661 PendingCertificate,
662 };
663 use crate::authority::ExecutionEnv;
664 use crate::authority::shared_object_version_manager::AssignedVersions;
665 use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
666 use std::collections::BTreeSet;
667 use std::{time::Duration, vec};
668 use sui_test_transaction_builder::TestTransactionBuilder;
669 use sui_types::base_types::{SuiAddress, random_object_ref};
670 use sui_types::executable_transaction::VerifiedExecutableTransaction;
671 use sui_types::object::Owner;
672 use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
673 use sui_types::transaction::{
674 SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
675 };
676 use sui_types::{
677 SUI_FRAMEWORK_PACKAGE_ID,
678 base_types::{ObjectID, SequenceNumber},
679 crypto::deterministic_random_account_key,
680 object::Object,
681 transaction::{CallArg, ObjectArg},
682 };
683 use tokio::time::Instant;
684 use tokio::{
685 sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
686 time::sleep,
687 };
688
689 #[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
691 state: &AuthorityState,
692 ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
693 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
696 let registry = prometheus::Registry::new();
697 let execution_scheduler = ExecutionScheduler::new(
698 state.get_object_cache_reader().clone(),
699 state.get_account_funds_read().clone(),
700 state.get_transaction_cache_reader().clone(),
701 tx_ready_certificates,
702 &state.epoch_store_for_testing(),
703 FundsWithdrawSchedulerType::default(),
704 state.metrics.clone(),
705 ®istry,
706 );
707
708 (execution_scheduler, rx_ready_certificates)
709 }
710
711 fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
712 let rgp = 100;
715 let (sender, keypair) = deterministic_random_account_key();
716 let transaction =
717 TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
718 .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
719 .build_and_sign(&keypair);
720 VerifiedExecutableTransaction::new_system(
721 VerifiedTransaction::new_unchecked(transaction),
722 0,
723 )
724 }
725
726 #[tokio::test(flavor = "current_thread", start_paused = true)]
727 async fn execution_scheduler_basics() {
728 let (owner, _keypair) = deterministic_random_account_key();
730 let gas_objects: Vec<Object> = (0..10)
731 .map(|_| {
732 let gas_object_id = ObjectID::random();
733 Object::with_id_owner_for_testing(gas_object_id, owner)
734 })
735 .collect();
736 let state = init_state_with_objects(gas_objects.clone()).await;
737
738 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
741 assert!(
743 rx_ready_certificates
744 .try_recv()
745 .is_err_and(|err| err == TryRecvError::Empty)
746 );
747 assert_eq!(execution_scheduler.num_pending_certificates(), 0);
749
750 execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
752 assert!(
754 rx_ready_certificates
755 .try_recv()
756 .is_err_and(|err| err == TryRecvError::Empty)
757 );
758
759 let transaction = make_transaction(gas_objects[0].clone(), vec![]);
761 let tx_start_time = Instant::now();
762 execution_scheduler.enqueue_transactions(
763 vec![(transaction.clone(), ExecutionEnv::new())],
764 &state.epoch_store_for_testing(),
765 );
766 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
768
769 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
771 assert!(
772 pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
773 );
774
775 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
776
777 drop(pending_certificate);
779
780 execution_scheduler.check_empty_for_testing().await;
782
783 let gas_object_new = Object::with_id_owner_version_for_testing(
785 ObjectID::random(),
786 0.into(),
787 Owner::AddressOwner(owner),
788 );
789 let transaction = make_transaction(gas_object_new.clone(), vec![]);
790 let tx_start_time = Instant::now();
791 execution_scheduler.enqueue_transactions(
792 vec![(transaction.clone(), ExecutionEnv::new())],
793 &state.epoch_store_for_testing(),
794 );
795 sleep(Duration::from_secs(1)).await;
797 assert!(
798 rx_ready_certificates
799 .try_recv()
800 .is_err_and(|err| err == TryRecvError::Empty)
801 );
802
803 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
804
805 execution_scheduler.enqueue_transactions(
807 vec![(transaction.clone(), ExecutionEnv::new())],
808 &state.epoch_store_for_testing(),
809 );
810 sleep(Duration::from_secs(1)).await;
811 assert!(
812 rx_ready_certificates
813 .try_recv()
814 .is_err_and(|err| err == TryRecvError::Empty)
815 );
816
817 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
818
819 state
821 .get_cache_writer()
822 .write_object_entry_for_test(gas_object_new);
823 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
826 let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
827 assert_eq!(
828 pending_certificate.certificate.digest(),
829 pending_certificate2.certificate.digest()
830 );
831
832 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
835 assert!(
836 pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
837 >= Duration::from_secs(2)
838 );
839
840 drop(pending_certificate);
842 drop(pending_certificate2);
843
844 execution_scheduler.check_empty_for_testing().await;
846 }
847
848 #[tokio::test(flavor = "current_thread", start_paused = true)]
857 async fn execution_scheduler_object_dependency() {
858 telemetry_subscribers::init_for_testing();
859 let (owner, _keypair) = deterministic_random_account_key();
861 let gas_objects: Vec<Object> = (0..10)
862 .map(|_| {
863 let gas_object_id = ObjectID::random();
864 Object::with_id_owner_for_testing(gas_object_id, owner)
865 })
866 .collect();
867 let shared_object = Object::shared_for_testing();
868 let initial_shared_version = shared_object.owner().start_version().unwrap();
869 let shared_object_2 = Object::shared_for_testing();
870 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
871
872 let state = init_state_with_objects(
873 [
874 gas_objects.clone(),
875 vec![shared_object.clone(), shared_object_2.clone()],
876 ]
877 .concat(),
878 )
879 .await;
880
881 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
884 assert!(rx_ready_certificates.try_recv().is_err());
886
887 let shared_version = 1000.into();
889 let shared_object_arg_read = ObjectArg::SharedObject {
890 id: shared_object.id(),
891 initial_shared_version,
892 mutability: SharedObjectMutability::Immutable,
893 };
894 let transaction_read_0 = make_transaction(
895 gas_objects[0].clone(),
896 vec![CallArg::Object(shared_object_arg_read)],
897 );
898 let transaction_read_1 = make_transaction(
899 gas_objects[1].clone(),
900 vec![CallArg::Object(shared_object_arg_read)],
901 );
902 let tx_read_0_assigned_versions = vec![(
903 (
904 shared_object.id(),
905 shared_object.owner().start_version().unwrap(),
906 ),
907 shared_version,
908 )];
909 let tx_read_1_assigned_versions = vec![(
910 (
911 shared_object.id(),
912 shared_object.owner().start_version().unwrap(),
913 ),
914 shared_version,
915 )];
916
917 let shared_object_arg_default = ObjectArg::SharedObject {
919 id: shared_object.id(),
920 initial_shared_version,
921 mutability: SharedObjectMutability::Mutable,
922 };
923 let transaction_default = make_transaction(
924 gas_objects[2].clone(),
925 vec![CallArg::Object(shared_object_arg_default)],
926 );
927 let tx_default_assigned_versions = vec![(
928 (
929 shared_object.id(),
930 shared_object.owner().start_version().unwrap(),
931 ),
932 shared_version,
933 )];
934
935 let shared_version_2 = 1000.into();
937 let shared_object_arg_read_2 = ObjectArg::SharedObject {
938 id: shared_object_2.id(),
939 initial_shared_version: initial_shared_version_2,
940 mutability: SharedObjectMutability::Immutable,
941 };
942 let transaction_read_2 = make_transaction(
943 gas_objects[3].clone(),
944 vec![
945 CallArg::Object(shared_object_arg_default),
946 CallArg::Object(shared_object_arg_read_2),
947 ],
948 );
949 let tx_read_2_assigned_versions = vec![
950 (
951 (
952 shared_object.id(),
953 shared_object.owner().start_version().unwrap(),
954 ),
955 shared_version,
956 ),
957 (
958 (
959 shared_object_2.id(),
960 shared_object_2.owner().start_version().unwrap(),
961 ),
962 shared_version_2,
963 ),
964 ];
965
966 execution_scheduler.enqueue_transactions(
967 vec![
968 (
969 transaction_read_0.clone(),
970 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
971 tx_read_0_assigned_versions,
972 None,
973 )),
974 ),
975 (
976 transaction_read_1.clone(),
977 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
978 tx_read_1_assigned_versions,
979 None,
980 )),
981 ),
982 (
983 transaction_default.clone(),
984 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
985 tx_default_assigned_versions,
986 None,
987 )),
988 ),
989 (
990 transaction_read_2.clone(),
991 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
992 tx_read_2_assigned_versions,
993 None,
994 )),
995 ),
996 ],
997 &state.epoch_store_for_testing(),
998 );
999
1000 sleep(Duration::from_secs(1)).await;
1002 assert!(rx_ready_certificates.try_recv().is_err());
1003
1004 assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1005
1006 let mut new_shared_object = shared_object.clone();
1008 new_shared_object
1009 .data
1010 .try_as_move_mut()
1011 .unwrap()
1012 .increment_version_to(shared_version_2);
1013 state
1014 .get_cache_writer()
1015 .write_object_entry_for_test(new_shared_object);
1016
1017 let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1019 let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1020 let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1021 {
1022 let mut want_digests = vec![
1023 transaction_read_0.digest(),
1024 transaction_read_1.digest(),
1025 transaction_default.digest(),
1026 ];
1027 want_digests.sort();
1028 let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1029 got_digests.sort();
1030 assert_eq!(want_digests, got_digests);
1031 }
1032
1033 sleep(Duration::from_secs(1)).await;
1034 assert!(rx_ready_certificates.try_recv().is_err());
1035
1036 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1037
1038 let mut new_shared_object_2 = shared_object_2.clone();
1040 new_shared_object_2
1041 .data
1042 .try_as_move_mut()
1043 .unwrap()
1044 .increment_version_to(shared_version_2);
1045 state
1046 .get_cache_writer()
1047 .write_object_entry_for_test(new_shared_object_2);
1048
1049 let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1051 assert_eq!(transaction_read_2.digest(), tx_3.digest());
1052
1053 sleep(Duration::from_secs(1)).await;
1054 assert!(rx_ready_certificates.try_recv().is_err());
1055
1056 execution_scheduler.check_empty_for_testing().await;
1057 }
1058
1059 #[tokio::test(flavor = "current_thread", start_paused = true)]
1060 async fn execution_scheduler_receiving_notify_commit() {
1061 telemetry_subscribers::init_for_testing();
1062 let (owner, _keypair) = deterministic_random_account_key();
1064 let gas_objects: Vec<Object> = (0..10)
1065 .map(|_| {
1066 let gas_object_id = ObjectID::random();
1067 Object::with_id_owner_for_testing(gas_object_id, owner)
1068 })
1069 .collect();
1070 let state = init_state_with_objects(gas_objects.clone()).await;
1071
1072 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1075 assert!(rx_ready_certificates.try_recv().is_err());
1077 execution_scheduler.check_empty_for_testing().await;
1079
1080 let obj_id = ObjectID::random();
1081 let object_arguments: Vec<_> = (0..10)
1082 .map(|i| {
1083 let object = Object::with_id_owner_version_for_testing(
1084 obj_id,
1085 i.into(),
1086 Owner::AddressOwner(owner),
1087 );
1088 let object_arg = if i % 2 == 0 || i == 3 {
1095 ObjectArg::Receiving(object.compute_object_reference())
1096 } else {
1097 ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1098 };
1099 let txn =
1100 make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1101 (object, txn)
1102 })
1103 .collect();
1104
1105 for (i, (_, txn)) in object_arguments.iter().enumerate() {
1106 execution_scheduler.enqueue_transactions(
1109 vec![(txn.clone(), ExecutionEnv::new())],
1110 &state.epoch_store_for_testing(),
1111 );
1112 sleep(Duration::from_secs(1)).await;
1113 assert!(rx_ready_certificates.try_recv().is_err());
1114 assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1115 }
1116
1117 let len = object_arguments.len();
1120 for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1121 state
1124 .get_cache_writer()
1125 .write_object_entry_for_test(object.clone());
1126
1127 rx_ready_certificates.recv().await.unwrap();
1130
1131 sleep(Duration::from_secs(1)).await;
1134 assert!(rx_ready_certificates.try_recv().is_err());
1135
1136 drop(txn);
1138
1139 assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1142 }
1143
1144 execution_scheduler.check_empty_for_testing().await;
1146 }
1147
1148 #[tokio::test(flavor = "current_thread", start_paused = true)]
1149 async fn execution_scheduler_receiving_object_ready_notifications() {
1150 telemetry_subscribers::init_for_testing();
1151 let (owner, _keypair) = deterministic_random_account_key();
1153 let gas_objects: Vec<Object> = (0..10)
1154 .map(|_| {
1155 let gas_object_id = ObjectID::random();
1156 Object::with_id_owner_for_testing(gas_object_id, owner)
1157 })
1158 .collect();
1159 let state = init_state_with_objects(gas_objects.clone()).await;
1160
1161 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1164 assert!(rx_ready_certificates.try_recv().is_err());
1166 execution_scheduler.check_empty_for_testing().await;
1168
1169 let obj_id = ObjectID::random();
1170 let receiving_object_new0 =
1171 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1172 let receiving_object_new1 =
1173 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1174 let receiving_object_arg0 =
1175 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1176 let receive_object_transaction0 = make_transaction(
1177 gas_objects[0].clone(),
1178 vec![CallArg::Object(receiving_object_arg0)],
1179 );
1180
1181 let receiving_object_arg1 =
1182 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1183 let receive_object_transaction1 = make_transaction(
1184 gas_objects[0].clone(),
1185 vec![CallArg::Object(receiving_object_arg1)],
1186 );
1187
1188 execution_scheduler.enqueue_transactions(
1190 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1191 &state.epoch_store_for_testing(),
1192 );
1193 sleep(Duration::from_secs(1)).await;
1194 assert!(rx_ready_certificates.try_recv().is_err());
1195 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1196
1197 execution_scheduler.enqueue_transactions(
1199 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1200 &state.epoch_store_for_testing(),
1201 );
1202 sleep(Duration::from_secs(1)).await;
1203 assert!(rx_ready_certificates.try_recv().is_err());
1204 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1205
1206 execution_scheduler.enqueue_transactions(
1208 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1209 &state.epoch_store_for_testing(),
1210 );
1211 sleep(Duration::from_secs(1)).await;
1212 assert!(rx_ready_certificates.try_recv().is_err());
1213 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1214
1215 state
1217 .get_cache_writer()
1218 .write_object_entry_for_test(receiving_object_new0.clone());
1219
1220 rx_ready_certificates.recv().await.unwrap();
1223
1224 state
1226 .get_cache_writer()
1227 .write_object_entry_for_test(receiving_object_new1.clone());
1228
1229 rx_ready_certificates.recv().await.unwrap();
1232 }
1233
1234 #[tokio::test(flavor = "current_thread", start_paused = true)]
1235 async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1236 telemetry_subscribers::init_for_testing();
1237 let (owner, _keypair) = deterministic_random_account_key();
1239 let gas_objects: Vec<Object> = (0..10)
1240 .map(|_| {
1241 let gas_object_id = ObjectID::random();
1242 Object::with_id_owner_for_testing(gas_object_id, owner)
1243 })
1244 .collect();
1245 let state = init_state_with_objects(gas_objects.clone()).await;
1246
1247 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1250 assert!(rx_ready_certificates.try_recv().is_err());
1252 execution_scheduler.check_empty_for_testing().await;
1254
1255 let obj_id = ObjectID::random();
1256 let receiving_object_new0 =
1257 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1258 let receiving_object_new1 =
1259 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1260 let receiving_object_arg0 =
1261 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1262 let receive_object_transaction0 = make_transaction(
1263 gas_objects[0].clone(),
1264 vec![CallArg::Object(receiving_object_arg0)],
1265 );
1266
1267 let receive_object_transaction01 = make_transaction(
1268 gas_objects[1].clone(),
1269 vec![CallArg::Object(receiving_object_arg0)],
1270 );
1271
1272 let receiving_object_arg1 =
1273 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1274 let receive_object_transaction1 = make_transaction(
1275 gas_objects[0].clone(),
1276 vec![CallArg::Object(receiving_object_arg1)],
1277 );
1278
1279 let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1282 let tx1 = make_transaction(
1283 gas_objects[0].clone(),
1284 vec![CallArg::Object(gas_receiving_arg)],
1285 );
1286
1287 execution_scheduler.enqueue_transactions(
1289 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1290 &state.epoch_store_for_testing(),
1291 );
1292 sleep(Duration::from_secs(1)).await;
1293 assert!(rx_ready_certificates.try_recv().is_err());
1294 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1295
1296 execution_scheduler.enqueue_transactions(
1298 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1299 &state.epoch_store_for_testing(),
1300 );
1301 sleep(Duration::from_secs(1)).await;
1302 assert!(rx_ready_certificates.try_recv().is_err());
1303 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1304
1305 execution_scheduler.enqueue_transactions(
1308 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1309 &state.epoch_store_for_testing(),
1310 );
1311 sleep(Duration::from_secs(1)).await;
1312 assert!(rx_ready_certificates.try_recv().is_err());
1313 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1314
1315 state
1317 .get_cache_writer()
1318 .write_object_entry_for_test(receiving_object_new0.clone());
1319
1320 rx_ready_certificates.recv().await.unwrap();
1323
1324 rx_ready_certificates.recv().await.unwrap();
1325
1326 assert!(rx_ready_certificates.try_recv().is_err());
1328
1329 execution_scheduler.enqueue_transactions(
1332 vec![(tx1.clone(), ExecutionEnv::new())],
1333 &state.epoch_store_for_testing(),
1334 );
1335 sleep(Duration::from_secs(1)).await;
1336 rx_ready_certificates.recv().await.unwrap();
1337
1338 state
1340 .get_cache_writer()
1341 .write_object_entry_for_test(receiving_object_new1.clone());
1342
1343 rx_ready_certificates.recv().await.unwrap();
1346 }
1347
1348 #[tokio::test(flavor = "current_thread", start_paused = true)]
1349 async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1350 telemetry_subscribers::init_for_testing();
1351 let (owner, _keypair) = deterministic_random_account_key();
1353 let mut gas_objects: Vec<Object> = (0..10)
1354 .map(|_| {
1355 let gas_object_id = ObjectID::random();
1356 Object::with_id_owner_for_testing(gas_object_id, owner)
1357 })
1358 .collect();
1359 let receiving_object = Object::with_id_owner_version_for_testing(
1360 ObjectID::random(),
1361 10.into(),
1362 Owner::AddressOwner(owner),
1363 );
1364 gas_objects.push(receiving_object.clone());
1365 let state = init_state_with_objects(gas_objects.clone()).await;
1366
1367 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1370 assert!(rx_ready_certificates.try_recv().is_err());
1372 execution_scheduler.check_empty_for_testing().await;
1374
1375 let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1376 receiving_object.id(),
1377 0.into(),
1378 Owner::AddressOwner(owner),
1379 );
1380 let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1381 receiving_object.id(),
1382 1.into(),
1383 Owner::AddressOwner(owner),
1384 );
1385 let receiving_object_arg0 =
1386 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1387 let receive_object_transaction0 = make_transaction(
1388 gas_objects[0].clone(),
1389 vec![CallArg::Object(receiving_object_arg0)],
1390 );
1391
1392 let receive_object_transaction01 = make_transaction(
1393 gas_objects[1].clone(),
1394 vec![CallArg::Object(receiving_object_arg0)],
1395 );
1396
1397 let receiving_object_arg1 =
1398 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1399 let receive_object_transaction1 = make_transaction(
1400 gas_objects[0].clone(),
1401 vec![CallArg::Object(receiving_object_arg1)],
1402 );
1403
1404 execution_scheduler.enqueue_transactions(
1406 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1407 &state.epoch_store_for_testing(),
1408 );
1409 execution_scheduler.enqueue_transactions(
1410 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1411 &state.epoch_store_for_testing(),
1412 );
1413 execution_scheduler.enqueue_transactions(
1414 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1415 &state.epoch_store_for_testing(),
1416 );
1417 sleep(Duration::from_secs(1)).await;
1418 rx_ready_certificates.recv().await.unwrap();
1419 rx_ready_certificates.recv().await.unwrap();
1420 rx_ready_certificates.recv().await.unwrap();
1421 assert!(rx_ready_certificates.try_recv().is_err());
1422 }
1423
1424 #[tokio::test(flavor = "current_thread", start_paused = true)]
1427 async fn execution_scheduler_with_cancelled_transactions() {
1428 let (owner, _keypair) = deterministic_random_account_key();
1430 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1431 let shared_object_1 = Object::shared_for_testing();
1432 let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1433 let shared_object_2 = Object::shared_for_testing();
1434 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1435 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1436
1437 let state = init_state_with_objects(vec![
1438 gas_object.clone(),
1439 shared_object_1.clone(),
1440 shared_object_2.clone(),
1441 owned_object.clone(),
1442 ])
1443 .await;
1444
1445 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1448 assert!(rx_ready_certificates.try_recv().is_err());
1450
1451 let shared_object_arg_1 = ObjectArg::SharedObject {
1453 id: shared_object_1.id(),
1454 initial_shared_version: initial_shared_version_1,
1455 mutability: SharedObjectMutability::Mutable,
1456 };
1457 let shared_object_arg_2 = ObjectArg::SharedObject {
1458 id: shared_object_2.id(),
1459 initial_shared_version: initial_shared_version_2,
1460 mutability: SharedObjectMutability::Mutable,
1461 };
1462
1463 let owned_version = 2000.into();
1465 let mut owned_ref = owned_object.compute_object_reference();
1466 owned_ref.1 = owned_version;
1467 let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1468
1469 let cancelled_transaction = make_transaction(
1470 gas_object.clone(),
1471 vec![
1472 CallArg::Object(shared_object_arg_1),
1473 CallArg::Object(shared_object_arg_2),
1474 CallArg::Object(owned_object_arg),
1475 ],
1476 );
1477 let assigned_versions = vec![
1478 (
1479 (
1480 shared_object_1.id(),
1481 shared_object_1.owner().start_version().unwrap(),
1482 ),
1483 SequenceNumber::CANCELLED_READ,
1484 ),
1485 (
1486 (
1487 shared_object_2.id(),
1488 shared_object_2.owner().start_version().unwrap(),
1489 ),
1490 SequenceNumber::CONGESTED,
1491 ),
1492 ];
1493
1494 execution_scheduler.enqueue_transactions(
1495 vec![(
1496 cancelled_transaction.clone(),
1497 ExecutionEnv::new()
1498 .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1499 )],
1500 &state.epoch_store_for_testing(),
1501 );
1502
1503 sleep(Duration::from_secs(1)).await;
1505 assert!(rx_ready_certificates.try_recv().is_err());
1506
1507 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1508
1509 let mut new_owned_object = owned_object.clone();
1511 new_owned_object
1512 .data
1513 .try_as_move_mut()
1514 .unwrap()
1515 .increment_version_to(owned_version);
1516 state
1517 .get_cache_writer()
1518 .write_object_entry_for_test(new_owned_object);
1519
1520 let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1522 assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1523
1524 sleep(Duration::from_secs(1)).await;
1525 assert!(rx_ready_certificates.try_recv().is_err());
1526
1527 execution_scheduler.check_empty_for_testing().await;
1528 }
1529
1530 #[test]
1531 fn test_barrier_dependency_builder() {
1532 let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1533 assert!(
1534 non_exclusive_writes
1535 .iter()
1536 .all(|id| !exclusive_writes.contains(id))
1537 );
1538 assert!(
1539 exclusive_writes
1540 .iter()
1541 .all(|id| !non_exclusive_writes.contains(id))
1542 );
1543
1544 let non_exclusive_writes = non_exclusive_writes
1545 .into_iter()
1546 .map(|id| ObjectID::from_single_byte(id as u8));
1547 let exclusive_writes = exclusive_writes
1548 .into_iter()
1549 .map(|id| ObjectID::from_single_byte(id as u8));
1550 let mut builder = ProgrammableTransactionBuilder::new();
1551 for non_exclusive_write in non_exclusive_writes {
1552 builder
1553 .obj(ObjectArg::SharedObject {
1554 id: non_exclusive_write,
1555 initial_shared_version: SequenceNumber::new(),
1556 mutability: SharedObjectMutability::NonExclusiveWrite,
1557 })
1558 .unwrap();
1559 }
1560
1561 for exclusive_write in exclusive_writes {
1562 builder
1563 .obj(ObjectArg::SharedObject {
1564 id: exclusive_write,
1565 initial_shared_version: SequenceNumber::new(),
1566 mutability: SharedObjectMutability::Mutable,
1567 })
1568 .unwrap();
1569 }
1570
1571 let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1572 let tx_data =
1573 TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1574 Transaction::from_data_and_signer(tx_data, vec![])
1575 };
1576
1577 {
1579 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1580 let tx1 = make_transaction(vec![1], vec![]);
1581 let tx2 = make_transaction(vec![], vec![1]);
1582
1583 let tx1_deps =
1584 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1585 let tx2_deps =
1586 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1587 assert!(tx1_deps.is_empty());
1588 assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1589 }
1590
1591 {
1594 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1595 let tx1 = make_transaction(vec![1, 2], vec![]);
1596 let tx2 = make_transaction(vec![], vec![1]);
1597 let tx3 = make_transaction(vec![], vec![2]);
1598
1599 let tx1_deps =
1600 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1601 let tx2_deps =
1602 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1603 let tx3_deps =
1604 barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1605 assert!(tx1_deps.is_empty());
1606 assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1607 assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1608 }
1609
1610 {
1612 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1613 let tx1 = make_transaction(vec![1], vec![]);
1614 let tx2 = make_transaction(vec![2], vec![]);
1615 let tx3 = make_transaction(vec![], vec![1, 2]);
1616
1617 let tx1_deps =
1618 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1619 let tx2_deps =
1620 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1621 let tx3_deps =
1622 barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1623 assert!(tx1_deps.is_empty());
1624 assert!(tx2_deps.is_empty());
1625 assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1626 }
1627
1628 {
1630 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1631 let tx1 = make_transaction(vec![1], vec![]);
1632 let tx2 = make_transaction(vec![], vec![1]);
1633 let tx3 = make_transaction(vec![], vec![1]);
1634
1635 let tx1_deps =
1636 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1637 let tx2_deps =
1638 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1639 let tx3_deps =
1640 barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1641 assert!(tx1_deps.is_empty());
1642 assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1643 assert!(tx3_deps.is_empty());
1644 }
1645 }
1646}