1use crate::{
5 accumulators::funds_read::AccountFundsRead,
6 authority::{
7 AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8 shared_object_version_manager::Schedulable,
9 },
10 execution_cache::{ObjectCacheRead, TransactionCacheRead},
11 execution_scheduler::{
12 ExecutingGuard, PendingCertificateStats,
13 funds_withdraw_scheduler::{
14 AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
15 WithdrawReservations, scheduler::FundsWithdrawScheduler,
16 },
17 },
18};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::ZipDebugEqIteratorExt;
21use mysten_common::{assert_reachable, debug_fatal};
22use mysten_metrics::spawn_monitored_task;
23use parking_lot::Mutex;
24use std::{
25 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26 sync::Arc,
27};
28use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
29use sui_types::{
30 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31 base_types::{FullObjectID, ObjectID},
32 digests::TransactionDigest,
33 error::SuiResult,
34 executable_transaction::VerifiedExecutableTransaction,
35 storage::InputKey,
36 transaction::{
37 SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
38 TransactionDataAPI, TransactionKey,
39 },
40};
41use tokio::sync::mpsc::UnboundedSender;
42use tokio::time::Instant;
43use tracing::{debug, error, instrument};
44
45use super::{PendingCertificate, overload_tracker::OverloadTracker};
46
/// Accumulates, per shared object, the transactions that took a
/// `NonExclusiveWrite` on it and have not yet been claimed by a later
/// `Mutable` access.
///
/// `process_tx` hands the accumulated digests out as the "barrier
/// dependencies" of the next transaction that accesses the object mutably.
pub(crate) struct BarrierDependencyBuilder {
    /// Shared object id -> digests of the transactions that registered a
    /// non-exclusive write on that object since its last `Mutable` access.
    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
}
51
52impl BarrierDependencyBuilder {
53 pub fn new() -> Self {
54 Self {
55 dep_state: Default::default(),
56 }
57 }
58
59 pub fn process_tx(
65 &mut self,
66 tx_digest: TransactionDigest,
67 tx: &TransactionData,
68 ) -> BTreeSet<TransactionDigest> {
69 let mut barrier_deps = BTreeSet::new();
70 for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
71 match mutability {
72 SharedObjectMutability::NonExclusiveWrite => {
73 self.dep_state.entry(id).or_default().insert(tx_digest);
74 }
75 SharedObjectMutability::Mutable => {
76 if let Some(deps) = self.dep_state.remove(&id) {
79 barrier_deps.extend(deps);
80 }
81 }
82 SharedObjectMutability::Immutable => (),
83 }
84 }
85 barrier_deps
86 }
87}
88
/// Schedules transactions for execution: waits until their input objects
/// (and barrier dependencies) are available and then forwards them on the
/// ready-certificates channel.
///
/// The struct is `Clone`; clones are moved into the tasks it spawns.
#[derive(Clone)]
pub struct ExecutionScheduler {
    /// Used to check whether input objects are available and to wait for them.
    object_cache_read: Arc<dyn ObjectCacheRead>,
    /// Used to look up executed-effects digests and transaction blocks.
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    /// Receives add/remove notifications for pending certificates and answers
    /// overload checks.
    overload_tracker: Arc<OverloadTracker>,
    /// Channel on which certificates that are ready to execute are sent.
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    /// Funds-withdraw scheduler for the current epoch. `None` when the node is
    /// not a validator or accumulators are disabled; replaced in `reconfigure`.
    address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
    /// Scheduler flavour, kept so that `reconfigure` can build a new scheduler.
    funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
    /// Authority metrics; the pending/executing gauges double as queue-length counters.
    metrics: Arc<AuthorityMetrics>,
    /// Metrics handed to every `FundsWithdrawScheduler` this scheduler creates.
    address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
}
100
/// Guard that keeps `cert` counted as pending while it is alive.
///
/// Construction bumps the pending-certificates gauge and registers the
/// certificate with the overload tracker; dropping the guard undoes both.
struct PendingGuard<'a> {
    /// Scheduler whose metrics and overload tracker are updated.
    scheduler: &'a ExecutionScheduler,
    /// Certificate being tracked.
    cert: &'a VerifiedExecutableTransaction,
}
105
106impl<'a> PendingGuard<'a> {
107 pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
108 scheduler
109 .metrics
110 .transaction_manager_num_pending_certificates
111 .inc();
112 scheduler
113 .overload_tracker
114 .add_pending_certificate(cert.data());
115 Self { scheduler, cert }
116 }
117}
118
119impl Drop for PendingGuard<'_> {
120 fn drop(&mut self) {
121 self.scheduler
122 .metrics
123 .transaction_manager_num_pending_certificates
124 .dec();
125 self.scheduler
126 .overload_tracker
127 .remove_pending_certificate(self.cert.data());
128 }
129}
130
131impl ExecutionScheduler {
132 pub fn new(
133 object_cache_read: Arc<dyn ObjectCacheRead>,
134 account_funds_read: Arc<dyn AccountFundsRead>,
135 transaction_cache_read: Arc<dyn TransactionCacheRead>,
136 tx_ready_certificates: UnboundedSender<PendingCertificate>,
137 epoch_store: &Arc<AuthorityPerEpochStore>,
138 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
139 metrics: Arc<AuthorityMetrics>,
140 prometheus_registry: &prometheus::Registry,
141 ) -> Self {
142 tracing::info!(
143 ?funds_withdraw_scheduler_type,
144 "Creating new ExecutionScheduler"
145 );
146 let address_funds_scheduler_metrics =
147 Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
148 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
149 epoch_store,
150 &object_cache_read,
151 account_funds_read,
152 funds_withdraw_scheduler_type,
153 &address_funds_scheduler_metrics,
154 );
155 Self {
156 object_cache_read,
157 transaction_cache_read,
158 overload_tracker: Arc::new(OverloadTracker::new()),
159 tx_ready_certificates,
160 address_funds_withdraw_scheduler: Arc::new(Mutex::new(
161 address_funds_withdraw_scheduler,
162 )),
163 funds_withdraw_scheduler_type,
164 metrics,
165 address_funds_scheduler_metrics,
166 }
167 }
168
169 fn initialize_funds_withdraw_scheduler(
170 epoch_store: &Arc<AuthorityPerEpochStore>,
171 object_cache_read: &Arc<dyn ObjectCacheRead>,
172 account_funds_read: Arc<dyn AccountFundsRead>,
173 scheduler_type: FundsWithdrawSchedulerType,
174 address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
175 ) -> Option<FundsWithdrawScheduler> {
176 let withdraw_scheduler_enabled =
177 epoch_store.is_validator() && epoch_store.accumulators_enabled();
178 if !withdraw_scheduler_enabled {
179 return None;
180 }
181 let starting_accumulator_version = object_cache_read
182 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
183 .expect("Accumulator root object must be present if funds accumulator is enabled")
184 .version();
185 let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
186 account_funds_read.clone(),
187 starting_accumulator_version,
188 scheduler_type,
189 address_funds_scheduler_metrics.clone(),
190 );
191
192 Some(address_funds_withdraw_scheduler)
193 }
194
195 #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
196 async fn schedule_transaction(
197 self,
198 cert: VerifiedExecutableTransaction,
199 execution_env: ExecutionEnv,
200 epoch_store: &Arc<AuthorityPerEpochStore>,
201 ) {
202 let enqueue_time = Instant::now();
203 let tx_digest = cert.digest();
204 let digests = [*tx_digest];
205
206 let tx_data = cert.transaction_data();
207 let input_object_kinds = tx_data
208 .input_objects()
209 .expect("input_objects() cannot fail");
210 let input_object_keys: Vec<_> = epoch_store
211 .get_input_object_keys(
212 &cert.key(),
213 &input_object_kinds,
214 &execution_env.assigned_versions,
215 )
216 .into_iter()
217 .collect();
218
219 let receiving_object_keys: HashSet<_> = tx_data
220 .receiving_objects()
221 .into_iter()
222 .map(|entry| {
223 InputKey::VersionedObject {
224 id: FullObjectID::new(entry.0, None),
226 version: entry.1,
227 }
228 })
229 .collect();
230 let input_and_receiving_keys = [
231 input_object_keys,
232 receiving_object_keys.iter().cloned().collect(),
233 ]
234 .concat();
235
236 let epoch = epoch_store.epoch();
237 debug!(
238 ?tx_digest,
239 "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
240 );
241
242 let availability = self
243 .object_cache_read
244 .multi_input_objects_available_cache_only(&input_and_receiving_keys);
245 let missing_input_keys: Vec<_> = input_and_receiving_keys
249 .into_iter()
250 .zip_debug_eq(availability)
251 .filter_map(|(key, available)| if !available { Some(key) } else { None })
252 .collect();
253
254 let has_missing_barrier_dependencies = self
255 .transaction_cache_read
256 .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
257 .into_iter()
258 .any(|r| r.is_none());
259
260 if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
261 self.metrics
262 .transaction_manager_num_enqueued_certificates
263 .with_label_values(&["ready"])
264 .inc();
265 debug!(?tx_digest, "Input objects already available");
266 self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
267 return;
268 }
269
270 let _pending_guard = PendingGuard::new(&self, &cert);
271 self.metrics
272 .transaction_manager_num_enqueued_certificates
273 .with_label_values(&["pending"])
274 .inc();
275
276 if !execution_env.barrier_dependencies.is_empty() {
277 debug!(
278 "waiting for barrier dependencies to be executed: {:?}",
279 execution_env.barrier_dependencies
280 );
281 self.transaction_cache_read
282 .notify_read_executed_effects_digests(
283 "wait_for_barrier_dependencies",
284 &execution_env.barrier_dependencies,
285 )
286 .await;
287 }
288
289 tokio::select! {
290 _ = self.object_cache_read
291 .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
292 => {
293 self.metrics
294 .transaction_manager_transaction_queue_age_s
295 .observe(enqueue_time.elapsed().as_secs_f64());
296 debug!(?tx_digest, "Input objects available");
297 self.send_transaction_for_execution(
299 &cert,
300 execution_env,
301 enqueue_time,
302 );
303 }
304 _ = self.transaction_cache_read.notify_read_executed_effects_digests(
305 "ExecutionScheduler::notify_read_executed_effects_digests",
306 &digests,
307 ) => {
308 debug!(?tx_digest, "Transaction already executed");
309 }
310 };
311 }
312
313 pub fn send_transaction_for_execution(
314 &self,
315 cert: &VerifiedExecutableTransaction,
316 execution_env: ExecutionEnv,
317 enqueue_time: Instant,
318 ) {
319 let pending_cert = PendingCertificate {
320 certificate: cert.clone(),
321 execution_env,
322 stats: PendingCertificateStats {
323 enqueue_time,
324 ready_time: Some(Instant::now()),
325 },
326 executing_guard: Some(ExecutingGuard::new(
327 self.metrics
328 .transaction_manager_num_executing_certificates
329 .clone(),
330 )),
331 };
332 let _ = self.tx_ready_certificates.send(pending_cert);
333 }
334
335 fn schedule_funds_withdraws(
336 &self,
337 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
338 epoch_store: &Arc<AuthorityPerEpochStore>,
339 ) {
340 if certs.is_empty() {
341 return;
342 }
343 let mut withdraws = BTreeMap::new();
344 let mut prev_version = None;
345 for (cert, env) in &certs {
346 let tx_withdraws = cert
347 .transaction_data()
348 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
349 assert!(!tx_withdraws.is_empty());
350 let accumulator_version = env
351 .assigned_versions
352 .accumulator_version
353 .expect("accumulator_version must be set when there are withdraws");
354 if let Some(prev_version) = prev_version {
355 assert!(prev_version <= accumulator_version);
357 }
358 prev_version = Some(accumulator_version);
359 let tx_digest = *cert.digest();
360 withdraws
361 .entry(accumulator_version)
362 .or_insert(Vec::new())
363 .push(TxFundsWithdraw {
364 tx_digest,
365 reservations: tx_withdraws,
366 });
367 }
368 let mut receivers = FuturesUnordered::new();
369 {
370 let guard = self.address_funds_withdraw_scheduler.lock();
371 let withdraw_scheduler = guard
372 .as_ref()
373 .expect("Funds withdraw scheduler must be enabled if there are withdraws");
374 for (version, tx_withdraws) in withdraws {
375 receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
376 accumulator_version: version,
377 withdraws: tx_withdraws,
378 }));
379 }
380 }
382 let scheduler = self.clone();
383 let epoch_store = epoch_store.clone();
384 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
385 let mut cert_map = HashMap::new();
386 for (cert, env) in certs {
387 cert_map.insert(*cert.digest(), (cert, env));
388 }
389 while let Some(result) = receivers.next().await {
390 match result {
391 Ok((tx_digest, status)) => match status {
392 ScheduleStatus::InsufficientFunds => {
393 assert_reachable!("tx cancelled, insufficient funds");
394 debug!(
395 ?tx_digest,
396 "Funds withdraw scheduling result: Insufficient funds"
397 );
398 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
399 let env = env.with_insufficient_funds();
400 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
401 }
402 ScheduleStatus::SufficientFunds => {
403 assert_reachable!("tx scheduled, sufficient funds");
404 debug!(?tx_digest, "Funds withdraw scheduling result: Success");
405 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
406 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
407 }
408 ScheduleStatus::SkipSchedule => {
409 assert_reachable!("tx withdrawal scheduling skipped");
410 debug!(?tx_digest, "Skip scheduling funds withdraw");
411 }
412 },
413 Err(e) => {
414 error!("Withdraw scheduler stopped: {:?}", e);
415 }
416 }
417 }
418 }));
419 }
420
421 fn schedule_tx_keys(
422 &self,
423 tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
424 epoch_store: &Arc<AuthorityPerEpochStore>,
425 ) {
426 if tx_with_keys.is_empty() {
427 return;
428 }
429
430 let scheduler = self.clone();
431 let epoch_store = epoch_store.clone();
432 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
433 let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
434 let digests = epoch_store
435 .notify_read_tx_key_to_digest(&tx_keys)
436 .await
437 .expect("db error");
438 let transactions = scheduler
439 .transaction_cache_read
440 .multi_get_transaction_blocks(&digests)
441 .into_iter()
442 .map(|tx| {
443 let tx = tx.expect("tx must exist").as_ref().clone();
444 VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
445 })
446 .zip_debug_eq(tx_with_keys.into_iter().map(|(_, env)| env))
447 .collect::<Vec<_>>();
448 scheduler.enqueue_transactions(transactions, &epoch_store);
449 }));
450 }
451
452 #[cfg(debug_assertions)]
455 fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
456 let epoch = cert.epoch();
457 let digest = *cert.digest();
458 let digests = [digest];
459 let executed = self
460 .transaction_cache_read
461 .multi_get_executed_effects(&digests)
462 .pop()
463 .unwrap();
464 if let Some(executed) = executed {
467 use sui_types::effects::TransactionEffectsAPI;
468
469 assert_eq!(
470 executed.executed_epoch(),
471 epoch,
472 "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
473 digest,
474 executed.executed_epoch(),
475 epoch
476 );
477 }
478 }
479}
480
481impl ExecutionScheduler {
482 pub fn enqueue(
483 &self,
484 certs: Vec<(Schedulable, ExecutionEnv)>,
485 epoch_store: &Arc<AuthorityPerEpochStore>,
486 ) {
487 let mut ordinary_txns = Vec::with_capacity(certs.len());
489 let mut tx_with_keys = Vec::new();
490 let mut tx_with_withdraws = Vec::new();
491
492 for (schedulable, env) in certs {
493 match schedulable {
494 Schedulable::Transaction(tx) => {
495 if tx.transaction_data().has_funds_withdrawals() {
496 tx_with_withdraws.push((tx, env));
497 } else {
498 ordinary_txns.push((tx, env));
499 }
500 }
501 s @ Schedulable::RandomnessStateUpdate(..) => {
502 tx_with_keys.push((s.key(), env));
503 }
504 Schedulable::AccumulatorSettlement(_, _) => {
505 unreachable!("handled by SettlementScheduler");
506 }
507 Schedulable::ConsensusCommitPrologue(_, _, _) => {
508 unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
512 }
513 }
514 }
515
516 self.enqueue_transactions(ordinary_txns, epoch_store);
517 self.schedule_tx_keys(tx_with_keys, epoch_store);
518 self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
519 }
520
521 pub fn enqueue_transactions(
522 &self,
523 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
524 epoch_store: &Arc<AuthorityPerEpochStore>,
525 ) {
526 let certs: Vec<_> = certs
528 .into_iter()
529 .filter_map(|cert| {
530 if cert.0.epoch() == epoch_store.epoch() {
531 #[cfg(debug_assertions)]
532 self.assert_cert_not_executed_previous_epochs(&cert.0);
533
534 Some(cert)
535 } else {
536 debug_fatal!(
537 "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
538 epoch_store.epoch(),
539 cert.0.epoch()
540 );
541 None
542 }
543 })
544 .collect();
545 let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
546 let executed = self
547 .transaction_cache_read
548 .multi_get_executed_effects_digests(&digests);
549 let mut already_executed_certs_num = 0;
550 let pending_certs = certs.into_iter().zip_debug_eq(executed).filter_map(
551 |((cert, execution_env), executed)| {
552 if executed.is_none() {
553 Some((cert, execution_env))
554 } else {
555 already_executed_certs_num += 1;
556 None
557 }
558 },
559 );
560
561 for (cert, execution_env) in pending_certs {
562 let scheduler = self.clone();
563 let epoch_store = epoch_store.clone();
564 spawn_monitored_task!(
565 epoch_store.within_alive_epoch(scheduler.schedule_transaction(
566 cert,
567 execution_env,
568 &epoch_store,
569 ))
570 );
571 }
572
573 self.metrics
574 .transaction_manager_num_enqueued_certificates
575 .with_label_values(&["already_executed"])
576 .inc_by(already_executed_certs_num);
577 }
578
579 pub fn settle_address_funds(&self, settlement: FundsSettlement) {
580 self.address_funds_withdraw_scheduler
581 .lock()
582 .as_ref()
583 .expect("Funds withdraw scheduler must be enabled if there are settlements")
584 .settle_funds(settlement);
585 }
586
587 pub fn reconfigure(
590 &self,
591 new_epoch_store: &Arc<AuthorityPerEpochStore>,
592 account_funds_read: &Arc<dyn AccountFundsRead>,
593 ) {
594 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
595 new_epoch_store,
596 &self.object_cache_read,
597 account_funds_read.clone(),
598 self.funds_withdraw_scheduler_type,
599 &self.address_funds_scheduler_metrics,
600 );
601 let mut guard = self.address_funds_withdraw_scheduler.lock();
602 if let Some(old_scheduler) = guard.as_ref() {
603 old_scheduler.close_epoch();
604 }
605 *guard = address_funds_withdraw_scheduler;
606 drop(guard);
607 }
608
609 pub fn check_execution_overload(
610 &self,
611 overload_config: &AuthorityOverloadConfig,
612 tx_data: &SenderSignedData,
613 ) -> SuiResult {
614 let inflight_queue_len = self.num_pending_certificates();
615 self.overload_tracker
616 .check_execution_overload(overload_config, tx_data, inflight_queue_len)
617 }
618
619 pub fn num_pending_certificates(&self) -> usize {
620 (self
621 .metrics
622 .transaction_manager_num_pending_certificates
623 .get()
624 + self
625 .metrics
626 .transaction_manager_num_executing_certificates
627 .get()) as usize
628 }
629
630 #[cfg(test)]
631 pub async fn check_empty_for_testing(&self) {
632 for _ in 0..10 {
633 if self.num_pending_certificates() == 0 {
634 return;
635 }
636 tokio::task::yield_now().await;
637 }
638 assert_eq!(self.num_pending_certificates(), 0);
639 }
640}
641
642#[cfg(test)]
643mod test {
644 use super::{
645 BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
646 PendingCertificate,
647 };
648 use crate::authority::ExecutionEnv;
649 use crate::authority::shared_object_version_manager::AssignedVersions;
650 use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
651 use std::collections::BTreeSet;
652 use std::{time::Duration, vec};
653 use sui_test_transaction_builder::TestTransactionBuilder;
654 use sui_types::base_types::{SuiAddress, random_object_ref};
655 use sui_types::executable_transaction::VerifiedExecutableTransaction;
656 use sui_types::object::Owner;
657 use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
658 use sui_types::transaction::{
659 SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
660 };
661 use sui_types::{
662 SUI_FRAMEWORK_PACKAGE_ID,
663 base_types::{ObjectID, SequenceNumber},
664 crypto::deterministic_random_account_key,
665 object::Object,
666 transaction::{CallArg, ObjectArg},
667 };
668 use tokio::time::Instant;
669 use tokio::{
670 sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
671 time::sleep,
672 };
673
674 #[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
676 state: &AuthorityState,
677 ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
678 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
681 let registry = prometheus::Registry::new();
682 let execution_scheduler = ExecutionScheduler::new(
683 state.get_object_cache_reader().clone(),
684 state.get_account_funds_read().clone(),
685 state.get_transaction_cache_reader().clone(),
686 tx_ready_certificates,
687 &state.epoch_store_for_testing(),
688 FundsWithdrawSchedulerType::default(),
689 state.metrics.clone(),
690 ®istry,
691 );
692
693 (execution_scheduler, rx_ready_certificates)
694 }
695
696 fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
697 let rgp = 100;
700 let (sender, keypair) = deterministic_random_account_key();
701 let transaction =
702 TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
703 .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
704 .build_and_sign(&keypair);
705 VerifiedExecutableTransaction::new_system(
706 VerifiedTransaction::new_unchecked(transaction),
707 0,
708 )
709 }
710
    /// Exercises the basic ready / pending paths of the scheduler on a paused clock.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn execution_scheduler_basics() {
        // Initialize an authority state with ten gas objects owned by `owner`.
        let (owner, _keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..10)
            .map(|_| {
                let gas_object_id = ObjectID::random();
                Object::with_id_owner_for_testing(gas_object_id, owner)
            })
            .collect();
        let state = init_state_with_objects(gas_objects.clone()).await;

        // A fresh scheduler has produced nothing and tracks nothing.
        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
        assert!(
            rx_ready_certificates
                .try_recv()
                .is_err_and(|err| err == TryRecvError::Empty)
        );
        assert_eq!(execution_scheduler.num_pending_certificates(), 0);

        // Enqueueing an empty batch produces nothing.
        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
        assert!(
            rx_ready_certificates
                .try_recv()
                .is_err_and(|err| err == TryRecvError::Empty)
        );

        // A transaction whose only input (its gas object) already exists
        // becomes ready.
        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
        let tx_start_time = Instant::now();
        execution_scheduler.enqueue_transactions(
            vec![(transaction.clone(), ExecutionEnv::new())],
            &state.epoch_store_for_testing(),
        );
        let pending_certificate = rx_ready_certificates.recv().await.unwrap();

        // Timestamps are ordered: start <= enqueue <= ready.
        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
        assert!(
            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
        );

        // While the received certificate is held it is still counted.
        assert_eq!(execution_scheduler.num_pending_certificates(), 1);

        // Dropping it releases the count.
        drop(pending_certificate);

        execution_scheduler.check_empty_for_testing().await;

        // A transaction whose gas object has not been written yet must wait.
        let gas_object_new = Object::with_id_owner_version_for_testing(
            ObjectID::random(),
            0.into(),
            Owner::AddressOwner(owner),
        );
        let transaction = make_transaction(gas_object_new.clone(), vec![]);
        let tx_start_time = Instant::now();
        execution_scheduler.enqueue_transactions(
            vec![(transaction.clone(), ExecutionEnv::new())],
            &state.epoch_store_for_testing(),
        );
        sleep(Duration::from_secs(1)).await;
        assert!(
            rx_ready_certificates
                .try_recv()
                .is_err_and(|err| err == TryRecvError::Empty)
        );

        assert_eq!(execution_scheduler.num_pending_certificates(), 1);

        // Enqueueing the same transaction again adds a second pending entry.
        execution_scheduler.enqueue_transactions(
            vec![(transaction.clone(), ExecutionEnv::new())],
            &state.epoch_store_for_testing(),
        );
        sleep(Duration::from_secs(1)).await;
        assert!(
            rx_ready_certificates
                .try_recv()
                .is_err_and(|err| err == TryRecvError::Empty)
        );

        assert_eq!(execution_scheduler.num_pending_certificates(), 2);

        // Writing the missing gas object releases both entries.
        state
            .get_cache_writer()
            .write_object_entry_for_test(gas_object_new);
        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
        assert_eq!(
            pending_certificate.certificate.digest(),
            pending_certificate2.certificate.digest()
        );

        // Two 1 s sleeps have elapsed since the first enqueue.
        // NOTE(review): this presumes the first certificate received is the
        // one enqueued first; confirm.
        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
        assert!(
            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
                >= Duration::from_secs(2)
        );

        drop(pending_certificate);
        drop(pending_certificate2);

        execution_scheduler.check_empty_for_testing().await;
    }
832
    /// Checks that transactions on shared objects stay pending until the
    /// assigned version of every shared input has been written.
    ///
    /// Four transactions are enqueued: two read-only and one mutable access to
    /// the first shared object, plus one that touches both shared objects.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn execution_scheduler_object_dependency() {
        telemetry_subscribers::init_for_testing();
        // Initialize an authority state with gas objects and two shared objects.
        let (owner, _keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..10)
            .map(|_| {
                let gas_object_id = ObjectID::random();
                Object::with_id_owner_for_testing(gas_object_id, owner)
            })
            .collect();
        let shared_object = Object::shared_for_testing();
        let initial_shared_version = shared_object.owner().start_version().unwrap();
        let shared_object_2 = Object::shared_for_testing();
        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();

        let state = init_state_with_objects(
            [
                gas_objects.clone(),
                vec![shared_object.clone(), shared_object_2.clone()],
            ]
            .concat(),
        )
        .await;

        // A fresh scheduler has produced nothing.
        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
        assert!(rx_ready_certificates.try_recv().is_err());

        // Version assigned to the first shared object; it is not written yet.
        let shared_version = 1000.into();
        // Two transactions that read the first shared object immutably.
        let shared_object_arg_read = ObjectArg::SharedObject {
            id: shared_object.id(),
            initial_shared_version,
            mutability: SharedObjectMutability::Immutable,
        };
        let transaction_read_0 = make_transaction(
            gas_objects[0].clone(),
            vec![CallArg::Object(shared_object_arg_read)],
        );
        let transaction_read_1 = make_transaction(
            gas_objects[1].clone(),
            vec![CallArg::Object(shared_object_arg_read)],
        );
        let tx_read_0_assigned_versions = vec![(
            (
                shared_object.id(),
                shared_object.owner().start_version().unwrap(),
            ),
            shared_version,
        )];
        let tx_read_1_assigned_versions = vec![(
            (
                shared_object.id(),
                shared_object.owner().start_version().unwrap(),
            ),
            shared_version,
        )];

        // One transaction that accesses the first shared object mutably.
        let shared_object_arg_default = ObjectArg::SharedObject {
            id: shared_object.id(),
            initial_shared_version,
            mutability: SharedObjectMutability::Mutable,
        };
        let transaction_default = make_transaction(
            gas_objects[2].clone(),
            vec![CallArg::Object(shared_object_arg_default)],
        );
        let tx_default_assigned_versions = vec![(
            (
                shared_object.id(),
                shared_object.owner().start_version().unwrap(),
            ),
            shared_version,
        )];

        // Version assigned to the second shared object; also not written yet.
        let shared_version_2 = 1000.into();
        // One transaction that mutates the first shared object and reads the second.
        let shared_object_arg_read_2 = ObjectArg::SharedObject {
            id: shared_object_2.id(),
            initial_shared_version: initial_shared_version_2,
            mutability: SharedObjectMutability::Immutable,
        };
        let transaction_read_2 = make_transaction(
            gas_objects[3].clone(),
            vec![
                CallArg::Object(shared_object_arg_default),
                CallArg::Object(shared_object_arg_read_2),
            ],
        );
        let tx_read_2_assigned_versions = vec![
            (
                (
                    shared_object.id(),
                    shared_object.owner().start_version().unwrap(),
                ),
                shared_version,
            ),
            (
                (
                    shared_object_2.id(),
                    shared_object_2.owner().start_version().unwrap(),
                ),
                shared_version_2,
            ),
        ];

        // Enqueue all four transactions with their assigned versions.
        execution_scheduler.enqueue_transactions(
            vec![
                (
                    transaction_read_0.clone(),
                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
                        tx_read_0_assigned_versions,
                        None,
                    )),
                ),
                (
                    transaction_read_1.clone(),
                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
                        tx_read_1_assigned_versions,
                        None,
                    )),
                ),
                (
                    transaction_default.clone(),
                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
                        tx_default_assigned_versions,
                        None,
                    )),
                ),
                (
                    transaction_read_2.clone(),
                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
                        tx_read_2_assigned_versions,
                        None,
                    )),
                ),
            ],
            &state.epoch_store_for_testing(),
        );

        // Nothing is ready: neither assigned version exists yet.
        sleep(Duration::from_secs(1)).await;
        assert!(rx_ready_certificates.try_recv().is_err());

        assert_eq!(execution_scheduler.num_pending_certificates(), 4);

        // Write the first shared object at the assigned version.
        // `shared_version_2` is passed here; it equals `shared_version` (both 1000).
        let mut new_shared_object = shared_object.clone();
        new_shared_object
            .data
            .try_as_move_mut()
            .unwrap()
            .increment_version_to(shared_version_2);
        state
            .get_cache_writer()
            .write_object_entry_for_test(new_shared_object);

        // The three transactions that only need the first shared object are
        // released, in no particular order.
        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
        {
            let mut want_digests = vec![
                transaction_read_0.digest(),
                transaction_read_1.digest(),
                transaction_default.digest(),
            ];
            want_digests.sort();
            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
            got_digests.sort();
            assert_eq!(want_digests, got_digests);
        }

        // The fourth transaction still waits for the second shared object.
        sleep(Duration::from_secs(1)).await;
        assert!(rx_ready_certificates.try_recv().is_err());

        assert_eq!(execution_scheduler.num_pending_certificates(), 1);

        // Write the second shared object at its assigned version.
        let mut new_shared_object_2 = shared_object_2.clone();
        new_shared_object_2
            .data
            .try_as_move_mut()
            .unwrap()
            .increment_version_to(shared_version_2);
        state
            .get_cache_writer()
            .write_object_entry_for_test(new_shared_object_2);

        // Now the last transaction is released.
        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
        assert_eq!(transaction_read_2.digest(), tx_3.digest());

        sleep(Duration::from_secs(1)).await;
        assert!(rx_ready_certificates.try_recv().is_err());

        execution_scheduler.check_empty_for_testing().await;
    }
1043
    /// Checks that transactions waiting on successive versions of one object,
    /// passed either as a receiving or as an owned argument, are released one
    /// at a time as each version is written.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn execution_scheduler_receiving_notify_commit() {
        telemetry_subscribers::init_for_testing();
        // Initialize an authority state with ten gas objects owned by `owner`.
        let (owner, _keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..10)
            .map(|_| {
                let gas_object_id = ObjectID::random();
                Object::with_id_owner_for_testing(gas_object_id, owner)
            })
            .collect();
        let state = init_state_with_objects(gas_objects.clone()).await;

        // A fresh scheduler has produced nothing and tracks nothing.
        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
        assert!(rx_ready_certificates.try_recv().is_err());
        execution_scheduler.check_empty_for_testing().await;

        // Ten versions (0..10) of the same object id, each paired with a
        // transaction that takes that exact version as input.
        let obj_id = ObjectID::random();
        let object_arguments: Vec<_> = (0..10)
            .map(|i| {
                let object = Object::with_id_owner_version_for_testing(
                    obj_id,
                    i.into(),
                    Owner::AddressOwner(owner),
                );
                // Even indices and index 3 use a receiving argument, so indices
                // 2, 3 and 4 form a run of consecutive receives; the other
                // indices use an owned-object argument.
                let object_arg = if i % 2 == 0 || i == 3 {
                    ObjectArg::Receiving(object.compute_object_reference())
                } else {
                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
                };
                let txn =
                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
                (object, txn)
            })
            .collect();

        // None of the object versions exist yet, so every transaction stays pending.
        for (i, (_, txn)) in object_arguments.iter().enumerate() {
            execution_scheduler.enqueue_transactions(
                vec![(txn.clone(), ExecutionEnv::new())],
                &state.epoch_store_for_testing(),
            );
            sleep(Duration::from_secs(1)).await;
            assert!(rx_ready_certificates.try_recv().is_err());
            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
        }

        // Writing each version in turn releases exactly one transaction.
        let len = object_arguments.len();
        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
            state
                .get_cache_writer()
                .write_object_entry_for_test(object.clone());

            // The received certificate is dropped right away, so it is no
            // longer counted below.
            rx_ready_certificates.recv().await.unwrap();

            // No further transaction becomes ready from this write.
            sleep(Duration::from_secs(1)).await;
            assert!(rx_ready_certificates.try_recv().is_err());

            drop(txn);

            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
        }

        execution_scheduler.check_empty_for_testing().await;
    }
1132
1133 #[tokio::test(flavor = "current_thread", start_paused = true)]
1134 async fn execution_scheduler_receiving_object_ready_notifications() {
1135 telemetry_subscribers::init_for_testing();
1136 let (owner, _keypair) = deterministic_random_account_key();
1138 let gas_objects: Vec<Object> = (0..10)
1139 .map(|_| {
1140 let gas_object_id = ObjectID::random();
1141 Object::with_id_owner_for_testing(gas_object_id, owner)
1142 })
1143 .collect();
1144 let state = init_state_with_objects(gas_objects.clone()).await;
1145
1146 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1149 assert!(rx_ready_certificates.try_recv().is_err());
1151 execution_scheduler.check_empty_for_testing().await;
1153
1154 let obj_id = ObjectID::random();
1155 let receiving_object_new0 =
1156 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1157 let receiving_object_new1 =
1158 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1159 let receiving_object_arg0 =
1160 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1161 let receive_object_transaction0 = make_transaction(
1162 gas_objects[0].clone(),
1163 vec![CallArg::Object(receiving_object_arg0)],
1164 );
1165
1166 let receiving_object_arg1 =
1167 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1168 let receive_object_transaction1 = make_transaction(
1169 gas_objects[0].clone(),
1170 vec![CallArg::Object(receiving_object_arg1)],
1171 );
1172
1173 execution_scheduler.enqueue_transactions(
1175 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1176 &state.epoch_store_for_testing(),
1177 );
1178 sleep(Duration::from_secs(1)).await;
1179 assert!(rx_ready_certificates.try_recv().is_err());
1180 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1181
1182 execution_scheduler.enqueue_transactions(
1184 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1185 &state.epoch_store_for_testing(),
1186 );
1187 sleep(Duration::from_secs(1)).await;
1188 assert!(rx_ready_certificates.try_recv().is_err());
1189 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1190
1191 execution_scheduler.enqueue_transactions(
1193 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1194 &state.epoch_store_for_testing(),
1195 );
1196 sleep(Duration::from_secs(1)).await;
1197 assert!(rx_ready_certificates.try_recv().is_err());
1198 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1199
1200 state
1202 .get_cache_writer()
1203 .write_object_entry_for_test(receiving_object_new0.clone());
1204
1205 rx_ready_certificates.recv().await.unwrap();
1208
1209 state
1211 .get_cache_writer()
1212 .write_object_entry_for_test(receiving_object_new1.clone());
1213
1214 rx_ready_certificates.recv().await.unwrap();
1217 }
1218
1219 #[tokio::test(flavor = "current_thread", start_paused = true)]
1220 async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1221 telemetry_subscribers::init_for_testing();
1222 let (owner, _keypair) = deterministic_random_account_key();
1224 let gas_objects: Vec<Object> = (0..10)
1225 .map(|_| {
1226 let gas_object_id = ObjectID::random();
1227 Object::with_id_owner_for_testing(gas_object_id, owner)
1228 })
1229 .collect();
1230 let state = init_state_with_objects(gas_objects.clone()).await;
1231
1232 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1235 assert!(rx_ready_certificates.try_recv().is_err());
1237 execution_scheduler.check_empty_for_testing().await;
1239
1240 let obj_id = ObjectID::random();
1241 let receiving_object_new0 =
1242 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1243 let receiving_object_new1 =
1244 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1245 let receiving_object_arg0 =
1246 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1247 let receive_object_transaction0 = make_transaction(
1248 gas_objects[0].clone(),
1249 vec![CallArg::Object(receiving_object_arg0)],
1250 );
1251
1252 let receive_object_transaction01 = make_transaction(
1253 gas_objects[1].clone(),
1254 vec![CallArg::Object(receiving_object_arg0)],
1255 );
1256
1257 let receiving_object_arg1 =
1258 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1259 let receive_object_transaction1 = make_transaction(
1260 gas_objects[0].clone(),
1261 vec![CallArg::Object(receiving_object_arg1)],
1262 );
1263
1264 let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1267 let tx1 = make_transaction(
1268 gas_objects[0].clone(),
1269 vec![CallArg::Object(gas_receiving_arg)],
1270 );
1271
1272 execution_scheduler.enqueue_transactions(
1274 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1275 &state.epoch_store_for_testing(),
1276 );
1277 sleep(Duration::from_secs(1)).await;
1278 assert!(rx_ready_certificates.try_recv().is_err());
1279 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1280
1281 execution_scheduler.enqueue_transactions(
1283 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1284 &state.epoch_store_for_testing(),
1285 );
1286 sleep(Duration::from_secs(1)).await;
1287 assert!(rx_ready_certificates.try_recv().is_err());
1288 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1289
1290 execution_scheduler.enqueue_transactions(
1293 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1294 &state.epoch_store_for_testing(),
1295 );
1296 sleep(Duration::from_secs(1)).await;
1297 assert!(rx_ready_certificates.try_recv().is_err());
1298 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1299
1300 state
1302 .get_cache_writer()
1303 .write_object_entry_for_test(receiving_object_new0.clone());
1304
1305 rx_ready_certificates.recv().await.unwrap();
1308
1309 rx_ready_certificates.recv().await.unwrap();
1310
1311 assert!(rx_ready_certificates.try_recv().is_err());
1313
1314 execution_scheduler.enqueue_transactions(
1317 vec![(tx1.clone(), ExecutionEnv::new())],
1318 &state.epoch_store_for_testing(),
1319 );
1320 sleep(Duration::from_secs(1)).await;
1321 rx_ready_certificates.recv().await.unwrap();
1322
1323 state
1325 .get_cache_writer()
1326 .write_object_entry_for_test(receiving_object_new1.clone());
1327
1328 rx_ready_certificates.recv().await.unwrap();
1331 }
1332
1333 #[tokio::test(flavor = "current_thread", start_paused = true)]
1334 async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1335 telemetry_subscribers::init_for_testing();
1336 let (owner, _keypair) = deterministic_random_account_key();
1338 let mut gas_objects: Vec<Object> = (0..10)
1339 .map(|_| {
1340 let gas_object_id = ObjectID::random();
1341 Object::with_id_owner_for_testing(gas_object_id, owner)
1342 })
1343 .collect();
1344 let receiving_object = Object::with_id_owner_version_for_testing(
1345 ObjectID::random(),
1346 10.into(),
1347 Owner::AddressOwner(owner),
1348 );
1349 gas_objects.push(receiving_object.clone());
1350 let state = init_state_with_objects(gas_objects.clone()).await;
1351
1352 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1355 assert!(rx_ready_certificates.try_recv().is_err());
1357 execution_scheduler.check_empty_for_testing().await;
1359
1360 let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1361 receiving_object.id(),
1362 0.into(),
1363 Owner::AddressOwner(owner),
1364 );
1365 let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1366 receiving_object.id(),
1367 1.into(),
1368 Owner::AddressOwner(owner),
1369 );
1370 let receiving_object_arg0 =
1371 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1372 let receive_object_transaction0 = make_transaction(
1373 gas_objects[0].clone(),
1374 vec![CallArg::Object(receiving_object_arg0)],
1375 );
1376
1377 let receive_object_transaction01 = make_transaction(
1378 gas_objects[1].clone(),
1379 vec![CallArg::Object(receiving_object_arg0)],
1380 );
1381
1382 let receiving_object_arg1 =
1383 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1384 let receive_object_transaction1 = make_transaction(
1385 gas_objects[0].clone(),
1386 vec![CallArg::Object(receiving_object_arg1)],
1387 );
1388
1389 execution_scheduler.enqueue_transactions(
1391 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1392 &state.epoch_store_for_testing(),
1393 );
1394 execution_scheduler.enqueue_transactions(
1395 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1396 &state.epoch_store_for_testing(),
1397 );
1398 execution_scheduler.enqueue_transactions(
1399 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1400 &state.epoch_store_for_testing(),
1401 );
1402 sleep(Duration::from_secs(1)).await;
1403 rx_ready_certificates.recv().await.unwrap();
1404 rx_ready_certificates.recv().await.unwrap();
1405 rx_ready_certificates.recv().await.unwrap();
1406 assert!(rx_ready_certificates.try_recv().is_err());
1407 }
1408
1409 #[tokio::test(flavor = "current_thread", start_paused = true)]
1412 async fn execution_scheduler_with_cancelled_transactions() {
1413 let (owner, _keypair) = deterministic_random_account_key();
1415 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1416 let shared_object_1 = Object::shared_for_testing();
1417 let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1418 let shared_object_2 = Object::shared_for_testing();
1419 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1420 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1421
1422 let state = init_state_with_objects(vec![
1423 gas_object.clone(),
1424 shared_object_1.clone(),
1425 shared_object_2.clone(),
1426 owned_object.clone(),
1427 ])
1428 .await;
1429
1430 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1433 assert!(rx_ready_certificates.try_recv().is_err());
1435
1436 let shared_object_arg_1 = ObjectArg::SharedObject {
1438 id: shared_object_1.id(),
1439 initial_shared_version: initial_shared_version_1,
1440 mutability: SharedObjectMutability::Mutable,
1441 };
1442 let shared_object_arg_2 = ObjectArg::SharedObject {
1443 id: shared_object_2.id(),
1444 initial_shared_version: initial_shared_version_2,
1445 mutability: SharedObjectMutability::Mutable,
1446 };
1447
1448 let owned_version = 2000.into();
1450 let mut owned_ref = owned_object.compute_object_reference();
1451 owned_ref.1 = owned_version;
1452 let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1453
1454 let cancelled_transaction = make_transaction(
1455 gas_object.clone(),
1456 vec![
1457 CallArg::Object(shared_object_arg_1),
1458 CallArg::Object(shared_object_arg_2),
1459 CallArg::Object(owned_object_arg),
1460 ],
1461 );
1462 let assigned_versions = vec![
1463 (
1464 (
1465 shared_object_1.id(),
1466 shared_object_1.owner().start_version().unwrap(),
1467 ),
1468 SequenceNumber::CANCELLED_READ,
1469 ),
1470 (
1471 (
1472 shared_object_2.id(),
1473 shared_object_2.owner().start_version().unwrap(),
1474 ),
1475 SequenceNumber::CONGESTED,
1476 ),
1477 ];
1478
1479 execution_scheduler.enqueue_transactions(
1480 vec![(
1481 cancelled_transaction.clone(),
1482 ExecutionEnv::new()
1483 .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1484 )],
1485 &state.epoch_store_for_testing(),
1486 );
1487
1488 sleep(Duration::from_secs(1)).await;
1490 assert!(rx_ready_certificates.try_recv().is_err());
1491
1492 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1493
1494 let mut new_owned_object = owned_object.clone();
1496 new_owned_object
1497 .data
1498 .try_as_move_mut()
1499 .unwrap()
1500 .increment_version_to(owned_version);
1501 state
1502 .get_cache_writer()
1503 .write_object_entry_for_test(new_owned_object);
1504
1505 let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1507 assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1508
1509 sleep(Duration::from_secs(1)).await;
1510 assert!(rx_ready_certificates.try_recv().is_err());
1511
1512 execution_scheduler.check_empty_for_testing().await;
1513 }
1514
/// Checks the dependency sets returned by `BarrierDependencyBuilder::process_tx` for
/// several orderings of non-exclusive and exclusive (mutable) writes to shared objects.
#[test]
fn test_barrier_dependency_builder() {
    // Builds a transaction with one shared input per id: ids in the first list are
    // non-exclusive writes, ids in the second list are mutable (exclusive) inputs.
    let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
        // The two id lists must not overlap.
        assert!(
            !non_exclusive_writes
                .iter()
                .any(|id| exclusive_writes.contains(id))
        );
        assert!(
            !exclusive_writes
                .iter()
                .any(|id| non_exclusive_writes.contains(id))
        );

        // Object ids are derived from a single byte of the numeric id.
        let shared_input = |id: u32, mutability| ObjectArg::SharedObject {
            id: ObjectID::from_single_byte(id as u8),
            initial_shared_version: SequenceNumber::new(),
            mutability,
        };

        let mut builder = ProgrammableTransactionBuilder::new();
        for id in non_exclusive_writes {
            builder
                .obj(shared_input(id, SharedObjectMutability::NonExclusiveWrite))
                .unwrap();
        }
        for id in exclusive_writes {
            builder
                .obj(shared_input(id, SharedObjectMutability::Mutable))
                .unwrap();
        }

        let kind = TransactionKind::ProgrammableTransaction(builder.finish());
        let data = TransactionData::new(kind, SuiAddress::default(), random_object_ref(), 1, 1);
        Transaction::from_data_and_signer(data, vec![])
    };

    // Feeds one transaction to the builder and returns the dependencies it reports.
    let deps_of = |deps_builder: &mut BarrierDependencyBuilder, tx: &Transaction| {
        deps_builder.process_tx(*tx.digest(), tx.transaction_data())
    };

    // A non-exclusive write followed by an exclusive write to the same object: the
    // exclusive writer depends on the non-exclusive writer.
    {
        let mut deps_builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1], vec![]);
        let tx2 = make_transaction(vec![], vec![1]);

        let tx1_deps = deps_of(&mut deps_builder, &tx1);
        let tx2_deps = deps_of(&mut deps_builder, &tx2);
        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
    }

    // One transaction writes non-exclusively to two objects; a later exclusive writer
    // of either object depends on it.
    {
        let mut deps_builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1, 2], vec![]);
        let tx2 = make_transaction(vec![], vec![1]);
        let tx3 = make_transaction(vec![], vec![2]);

        let tx1_deps = deps_of(&mut deps_builder, &tx1);
        let tx2_deps = deps_of(&mut deps_builder, &tx2);
        let tx3_deps = deps_of(&mut deps_builder, &tx3);
        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
        assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest()]));
    }

    // Two transactions write non-exclusively to different objects; one exclusive writer
    // of both objects depends on both of them.
    {
        let mut deps_builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1], vec![]);
        let tx2 = make_transaction(vec![2], vec![]);
        let tx3 = make_transaction(vec![], vec![1, 2]);

        let tx1_deps = deps_of(&mut deps_builder, &tx1);
        let tx2_deps = deps_of(&mut deps_builder, &tx2);
        let tx3_deps = deps_of(&mut deps_builder, &tx3);
        assert!(tx1_deps.is_empty());
        assert!(tx2_deps.is_empty());
        assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
    }

    // After the first exclusive writer has picked up the dependency, a second exclusive
    // writer of the same object gets no barrier dependencies.
    {
        let mut deps_builder = BarrierDependencyBuilder::new();
        let tx1 = make_transaction(vec![1], vec![]);
        let tx2 = make_transaction(vec![], vec![1]);
        let tx3 = make_transaction(vec![], vec![1]);

        let tx1_deps = deps_of(&mut deps_builder, &tx1);
        let tx2_deps = deps_of(&mut deps_builder, &tx2);
        let tx3_deps = deps_of(&mut deps_builder, &tx3);
        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
        assert!(tx3_deps.is_empty());
    }
}
1631}