1use crate::{
5 accumulators::funds_read::AccountFundsRead,
6 authority::{
7 AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8 epoch_start_configuration::EpochStartConfigTrait,
9 shared_object_version_manager::Schedulable,
10 },
11 execution_cache::{ObjectCacheRead, TransactionCacheRead},
12 execution_scheduler::{
13 ExecutingGuard, PendingCertificateStats,
14 funds_withdraw_scheduler::{
15 AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
16 WithdrawReservations, scheduler::FundsWithdrawScheduler,
17 },
18 },
19};
20use futures::stream::{FuturesUnordered, StreamExt};
21use mysten_common::ZipDebugEqIteratorExt;
22use mysten_common::{assert_reachable, debug_fatal};
23use mysten_metrics::spawn_monitored_task;
24use parking_lot::Mutex;
25use std::{
26 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
27 sync::Arc,
28};
29use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
30use sui_types::{
31 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
32 base_types::{FullObjectID, ObjectID},
33 digests::TransactionDigest,
34 error::SuiResult,
35 executable_transaction::VerifiedExecutableTransaction,
36 storage::InputKey,
37 transaction::{
38 SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
39 TransactionDataAPI, TransactionKey,
40 },
41};
42use tokio::sync::mpsc::UnboundedSender;
43use tokio::time::Instant;
44use tracing::{debug, error, instrument};
45
46use super::{PendingCertificate, overload_tracker::OverloadTracker};
47
48pub(crate) struct BarrierDependencyBuilder {
50 dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
51}
52
53impl BarrierDependencyBuilder {
54 pub fn new() -> Self {
55 Self {
56 dep_state: Default::default(),
57 }
58 }
59
60 pub fn process_tx(
66 &mut self,
67 tx_digest: TransactionDigest,
68 tx: &TransactionData,
69 ) -> BTreeSet<TransactionDigest> {
70 let mut barrier_deps = BTreeSet::new();
71 for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
72 match mutability {
73 SharedObjectMutability::NonExclusiveWrite => {
74 self.dep_state.entry(id).or_default().insert(tx_digest);
75 }
76 SharedObjectMutability::Mutable => {
77 if let Some(deps) = self.dep_state.remove(&id) {
80 barrier_deps.extend(deps);
81 }
82 }
83 SharedObjectMutability::Immutable => (),
84 }
85 }
86 barrier_deps
87 }
88}
89
90#[derive(Clone)]
91pub struct ExecutionScheduler {
92 object_cache_read: Arc<dyn ObjectCacheRead>,
93 transaction_cache_read: Arc<dyn TransactionCacheRead>,
94 overload_tracker: Arc<OverloadTracker>,
95 tx_ready_certificates: UnboundedSender<PendingCertificate>,
96 address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
97 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
98 metrics: Arc<AuthorityMetrics>,
99 address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
100}
101
102struct PendingGuard<'a> {
103 scheduler: &'a ExecutionScheduler,
104 cert: &'a VerifiedExecutableTransaction,
105}
106
107impl<'a> PendingGuard<'a> {
108 pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
109 scheduler
110 .metrics
111 .transaction_manager_num_pending_certificates
112 .inc();
113 scheduler
114 .overload_tracker
115 .add_pending_certificate(cert.data());
116 Self { scheduler, cert }
117 }
118}
119
120impl Drop for PendingGuard<'_> {
121 fn drop(&mut self) {
122 self.scheduler
123 .metrics
124 .transaction_manager_num_pending_certificates
125 .dec();
126 self.scheduler
127 .overload_tracker
128 .remove_pending_certificate(self.cert.data());
129 }
130}
131
132impl ExecutionScheduler {
133 pub fn new(
134 object_cache_read: Arc<dyn ObjectCacheRead>,
135 account_funds_read: Arc<dyn AccountFundsRead>,
136 transaction_cache_read: Arc<dyn TransactionCacheRead>,
137 tx_ready_certificates: UnboundedSender<PendingCertificate>,
138 epoch_store: &Arc<AuthorityPerEpochStore>,
139 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
140 metrics: Arc<AuthorityMetrics>,
141 prometheus_registry: &prometheus::Registry,
142 ) -> Self {
143 tracing::info!(
144 ?funds_withdraw_scheduler_type,
145 "Creating new ExecutionScheduler"
146 );
147 let address_funds_scheduler_metrics =
148 Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
149 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
150 epoch_store,
151 &object_cache_read,
152 account_funds_read,
153 funds_withdraw_scheduler_type,
154 &address_funds_scheduler_metrics,
155 );
156 Self {
157 object_cache_read,
158 transaction_cache_read,
159 overload_tracker: Arc::new(OverloadTracker::new()),
160 tx_ready_certificates,
161 address_funds_withdraw_scheduler: Arc::new(Mutex::new(
162 address_funds_withdraw_scheduler,
163 )),
164 funds_withdraw_scheduler_type,
165 metrics,
166 address_funds_scheduler_metrics,
167 }
168 }
169
170 fn initialize_funds_withdraw_scheduler(
171 epoch_store: &Arc<AuthorityPerEpochStore>,
172 object_cache_read: &Arc<dyn ObjectCacheRead>,
173 account_funds_read: Arc<dyn AccountFundsRead>,
174 scheduler_type: FundsWithdrawSchedulerType,
175 address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
176 ) -> Option<FundsWithdrawScheduler> {
177 let withdraw_scheduler_enabled =
178 epoch_store.is_validator() && epoch_store.accumulators_enabled();
179 if !withdraw_scheduler_enabled {
180 return None;
181 }
182 let starting_accumulator_version = object_cache_read
183 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
184 .expect("Accumulator root object must be present if funds accumulator is enabled")
185 .version();
186 let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
187 account_funds_read.clone(),
188 starting_accumulator_version,
189 scheduler_type,
190 address_funds_scheduler_metrics.clone(),
191 );
192
193 Some(address_funds_withdraw_scheduler)
194 }
195
196 #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
197 async fn schedule_transaction(
198 self,
199 cert: VerifiedExecutableTransaction,
200 execution_env: ExecutionEnv,
201 epoch_store: &Arc<AuthorityPerEpochStore>,
202 ) {
203 let enqueue_time = Instant::now();
204 let tx_digest = cert.digest();
205 let digests = [*tx_digest];
206
207 let tx_data = cert.transaction_data();
208 let input_object_kinds = tx_data
209 .input_objects()
210 .expect("input_objects() cannot fail");
211 let mut input_object_keys: Vec<_> = epoch_store
212 .get_input_object_keys(
213 &cert.key(),
214 &input_object_kinds,
215 &execution_env.assigned_versions,
216 )
217 .into_iter()
218 .collect();
219
220 if tx_data.kind().has_coin_reservations()
225 && let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version
226 && let Some(initial_shared_version) =
227 (**epoch_store.epoch_start_config()).accumulator_root_obj_initial_shared_version()
228 {
229 input_object_keys.push(InputKey::VersionedObject {
230 id: FullObjectID::new(SUI_ACCUMULATOR_ROOT_OBJECT_ID, Some(initial_shared_version)),
231 version: accumulator_version,
232 });
233 }
234
235 let receiving_object_keys: HashSet<_> = tx_data
236 .receiving_objects()
237 .into_iter()
238 .map(|entry| {
239 InputKey::VersionedObject {
240 id: FullObjectID::new(entry.0, None),
242 version: entry.1,
243 }
244 })
245 .collect();
246 let input_and_receiving_keys = [
247 input_object_keys,
248 receiving_object_keys.iter().cloned().collect(),
249 ]
250 .concat();
251
252 let epoch = epoch_store.epoch();
253 debug!(
254 ?tx_digest,
255 "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
256 );
257
258 let availability = self
259 .object_cache_read
260 .multi_input_objects_available_cache_only(&input_and_receiving_keys);
261 let missing_input_keys: Vec<_> = input_and_receiving_keys
265 .into_iter()
266 .zip_debug_eq(availability)
267 .filter_map(|(key, available)| if !available { Some(key) } else { None })
268 .collect();
269
270 let has_missing_barrier_dependencies = self
271 .transaction_cache_read
272 .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
273 .into_iter()
274 .any(|r| r.is_none());
275
276 if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
277 self.metrics
278 .transaction_manager_num_enqueued_certificates
279 .with_label_values(&["ready"])
280 .inc();
281 debug!(?tx_digest, "Input objects already available");
282 self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
283 return;
284 }
285
286 let _pending_guard = PendingGuard::new(&self, &cert);
287 self.metrics
288 .transaction_manager_num_enqueued_certificates
289 .with_label_values(&["pending"])
290 .inc();
291
292 if !execution_env.barrier_dependencies.is_empty() {
293 debug!(
294 "waiting for barrier dependencies to be executed: {:?}",
295 execution_env.barrier_dependencies
296 );
297 self.transaction_cache_read
298 .notify_read_executed_effects_digests(
299 "wait_for_barrier_dependencies",
300 &execution_env.barrier_dependencies,
301 )
302 .await;
303 }
304
305 tokio::select! {
306 _ = self.object_cache_read
307 .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
308 => {
309 self.metrics
310 .transaction_manager_transaction_queue_age_s
311 .observe(enqueue_time.elapsed().as_secs_f64());
312 debug!(?tx_digest, "Input objects available");
313 self.send_transaction_for_execution(
315 &cert,
316 execution_env,
317 enqueue_time,
318 );
319 }
320 _ = self.transaction_cache_read.notify_read_executed_effects_digests(
321 "ExecutionScheduler::notify_read_executed_effects_digests",
322 &digests,
323 ) => {
324 debug!(?tx_digest, "Transaction already executed");
325 }
326 };
327 }
328
329 pub fn send_transaction_for_execution(
330 &self,
331 cert: &VerifiedExecutableTransaction,
332 execution_env: ExecutionEnv,
333 enqueue_time: Instant,
334 ) {
335 let pending_cert = PendingCertificate {
336 certificate: cert.clone(),
337 execution_env,
338 stats: PendingCertificateStats {
339 enqueue_time,
340 ready_time: Some(Instant::now()),
341 },
342 executing_guard: Some(ExecutingGuard::new(
343 self.metrics
344 .transaction_manager_num_executing_certificates
345 .clone(),
346 )),
347 };
348 let _ = self.tx_ready_certificates.send(pending_cert);
349 }
350
351 fn schedule_funds_withdraws(
352 &self,
353 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
354 epoch_store: &Arc<AuthorityPerEpochStore>,
355 ) {
356 if certs.is_empty() {
357 return;
358 }
359 let mut withdraws = BTreeMap::new();
360 let mut prev_version = None;
361 for (cert, env) in &certs {
362 let tx_withdraws = cert
363 .transaction_data()
364 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
365 assert!(!tx_withdraws.is_empty());
366 let accumulator_version = env
367 .assigned_versions
368 .accumulator_version
369 .expect("accumulator_version must be set when there are withdraws");
370 if let Some(prev_version) = prev_version {
371 assert!(prev_version <= accumulator_version);
373 }
374 prev_version = Some(accumulator_version);
375 let tx_digest = *cert.digest();
376 withdraws
377 .entry(accumulator_version)
378 .or_insert(Vec::new())
379 .push(TxFundsWithdraw {
380 tx_digest,
381 reservations: tx_withdraws,
382 });
383 }
384 let mut receivers = FuturesUnordered::new();
385 {
386 let guard = self.address_funds_withdraw_scheduler.lock();
387 let withdraw_scheduler = guard
388 .as_ref()
389 .expect("Funds withdraw scheduler must be enabled if there are withdraws");
390 for (version, tx_withdraws) in withdraws {
391 receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
392 accumulator_version: version,
393 withdraws: tx_withdraws,
394 }));
395 }
396 }
398 let scheduler = self.clone();
399 let epoch_store = epoch_store.clone();
400 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
401 let mut cert_map = HashMap::new();
402 for (cert, env) in certs {
403 cert_map.insert(*cert.digest(), (cert, env));
404 }
405 while let Some(result) = receivers.next().await {
406 match result {
407 Ok((tx_digest, status)) => match status {
408 ScheduleStatus::InsufficientFunds => {
409 assert_reachable!("tx cancelled, insufficient funds");
410 debug!(
411 ?tx_digest,
412 "Funds withdraw scheduling result: Insufficient funds"
413 );
414 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
415 let env = env.with_insufficient_funds();
416 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
417 }
418 ScheduleStatus::SufficientFunds => {
419 assert_reachable!("tx scheduled, sufficient funds");
420 debug!(?tx_digest, "Funds withdraw scheduling result: Success");
421 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
422 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
423 }
424 ScheduleStatus::SkipSchedule => {
425 assert_reachable!("tx withdrawal scheduling skipped");
426 debug!(?tx_digest, "Skip scheduling funds withdraw");
427 }
428 },
429 Err(e) => {
430 error!("Withdraw scheduler stopped: {:?}", e);
431 }
432 }
433 }
434 }));
435 }
436
437 fn schedule_tx_keys(
438 &self,
439 tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
440 epoch_store: &Arc<AuthorityPerEpochStore>,
441 ) {
442 if tx_with_keys.is_empty() {
443 return;
444 }
445
446 let scheduler = self.clone();
447 let epoch_store = epoch_store.clone();
448 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
449 let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
450 let digests = epoch_store
451 .notify_read_tx_key_to_digest(&tx_keys)
452 .await
453 .expect("db error");
454 let transactions = scheduler
455 .transaction_cache_read
456 .multi_get_transaction_blocks(&digests)
457 .into_iter()
458 .map(|tx| {
459 let tx = tx.expect("tx must exist").as_ref().clone();
460 VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
461 })
462 .zip_debug_eq(tx_with_keys.into_iter().map(|(_, env)| env))
463 .collect::<Vec<_>>();
464 scheduler.enqueue_transactions(transactions, &epoch_store);
465 }));
466 }
467
468 #[cfg(debug_assertions)]
471 fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
472 let epoch = cert.epoch();
473 let digest = *cert.digest();
474 let digests = [digest];
475 let executed = self
476 .transaction_cache_read
477 .multi_get_executed_effects(&digests)
478 .pop()
479 .unwrap();
480 if let Some(executed) = executed {
483 use sui_types::effects::TransactionEffectsAPI;
484
485 assert_eq!(
486 executed.executed_epoch(),
487 epoch,
488 "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
489 digest,
490 executed.executed_epoch(),
491 epoch
492 );
493 }
494 }
495}
496
497impl ExecutionScheduler {
498 pub fn enqueue(
499 &self,
500 certs: Vec<(Schedulable, ExecutionEnv)>,
501 epoch_store: &Arc<AuthorityPerEpochStore>,
502 ) {
503 let mut ordinary_txns = Vec::with_capacity(certs.len());
505 let mut tx_with_keys = Vec::new();
506 let mut tx_with_withdraws = Vec::new();
507
508 for (schedulable, env) in certs {
509 match schedulable {
510 Schedulable::Transaction(tx) => {
511 if tx.transaction_data().has_funds_withdrawals() {
512 tx_with_withdraws.push((tx, env));
513 } else {
514 ordinary_txns.push((tx, env));
515 }
516 }
517 s @ Schedulable::RandomnessStateUpdate(..) => {
518 tx_with_keys.push((s.key(), env));
519 }
520 Schedulable::AccumulatorSettlement(_, _) => {
521 unreachable!("handled by SettlementScheduler");
522 }
523 Schedulable::ConsensusCommitPrologue(_, _, _) => {
524 unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
528 }
529 }
530 }
531
532 self.enqueue_transactions(ordinary_txns, epoch_store);
533 self.schedule_tx_keys(tx_with_keys, epoch_store);
534 self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
535 }
536
537 pub fn enqueue_transactions(
538 &self,
539 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
540 epoch_store: &Arc<AuthorityPerEpochStore>,
541 ) {
542 let certs: Vec<_> = certs
544 .into_iter()
545 .filter_map(|cert| {
546 if cert.0.epoch() == epoch_store.epoch() {
547 #[cfg(debug_assertions)]
548 self.assert_cert_not_executed_previous_epochs(&cert.0);
549
550 Some(cert)
551 } else {
552 debug_fatal!(
553 "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
554 epoch_store.epoch(),
555 cert.0.epoch()
556 );
557 None
558 }
559 })
560 .collect();
561 let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
562 let executed = self
563 .transaction_cache_read
564 .multi_get_executed_effects_digests(&digests);
565 let mut already_executed_certs_num = 0;
566 let pending_certs = certs.into_iter().zip_debug_eq(executed).filter_map(
567 |((cert, execution_env), executed)| {
568 if executed.is_none() {
569 Some((cert, execution_env))
570 } else {
571 already_executed_certs_num += 1;
572 None
573 }
574 },
575 );
576
577 for (cert, execution_env) in pending_certs {
578 let scheduler = self.clone();
579 let epoch_store = epoch_store.clone();
580 spawn_monitored_task!(
581 epoch_store.within_alive_epoch(scheduler.schedule_transaction(
582 cert,
583 execution_env,
584 &epoch_store,
585 ))
586 );
587 }
588
589 self.metrics
590 .transaction_manager_num_enqueued_certificates
591 .with_label_values(&["already_executed"])
592 .inc_by(already_executed_certs_num);
593 }
594
595 pub fn settle_address_funds(&self, settlement: FundsSettlement) {
596 self.address_funds_withdraw_scheduler
597 .lock()
598 .as_ref()
599 .expect("Funds withdraw scheduler must be enabled if there are settlements")
600 .settle_funds(settlement);
601 }
602
603 pub fn reconfigure(
606 &self,
607 new_epoch_store: &Arc<AuthorityPerEpochStore>,
608 account_funds_read: &Arc<dyn AccountFundsRead>,
609 ) {
610 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
611 new_epoch_store,
612 &self.object_cache_read,
613 account_funds_read.clone(),
614 self.funds_withdraw_scheduler_type,
615 &self.address_funds_scheduler_metrics,
616 );
617 let mut guard = self.address_funds_withdraw_scheduler.lock();
618 if let Some(old_scheduler) = guard.as_ref() {
619 old_scheduler.close_epoch();
620 }
621 *guard = address_funds_withdraw_scheduler;
622 drop(guard);
623 }
624
625 pub fn check_execution_overload(
626 &self,
627 overload_config: &AuthorityOverloadConfig,
628 tx_data: &SenderSignedData,
629 ) -> SuiResult {
630 let inflight_queue_len = self.num_pending_certificates();
631 self.overload_tracker
632 .check_execution_overload(overload_config, tx_data, inflight_queue_len)
633 }
634
635 pub fn num_pending_certificates(&self) -> usize {
636 (self
637 .metrics
638 .transaction_manager_num_pending_certificates
639 .get()
640 + self
641 .metrics
642 .transaction_manager_num_executing_certificates
643 .get()) as usize
644 }
645
646 #[cfg(test)]
647 pub async fn check_empty_for_testing(&self) {
648 for _ in 0..10 {
649 if self.num_pending_certificates() == 0 {
650 return;
651 }
652 tokio::task::yield_now().await;
653 }
654 assert_eq!(self.num_pending_certificates(), 0);
655 }
656}
657
658#[cfg(test)]
659mod test {
660 use super::{
661 BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
662 PendingCertificate,
663 };
664 use crate::authority::ExecutionEnv;
665 use crate::authority::shared_object_version_manager::AssignedVersions;
666 use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
667 use std::collections::BTreeSet;
668 use std::{time::Duration, vec};
669 use sui_test_transaction_builder::TestTransactionBuilder;
670 use sui_types::base_types::{SuiAddress, random_object_ref};
671 use sui_types::executable_transaction::VerifiedExecutableTransaction;
672 use sui_types::object::Owner;
673 use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
674 use sui_types::transaction::{
675 SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
676 };
677 use sui_types::{
678 SUI_FRAMEWORK_PACKAGE_ID,
679 base_types::{ObjectID, SequenceNumber},
680 crypto::deterministic_random_account_key,
681 object::Object,
682 transaction::{CallArg, ObjectArg},
683 };
684 use tokio::time::Instant;
685 use tokio::{
686 sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
687 time::sleep,
688 };
689
690 #[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
692 state: &AuthorityState,
693 ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
694 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
697 let registry = prometheus::Registry::new();
698 let execution_scheduler = ExecutionScheduler::new(
699 state.get_object_cache_reader().clone(),
700 state.get_account_funds_read().clone(),
701 state.get_transaction_cache_reader().clone(),
702 tx_ready_certificates,
703 &state.epoch_store_for_testing(),
704 FundsWithdrawSchedulerType::default(),
705 state.metrics.clone(),
706 ®istry,
707 );
708
709 (execution_scheduler, rx_ready_certificates)
710 }
711
712 fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
713 let rgp = 100;
716 let (sender, keypair) = deterministic_random_account_key();
717 let transaction =
718 TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
719 .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
720 .build_and_sign(&keypair);
721 VerifiedExecutableTransaction::new_system(
722 VerifiedTransaction::new_unchecked(transaction),
723 0,
724 )
725 }
726
727 #[tokio::test(flavor = "current_thread", start_paused = true)]
728 async fn execution_scheduler_basics() {
729 let (owner, _keypair) = deterministic_random_account_key();
731 let gas_objects: Vec<Object> = (0..10)
732 .map(|_| {
733 let gas_object_id = ObjectID::random();
734 Object::with_id_owner_for_testing(gas_object_id, owner)
735 })
736 .collect();
737 let state = init_state_with_objects(gas_objects.clone()).await;
738
739 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
742 assert!(
744 rx_ready_certificates
745 .try_recv()
746 .is_err_and(|err| err == TryRecvError::Empty)
747 );
748 assert_eq!(execution_scheduler.num_pending_certificates(), 0);
750
751 execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
753 assert!(
755 rx_ready_certificates
756 .try_recv()
757 .is_err_and(|err| err == TryRecvError::Empty)
758 );
759
760 let transaction = make_transaction(gas_objects[0].clone(), vec![]);
762 let tx_start_time = Instant::now();
763 execution_scheduler.enqueue_transactions(
764 vec![(transaction.clone(), ExecutionEnv::new())],
765 &state.epoch_store_for_testing(),
766 );
767 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
769
770 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
772 assert!(
773 pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
774 );
775
776 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
777
778 drop(pending_certificate);
780
781 execution_scheduler.check_empty_for_testing().await;
783
784 let gas_object_new = Object::with_id_owner_version_for_testing(
786 ObjectID::random(),
787 0.into(),
788 Owner::AddressOwner(owner),
789 );
790 let transaction = make_transaction(gas_object_new.clone(), vec![]);
791 let tx_start_time = Instant::now();
792 execution_scheduler.enqueue_transactions(
793 vec![(transaction.clone(), ExecutionEnv::new())],
794 &state.epoch_store_for_testing(),
795 );
796 sleep(Duration::from_secs(1)).await;
798 assert!(
799 rx_ready_certificates
800 .try_recv()
801 .is_err_and(|err| err == TryRecvError::Empty)
802 );
803
804 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
805
806 execution_scheduler.enqueue_transactions(
808 vec![(transaction.clone(), ExecutionEnv::new())],
809 &state.epoch_store_for_testing(),
810 );
811 sleep(Duration::from_secs(1)).await;
812 assert!(
813 rx_ready_certificates
814 .try_recv()
815 .is_err_and(|err| err == TryRecvError::Empty)
816 );
817
818 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
819
820 state
822 .get_cache_writer()
823 .write_object_entry_for_test(gas_object_new);
824 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
827 let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
828 assert_eq!(
829 pending_certificate.certificate.digest(),
830 pending_certificate2.certificate.digest()
831 );
832
833 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
836 assert!(
837 pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
838 >= Duration::from_secs(2)
839 );
840
841 drop(pending_certificate);
843 drop(pending_certificate2);
844
845 execution_scheduler.check_empty_for_testing().await;
847 }
848
849 #[tokio::test(flavor = "current_thread", start_paused = true)]
858 async fn execution_scheduler_object_dependency() {
859 telemetry_subscribers::init_for_testing();
860 let (owner, _keypair) = deterministic_random_account_key();
862 let gas_objects: Vec<Object> = (0..10)
863 .map(|_| {
864 let gas_object_id = ObjectID::random();
865 Object::with_id_owner_for_testing(gas_object_id, owner)
866 })
867 .collect();
868 let shared_object = Object::shared_for_testing();
869 let initial_shared_version = shared_object.owner().start_version().unwrap();
870 let shared_object_2 = Object::shared_for_testing();
871 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
872
873 let state = init_state_with_objects(
874 [
875 gas_objects.clone(),
876 vec![shared_object.clone(), shared_object_2.clone()],
877 ]
878 .concat(),
879 )
880 .await;
881
882 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
885 assert!(rx_ready_certificates.try_recv().is_err());
887
888 let shared_version = 1000.into();
890 let shared_object_arg_read = ObjectArg::SharedObject {
891 id: shared_object.id(),
892 initial_shared_version,
893 mutability: SharedObjectMutability::Immutable,
894 };
895 let transaction_read_0 = make_transaction(
896 gas_objects[0].clone(),
897 vec![CallArg::Object(shared_object_arg_read)],
898 );
899 let transaction_read_1 = make_transaction(
900 gas_objects[1].clone(),
901 vec![CallArg::Object(shared_object_arg_read)],
902 );
903 let tx_read_0_assigned_versions = vec![(
904 (
905 shared_object.id(),
906 shared_object.owner().start_version().unwrap(),
907 ),
908 shared_version,
909 )];
910 let tx_read_1_assigned_versions = vec![(
911 (
912 shared_object.id(),
913 shared_object.owner().start_version().unwrap(),
914 ),
915 shared_version,
916 )];
917
918 let shared_object_arg_default = ObjectArg::SharedObject {
920 id: shared_object.id(),
921 initial_shared_version,
922 mutability: SharedObjectMutability::Mutable,
923 };
924 let transaction_default = make_transaction(
925 gas_objects[2].clone(),
926 vec![CallArg::Object(shared_object_arg_default)],
927 );
928 let tx_default_assigned_versions = vec![(
929 (
930 shared_object.id(),
931 shared_object.owner().start_version().unwrap(),
932 ),
933 shared_version,
934 )];
935
936 let shared_version_2 = 1000.into();
938 let shared_object_arg_read_2 = ObjectArg::SharedObject {
939 id: shared_object_2.id(),
940 initial_shared_version: initial_shared_version_2,
941 mutability: SharedObjectMutability::Immutable,
942 };
943 let transaction_read_2 = make_transaction(
944 gas_objects[3].clone(),
945 vec![
946 CallArg::Object(shared_object_arg_default),
947 CallArg::Object(shared_object_arg_read_2),
948 ],
949 );
950 let tx_read_2_assigned_versions = vec![
951 (
952 (
953 shared_object.id(),
954 shared_object.owner().start_version().unwrap(),
955 ),
956 shared_version,
957 ),
958 (
959 (
960 shared_object_2.id(),
961 shared_object_2.owner().start_version().unwrap(),
962 ),
963 shared_version_2,
964 ),
965 ];
966
967 execution_scheduler.enqueue_transactions(
968 vec![
969 (
970 transaction_read_0.clone(),
971 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
972 tx_read_0_assigned_versions,
973 None,
974 )),
975 ),
976 (
977 transaction_read_1.clone(),
978 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
979 tx_read_1_assigned_versions,
980 None,
981 )),
982 ),
983 (
984 transaction_default.clone(),
985 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
986 tx_default_assigned_versions,
987 None,
988 )),
989 ),
990 (
991 transaction_read_2.clone(),
992 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
993 tx_read_2_assigned_versions,
994 None,
995 )),
996 ),
997 ],
998 &state.epoch_store_for_testing(),
999 );
1000
1001 sleep(Duration::from_secs(1)).await;
1003 assert!(rx_ready_certificates.try_recv().is_err());
1004
1005 assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1006
1007 let mut new_shared_object = shared_object.clone();
1009 new_shared_object
1010 .data
1011 .try_as_move_mut()
1012 .unwrap()
1013 .increment_version_to(shared_version_2);
1014 state
1015 .get_cache_writer()
1016 .write_object_entry_for_test(new_shared_object);
1017
1018 let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1020 let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1021 let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1022 {
1023 let mut want_digests = vec![
1024 transaction_read_0.digest(),
1025 transaction_read_1.digest(),
1026 transaction_default.digest(),
1027 ];
1028 want_digests.sort();
1029 let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1030 got_digests.sort();
1031 assert_eq!(want_digests, got_digests);
1032 }
1033
1034 sleep(Duration::from_secs(1)).await;
1035 assert!(rx_ready_certificates.try_recv().is_err());
1036
1037 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1038
1039 let mut new_shared_object_2 = shared_object_2.clone();
1041 new_shared_object_2
1042 .data
1043 .try_as_move_mut()
1044 .unwrap()
1045 .increment_version_to(shared_version_2);
1046 state
1047 .get_cache_writer()
1048 .write_object_entry_for_test(new_shared_object_2);
1049
1050 let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1052 assert_eq!(transaction_read_2.digest(), tx_3.digest());
1053
1054 sleep(Duration::from_secs(1)).await;
1055 assert!(rx_ready_certificates.try_recv().is_err());
1056
1057 execution_scheduler.check_empty_for_testing().await;
1058 }
1059
1060 #[tokio::test(flavor = "current_thread", start_paused = true)]
1061 async fn execution_scheduler_receiving_notify_commit() {
1062 telemetry_subscribers::init_for_testing();
1063 let (owner, _keypair) = deterministic_random_account_key();
1065 let gas_objects: Vec<Object> = (0..10)
1066 .map(|_| {
1067 let gas_object_id = ObjectID::random();
1068 Object::with_id_owner_for_testing(gas_object_id, owner)
1069 })
1070 .collect();
1071 let state = init_state_with_objects(gas_objects.clone()).await;
1072
1073 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1076 assert!(rx_ready_certificates.try_recv().is_err());
1078 execution_scheduler.check_empty_for_testing().await;
1080
1081 let obj_id = ObjectID::random();
1082 let object_arguments: Vec<_> = (0..10)
1083 .map(|i| {
1084 let object = Object::with_id_owner_version_for_testing(
1085 obj_id,
1086 i.into(),
1087 Owner::AddressOwner(owner),
1088 );
1089 let object_arg = if i % 2 == 0 || i == 3 {
1096 ObjectArg::Receiving(object.compute_object_reference())
1097 } else {
1098 ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1099 };
1100 let txn =
1101 make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1102 (object, txn)
1103 })
1104 .collect();
1105
1106 for (i, (_, txn)) in object_arguments.iter().enumerate() {
1107 execution_scheduler.enqueue_transactions(
1110 vec![(txn.clone(), ExecutionEnv::new())],
1111 &state.epoch_store_for_testing(),
1112 );
1113 sleep(Duration::from_secs(1)).await;
1114 assert!(rx_ready_certificates.try_recv().is_err());
1115 assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1116 }
1117
1118 let len = object_arguments.len();
1121 for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1122 state
1125 .get_cache_writer()
1126 .write_object_entry_for_test(object.clone());
1127
1128 rx_ready_certificates.recv().await.unwrap();
1131
1132 sleep(Duration::from_secs(1)).await;
1135 assert!(rx_ready_certificates.try_recv().is_err());
1136
1137 drop(txn);
1139
1140 assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1143 }
1144
1145 execution_scheduler.check_empty_for_testing().await;
1147 }
1148
1149 #[tokio::test(flavor = "current_thread", start_paused = true)]
1150 async fn execution_scheduler_receiving_object_ready_notifications() {
1151 telemetry_subscribers::init_for_testing();
1152 let (owner, _keypair) = deterministic_random_account_key();
1154 let gas_objects: Vec<Object> = (0..10)
1155 .map(|_| {
1156 let gas_object_id = ObjectID::random();
1157 Object::with_id_owner_for_testing(gas_object_id, owner)
1158 })
1159 .collect();
1160 let state = init_state_with_objects(gas_objects.clone()).await;
1161
1162 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1165 assert!(rx_ready_certificates.try_recv().is_err());
1167 execution_scheduler.check_empty_for_testing().await;
1169
1170 let obj_id = ObjectID::random();
1171 let receiving_object_new0 =
1172 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1173 let receiving_object_new1 =
1174 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1175 let receiving_object_arg0 =
1176 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1177 let receive_object_transaction0 = make_transaction(
1178 gas_objects[0].clone(),
1179 vec![CallArg::Object(receiving_object_arg0)],
1180 );
1181
1182 let receiving_object_arg1 =
1183 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1184 let receive_object_transaction1 = make_transaction(
1185 gas_objects[0].clone(),
1186 vec![CallArg::Object(receiving_object_arg1)],
1187 );
1188
1189 execution_scheduler.enqueue_transactions(
1191 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1192 &state.epoch_store_for_testing(),
1193 );
1194 sleep(Duration::from_secs(1)).await;
1195 assert!(rx_ready_certificates.try_recv().is_err());
1196 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1197
1198 execution_scheduler.enqueue_transactions(
1200 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1201 &state.epoch_store_for_testing(),
1202 );
1203 sleep(Duration::from_secs(1)).await;
1204 assert!(rx_ready_certificates.try_recv().is_err());
1205 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1206
1207 execution_scheduler.enqueue_transactions(
1209 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1210 &state.epoch_store_for_testing(),
1211 );
1212 sleep(Duration::from_secs(1)).await;
1213 assert!(rx_ready_certificates.try_recv().is_err());
1214 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1215
1216 state
1218 .get_cache_writer()
1219 .write_object_entry_for_test(receiving_object_new0.clone());
1220
1221 rx_ready_certificates.recv().await.unwrap();
1224
1225 state
1227 .get_cache_writer()
1228 .write_object_entry_for_test(receiving_object_new1.clone());
1229
1230 rx_ready_certificates.recv().await.unwrap();
1233 }
1234
1235 #[tokio::test(flavor = "current_thread", start_paused = true)]
1236 async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1237 telemetry_subscribers::init_for_testing();
1238 let (owner, _keypair) = deterministic_random_account_key();
1240 let gas_objects: Vec<Object> = (0..10)
1241 .map(|_| {
1242 let gas_object_id = ObjectID::random();
1243 Object::with_id_owner_for_testing(gas_object_id, owner)
1244 })
1245 .collect();
1246 let state = init_state_with_objects(gas_objects.clone()).await;
1247
1248 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1251 assert!(rx_ready_certificates.try_recv().is_err());
1253 execution_scheduler.check_empty_for_testing().await;
1255
1256 let obj_id = ObjectID::random();
1257 let receiving_object_new0 =
1258 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1259 let receiving_object_new1 =
1260 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1261 let receiving_object_arg0 =
1262 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1263 let receive_object_transaction0 = make_transaction(
1264 gas_objects[0].clone(),
1265 vec![CallArg::Object(receiving_object_arg0)],
1266 );
1267
1268 let receive_object_transaction01 = make_transaction(
1269 gas_objects[1].clone(),
1270 vec![CallArg::Object(receiving_object_arg0)],
1271 );
1272
1273 let receiving_object_arg1 =
1274 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1275 let receive_object_transaction1 = make_transaction(
1276 gas_objects[0].clone(),
1277 vec![CallArg::Object(receiving_object_arg1)],
1278 );
1279
1280 let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1283 let tx1 = make_transaction(
1284 gas_objects[0].clone(),
1285 vec![CallArg::Object(gas_receiving_arg)],
1286 );
1287
1288 execution_scheduler.enqueue_transactions(
1290 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1291 &state.epoch_store_for_testing(),
1292 );
1293 sleep(Duration::from_secs(1)).await;
1294 assert!(rx_ready_certificates.try_recv().is_err());
1295 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1296
1297 execution_scheduler.enqueue_transactions(
1299 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1300 &state.epoch_store_for_testing(),
1301 );
1302 sleep(Duration::from_secs(1)).await;
1303 assert!(rx_ready_certificates.try_recv().is_err());
1304 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1305
1306 execution_scheduler.enqueue_transactions(
1309 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1310 &state.epoch_store_for_testing(),
1311 );
1312 sleep(Duration::from_secs(1)).await;
1313 assert!(rx_ready_certificates.try_recv().is_err());
1314 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1315
1316 state
1318 .get_cache_writer()
1319 .write_object_entry_for_test(receiving_object_new0.clone());
1320
1321 rx_ready_certificates.recv().await.unwrap();
1324
1325 rx_ready_certificates.recv().await.unwrap();
1326
1327 assert!(rx_ready_certificates.try_recv().is_err());
1329
1330 execution_scheduler.enqueue_transactions(
1333 vec![(tx1.clone(), ExecutionEnv::new())],
1334 &state.epoch_store_for_testing(),
1335 );
1336 sleep(Duration::from_secs(1)).await;
1337 rx_ready_certificates.recv().await.unwrap();
1338
1339 state
1341 .get_cache_writer()
1342 .write_object_entry_for_test(receiving_object_new1.clone());
1343
1344 rx_ready_certificates.recv().await.unwrap();
1347 }
1348
1349 #[tokio::test(flavor = "current_thread", start_paused = true)]
1350 async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1351 telemetry_subscribers::init_for_testing();
1352 let (owner, _keypair) = deterministic_random_account_key();
1354 let mut gas_objects: Vec<Object> = (0..10)
1355 .map(|_| {
1356 let gas_object_id = ObjectID::random();
1357 Object::with_id_owner_for_testing(gas_object_id, owner)
1358 })
1359 .collect();
1360 let receiving_object = Object::with_id_owner_version_for_testing(
1361 ObjectID::random(),
1362 10.into(),
1363 Owner::AddressOwner(owner),
1364 );
1365 gas_objects.push(receiving_object.clone());
1366 let state = init_state_with_objects(gas_objects.clone()).await;
1367
1368 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1371 assert!(rx_ready_certificates.try_recv().is_err());
1373 execution_scheduler.check_empty_for_testing().await;
1375
1376 let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1377 receiving_object.id(),
1378 0.into(),
1379 Owner::AddressOwner(owner),
1380 );
1381 let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1382 receiving_object.id(),
1383 1.into(),
1384 Owner::AddressOwner(owner),
1385 );
1386 let receiving_object_arg0 =
1387 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1388 let receive_object_transaction0 = make_transaction(
1389 gas_objects[0].clone(),
1390 vec![CallArg::Object(receiving_object_arg0)],
1391 );
1392
1393 let receive_object_transaction01 = make_transaction(
1394 gas_objects[1].clone(),
1395 vec![CallArg::Object(receiving_object_arg0)],
1396 );
1397
1398 let receiving_object_arg1 =
1399 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1400 let receive_object_transaction1 = make_transaction(
1401 gas_objects[0].clone(),
1402 vec![CallArg::Object(receiving_object_arg1)],
1403 );
1404
1405 execution_scheduler.enqueue_transactions(
1407 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1408 &state.epoch_store_for_testing(),
1409 );
1410 execution_scheduler.enqueue_transactions(
1411 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1412 &state.epoch_store_for_testing(),
1413 );
1414 execution_scheduler.enqueue_transactions(
1415 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1416 &state.epoch_store_for_testing(),
1417 );
1418 sleep(Duration::from_secs(1)).await;
1419 rx_ready_certificates.recv().await.unwrap();
1420 rx_ready_certificates.recv().await.unwrap();
1421 rx_ready_certificates.recv().await.unwrap();
1422 assert!(rx_ready_certificates.try_recv().is_err());
1423 }
1424
1425 #[tokio::test(flavor = "current_thread", start_paused = true)]
1428 async fn execution_scheduler_with_cancelled_transactions() {
1429 let (owner, _keypair) = deterministic_random_account_key();
1431 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1432 let shared_object_1 = Object::shared_for_testing();
1433 let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1434 let shared_object_2 = Object::shared_for_testing();
1435 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1436 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1437
1438 let state = init_state_with_objects(vec![
1439 gas_object.clone(),
1440 shared_object_1.clone(),
1441 shared_object_2.clone(),
1442 owned_object.clone(),
1443 ])
1444 .await;
1445
1446 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1449 assert!(rx_ready_certificates.try_recv().is_err());
1451
1452 let shared_object_arg_1 = ObjectArg::SharedObject {
1454 id: shared_object_1.id(),
1455 initial_shared_version: initial_shared_version_1,
1456 mutability: SharedObjectMutability::Mutable,
1457 };
1458 let shared_object_arg_2 = ObjectArg::SharedObject {
1459 id: shared_object_2.id(),
1460 initial_shared_version: initial_shared_version_2,
1461 mutability: SharedObjectMutability::Mutable,
1462 };
1463
1464 let owned_version = 2000.into();
1466 let mut owned_ref = owned_object.compute_object_reference();
1467 owned_ref.1 = owned_version;
1468 let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1469
1470 let cancelled_transaction = make_transaction(
1471 gas_object.clone(),
1472 vec![
1473 CallArg::Object(shared_object_arg_1),
1474 CallArg::Object(shared_object_arg_2),
1475 CallArg::Object(owned_object_arg),
1476 ],
1477 );
1478 let assigned_versions = vec![
1479 (
1480 (
1481 shared_object_1.id(),
1482 shared_object_1.owner().start_version().unwrap(),
1483 ),
1484 SequenceNumber::CANCELLED_READ,
1485 ),
1486 (
1487 (
1488 shared_object_2.id(),
1489 shared_object_2.owner().start_version().unwrap(),
1490 ),
1491 SequenceNumber::CONGESTED,
1492 ),
1493 ];
1494
1495 execution_scheduler.enqueue_transactions(
1496 vec![(
1497 cancelled_transaction.clone(),
1498 ExecutionEnv::new()
1499 .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1500 )],
1501 &state.epoch_store_for_testing(),
1502 );
1503
1504 sleep(Duration::from_secs(1)).await;
1506 assert!(rx_ready_certificates.try_recv().is_err());
1507
1508 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1509
1510 let mut new_owned_object = owned_object.clone();
1512 new_owned_object
1513 .data
1514 .try_as_move_mut()
1515 .unwrap()
1516 .increment_version_to(owned_version);
1517 state
1518 .get_cache_writer()
1519 .write_object_entry_for_test(new_owned_object);
1520
1521 let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1523 assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1524
1525 sleep(Duration::from_secs(1)).await;
1526 assert!(rx_ready_certificates.try_recv().is_err());
1527
1528 execution_scheduler.check_empty_for_testing().await;
1529 }
1530
/// Feeds small sequences of transactions through `BarrierDependencyBuilder`
/// and checks the barrier dependencies returned by `process_tx` for each one.
/// Objects are identified by a single byte; each transaction lists the shared
/// objects it takes with `NonExclusiveWrite` and with `Mutable` mutability.
#[test]
fn test_barrier_dependency_builder() {
    // Builds an unsigned transaction taking the given shared objects. The two
    // id lists must be disjoint.
    let build_tx = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
        assert!(
            !non_exclusive_writes
                .iter()
                .any(|id| exclusive_writes.contains(id))
        );
        assert!(
            !exclusive_writes
                .iter()
                .any(|id| non_exclusive_writes.contains(id))
        );

        // Non-exclusive inputs are added first, then the exclusive ones.
        let shared_inputs = non_exclusive_writes
            .into_iter()
            .map(|id| (id, SharedObjectMutability::NonExclusiveWrite))
            .chain(
                exclusive_writes
                    .into_iter()
                    .map(|id| (id, SharedObjectMutability::Mutable)),
            );

        let mut builder = ProgrammableTransactionBuilder::new();
        for (id, mutability) in shared_inputs {
            builder
                .obj(ObjectArg::SharedObject {
                    id: ObjectID::from_single_byte(id as u8),
                    initial_shared_version: SequenceNumber::new(),
                    mutability,
                })
                .unwrap();
        }

        let kind = TransactionKind::ProgrammableTransaction(builder.finish());
        let tx_data = TransactionData::new(kind, SuiAddress::default(), random_object_ref(), 1, 1);
        Transaction::from_data_and_signer(tx_data, vec![])
    };

    // A non-exclusive write on object 1 followed by an exclusive write on it:
    // the exclusive writer is expected to depend on the non-exclusive one.
    {
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let txs = [build_tx(vec![1], vec![]), build_tx(vec![], vec![1])];
        let deps: Vec<_> = txs
            .iter()
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()))
            .collect();

        assert!(deps[0].is_empty());
        assert_eq!(deps[1], BTreeSet::from([*txs[0].digest()]));
    }

    // One transaction writes objects 1 and 2 non-exclusively; separate
    // exclusive writers of 1 and of 2 are each expected to depend on it.
    {
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let txs = [
            build_tx(vec![1, 2], vec![]),
            build_tx(vec![], vec![1]),
            build_tx(vec![], vec![2]),
        ];
        let deps: Vec<_> = txs
            .iter()
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()))
            .collect();

        assert!(deps[0].is_empty());
        assert_eq!(deps[1], BTreeSet::from([*txs[0].digest()]));
        assert_eq!(deps[2], BTreeSet::from([*txs[0].digest()]));
    }

    // Two non-exclusive writers on different objects, then one exclusive
    // writer of both: it is expected to depend on both earlier transactions.
    {
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let txs = [
            build_tx(vec![1], vec![]),
            build_tx(vec![2], vec![]),
            build_tx(vec![], vec![1, 2]),
        ];
        let deps: Vec<_> = txs
            .iter()
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()))
            .collect();

        assert!(deps[0].is_empty());
        assert!(deps[1].is_empty());
        assert_eq!(
            deps[2],
            BTreeSet::from([*txs[0].digest(), *txs[1].digest()])
        );
    }

    // A non-exclusive writer followed by two exclusive writers of the same
    // object: only the first exclusive writer is expected to get the
    // dependency; the second one gets none.
    {
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let txs = [
            build_tx(vec![1], vec![]),
            build_tx(vec![], vec![1]),
            build_tx(vec![], vec![1]),
        ];
        let deps: Vec<_> = txs
            .iter()
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()))
            .collect();

        assert!(deps[0].is_empty());
        assert_eq!(deps[1], BTreeSet::from([*txs[0].digest()]));
        assert!(deps[2].is_empty());
    }
}
1647}