1use crate::{
5 accumulators::funds_read::AccountFundsRead,
6 authority::{
7 AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8 shared_object_version_manager::Schedulable,
9 },
10 execution_cache::{ObjectCacheRead, TransactionCacheRead},
11 execution_scheduler::{
12 ExecutingGuard, PendingCertificateStats,
13 funds_withdraw_scheduler::{
14 AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
15 WithdrawReservations, scheduler::FundsWithdrawScheduler,
16 },
17 },
18};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::{assert_reachable, debug_fatal};
21use mysten_metrics::spawn_monitored_task;
22use parking_lot::Mutex;
23use std::{
24 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
25 sync::Arc,
26};
27use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
28use sui_types::{
29 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
30 base_types::{FullObjectID, ObjectID},
31 digests::TransactionDigest,
32 error::SuiResult,
33 executable_transaction::VerifiedExecutableTransaction,
34 storage::InputKey,
35 transaction::{
36 SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
37 TransactionDataAPI, TransactionKey,
38 },
39};
40use tokio::sync::mpsc::UnboundedSender;
41use tokio::time::Instant;
42use tracing::{debug, error, instrument};
43
44use super::{PendingCertificate, overload_tracker::OverloadTracker};
45
46pub(crate) struct BarrierDependencyBuilder {
48 dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
49}
50
51impl BarrierDependencyBuilder {
52 pub fn new() -> Self {
53 Self {
54 dep_state: Default::default(),
55 }
56 }
57
58 pub fn process_tx(
64 &mut self,
65 tx_digest: TransactionDigest,
66 tx: &TransactionData,
67 ) -> BTreeSet<TransactionDigest> {
68 let mut barrier_deps = BTreeSet::new();
69 for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
70 match mutability {
71 SharedObjectMutability::NonExclusiveWrite => {
72 self.dep_state.entry(id).or_default().insert(tx_digest);
73 }
74 SharedObjectMutability::Mutable => {
75 if let Some(deps) = self.dep_state.remove(&id) {
78 barrier_deps.extend(deps);
79 }
80 }
81 SharedObjectMutability::Immutable => (),
82 }
83 }
84 barrier_deps
85 }
86}
87
88#[derive(Clone)]
89pub struct ExecutionScheduler {
90 object_cache_read: Arc<dyn ObjectCacheRead>,
91 transaction_cache_read: Arc<dyn TransactionCacheRead>,
92 overload_tracker: Arc<OverloadTracker>,
93 tx_ready_certificates: UnboundedSender<PendingCertificate>,
94 address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
95 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
96 metrics: Arc<AuthorityMetrics>,
97 address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
98}
99
100struct PendingGuard<'a> {
101 scheduler: &'a ExecutionScheduler,
102 cert: &'a VerifiedExecutableTransaction,
103}
104
105impl<'a> PendingGuard<'a> {
106 pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
107 scheduler
108 .metrics
109 .transaction_manager_num_pending_certificates
110 .inc();
111 scheduler
112 .overload_tracker
113 .add_pending_certificate(cert.data());
114 Self { scheduler, cert }
115 }
116}
117
118impl Drop for PendingGuard<'_> {
119 fn drop(&mut self) {
120 self.scheduler
121 .metrics
122 .transaction_manager_num_pending_certificates
123 .dec();
124 self.scheduler
125 .overload_tracker
126 .remove_pending_certificate(self.cert.data());
127 }
128}
129
130impl ExecutionScheduler {
131 pub fn new(
132 object_cache_read: Arc<dyn ObjectCacheRead>,
133 account_funds_read: Arc<dyn AccountFundsRead>,
134 transaction_cache_read: Arc<dyn TransactionCacheRead>,
135 tx_ready_certificates: UnboundedSender<PendingCertificate>,
136 epoch_store: &Arc<AuthorityPerEpochStore>,
137 funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
138 metrics: Arc<AuthorityMetrics>,
139 prometheus_registry: &prometheus::Registry,
140 ) -> Self {
141 tracing::info!(
142 ?funds_withdraw_scheduler_type,
143 "Creating new ExecutionScheduler"
144 );
145 let address_funds_scheduler_metrics =
146 Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
147 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
148 epoch_store,
149 &object_cache_read,
150 account_funds_read,
151 funds_withdraw_scheduler_type,
152 &address_funds_scheduler_metrics,
153 );
154 Self {
155 object_cache_read,
156 transaction_cache_read,
157 overload_tracker: Arc::new(OverloadTracker::new()),
158 tx_ready_certificates,
159 address_funds_withdraw_scheduler: Arc::new(Mutex::new(
160 address_funds_withdraw_scheduler,
161 )),
162 funds_withdraw_scheduler_type,
163 metrics,
164 address_funds_scheduler_metrics,
165 }
166 }
167
168 fn initialize_funds_withdraw_scheduler(
169 epoch_store: &Arc<AuthorityPerEpochStore>,
170 object_cache_read: &Arc<dyn ObjectCacheRead>,
171 account_funds_read: Arc<dyn AccountFundsRead>,
172 scheduler_type: FundsWithdrawSchedulerType,
173 address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
174 ) -> Option<FundsWithdrawScheduler> {
175 let withdraw_scheduler_enabled =
176 epoch_store.is_validator() && epoch_store.accumulators_enabled();
177 if !withdraw_scheduler_enabled {
178 return None;
179 }
180 let starting_accumulator_version = object_cache_read
181 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
182 .expect("Accumulator root object must be present if funds accumulator is enabled")
183 .version();
184 let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
185 account_funds_read.clone(),
186 starting_accumulator_version,
187 scheduler_type,
188 address_funds_scheduler_metrics.clone(),
189 );
190
191 Some(address_funds_withdraw_scheduler)
192 }
193
194 #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
195 async fn schedule_transaction(
196 self,
197 cert: VerifiedExecutableTransaction,
198 execution_env: ExecutionEnv,
199 epoch_store: &Arc<AuthorityPerEpochStore>,
200 ) {
201 let enqueue_time = Instant::now();
202 let tx_digest = cert.digest();
203 let digests = [*tx_digest];
204
205 let tx_data = cert.transaction_data();
206 let input_object_kinds = tx_data
207 .input_objects()
208 .expect("input_objects() cannot fail");
209 let input_object_keys: Vec<_> = epoch_store
210 .get_input_object_keys(
211 &cert.key(),
212 &input_object_kinds,
213 &execution_env.assigned_versions,
214 )
215 .into_iter()
216 .collect();
217
218 let receiving_object_keys: HashSet<_> = tx_data
219 .receiving_objects()
220 .into_iter()
221 .map(|entry| {
222 InputKey::VersionedObject {
223 id: FullObjectID::new(entry.0, None),
225 version: entry.1,
226 }
227 })
228 .collect();
229 let input_and_receiving_keys = [
230 input_object_keys,
231 receiving_object_keys.iter().cloned().collect(),
232 ]
233 .concat();
234
235 let epoch = epoch_store.epoch();
236 debug!(
237 ?tx_digest,
238 "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
239 );
240
241 let availability = self
242 .object_cache_read
243 .multi_input_objects_available_cache_only(&input_and_receiving_keys);
244 let missing_input_keys: Vec<_> = input_and_receiving_keys
248 .into_iter()
249 .zip(availability)
250 .filter_map(|(key, available)| if !available { Some(key) } else { None })
251 .collect();
252
253 let has_missing_barrier_dependencies = self
254 .transaction_cache_read
255 .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
256 .into_iter()
257 .any(|r| r.is_none());
258
259 if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
260 self.metrics
261 .transaction_manager_num_enqueued_certificates
262 .with_label_values(&["ready"])
263 .inc();
264 debug!(?tx_digest, "Input objects already available");
265 self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
266 return;
267 }
268
269 let _pending_guard = PendingGuard::new(&self, &cert);
270 self.metrics
271 .transaction_manager_num_enqueued_certificates
272 .with_label_values(&["pending"])
273 .inc();
274
275 if !execution_env.barrier_dependencies.is_empty() {
276 debug!(
277 "waiting for barrier dependencies to be executed: {:?}",
278 execution_env.barrier_dependencies
279 );
280 self.transaction_cache_read
281 .notify_read_executed_effects_digests(
282 "wait_for_barrier_dependencies",
283 &execution_env.barrier_dependencies,
284 )
285 .await;
286 }
287
288 tokio::select! {
289 _ = self.object_cache_read
290 .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
291 => {
292 self.metrics
293 .transaction_manager_transaction_queue_age_s
294 .observe(enqueue_time.elapsed().as_secs_f64());
295 debug!(?tx_digest, "Input objects available");
296 self.send_transaction_for_execution(
298 &cert,
299 execution_env,
300 enqueue_time,
301 );
302 }
303 _ = self.transaction_cache_read.notify_read_executed_effects_digests(
304 "ExecutionScheduler::notify_read_executed_effects_digests",
305 &digests,
306 ) => {
307 debug!(?tx_digest, "Transaction already executed");
308 }
309 };
310 }
311
312 pub fn send_transaction_for_execution(
313 &self,
314 cert: &VerifiedExecutableTransaction,
315 execution_env: ExecutionEnv,
316 enqueue_time: Instant,
317 ) {
318 let pending_cert = PendingCertificate {
319 certificate: cert.clone(),
320 execution_env,
321 stats: PendingCertificateStats {
322 enqueue_time,
323 ready_time: Some(Instant::now()),
324 },
325 executing_guard: Some(ExecutingGuard::new(
326 self.metrics
327 .transaction_manager_num_executing_certificates
328 .clone(),
329 )),
330 };
331 let _ = self.tx_ready_certificates.send(pending_cert);
332 }
333
334 fn schedule_funds_withdraws(
335 &self,
336 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
337 epoch_store: &Arc<AuthorityPerEpochStore>,
338 ) {
339 if certs.is_empty() {
340 return;
341 }
342 let mut withdraws = BTreeMap::new();
343 let mut prev_version = None;
344 for (cert, env) in &certs {
345 let tx_withdraws = cert
346 .transaction_data()
347 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
348 assert!(!tx_withdraws.is_empty());
349 let accumulator_version = env
350 .assigned_versions
351 .accumulator_version
352 .expect("accumulator_version must be set when there are withdraws");
353 if let Some(prev_version) = prev_version {
354 assert!(prev_version <= accumulator_version);
356 }
357 prev_version = Some(accumulator_version);
358 let tx_digest = *cert.digest();
359 withdraws
360 .entry(accumulator_version)
361 .or_insert(Vec::new())
362 .push(TxFundsWithdraw {
363 tx_digest,
364 reservations: tx_withdraws,
365 });
366 }
367 let mut receivers = FuturesUnordered::new();
368 {
369 let guard = self.address_funds_withdraw_scheduler.lock();
370 let withdraw_scheduler = guard
371 .as_ref()
372 .expect("Funds withdraw scheduler must be enabled if there are withdraws");
373 for (version, tx_withdraws) in withdraws {
374 receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
375 accumulator_version: version,
376 withdraws: tx_withdraws,
377 }));
378 }
379 }
381 let scheduler = self.clone();
382 let epoch_store = epoch_store.clone();
383 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
384 let mut cert_map = HashMap::new();
385 for (cert, env) in certs {
386 cert_map.insert(*cert.digest(), (cert, env));
387 }
388 while let Some(result) = receivers.next().await {
389 match result {
390 Ok((tx_digest, status)) => match status {
391 ScheduleStatus::InsufficientFunds => {
392 assert_reachable!("tx cancelled, insufficient funds");
393 debug!(
394 ?tx_digest,
395 "Funds withdraw scheduling result: Insufficient funds"
396 );
397 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
398 let env = env.with_insufficient_funds();
399 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
400 }
401 ScheduleStatus::SufficientFunds => {
402 assert_reachable!("tx scheduled, sufficient funds");
403 debug!(?tx_digest, "Funds withdraw scheduling result: Success");
404 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
405 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
406 }
407 ScheduleStatus::SkipSchedule => {
408 assert_reachable!("tx withdrawal scheduling skipped");
409 debug!(?tx_digest, "Skip scheduling funds withdraw");
410 }
411 },
412 Err(e) => {
413 error!("Withdraw scheduler stopped: {:?}", e);
414 }
415 }
416 }
417 }));
418 }
419
420 fn schedule_tx_keys(
421 &self,
422 tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
423 epoch_store: &Arc<AuthorityPerEpochStore>,
424 ) {
425 if tx_with_keys.is_empty() {
426 return;
427 }
428
429 let scheduler = self.clone();
430 let epoch_store = epoch_store.clone();
431 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
432 let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
433 let digests = epoch_store
434 .notify_read_tx_key_to_digest(&tx_keys)
435 .await
436 .expect("db error");
437 let transactions = scheduler
438 .transaction_cache_read
439 .multi_get_transaction_blocks(&digests)
440 .into_iter()
441 .map(|tx| {
442 let tx = tx.expect("tx must exist").as_ref().clone();
443 VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
444 })
445 .zip(tx_with_keys.into_iter().map(|(_, env)| env))
446 .collect::<Vec<_>>();
447 scheduler.enqueue_transactions(transactions, &epoch_store);
448 }));
449 }
450
451 #[cfg(debug_assertions)]
454 fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
455 let epoch = cert.epoch();
456 let digest = *cert.digest();
457 let digests = [digest];
458 let executed = self
459 .transaction_cache_read
460 .multi_get_executed_effects(&digests)
461 .pop()
462 .unwrap();
463 if let Some(executed) = executed {
466 use sui_types::effects::TransactionEffectsAPI;
467
468 assert_eq!(
469 executed.executed_epoch(),
470 epoch,
471 "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
472 digest,
473 executed.executed_epoch(),
474 epoch
475 );
476 }
477 }
478}
479
480impl ExecutionScheduler {
481 pub fn enqueue(
482 &self,
483 certs: Vec<(Schedulable, ExecutionEnv)>,
484 epoch_store: &Arc<AuthorityPerEpochStore>,
485 ) {
486 let mut ordinary_txns = Vec::with_capacity(certs.len());
488 let mut tx_with_keys = Vec::new();
489 let mut tx_with_withdraws = Vec::new();
490
491 for (schedulable, env) in certs {
492 match schedulable {
493 Schedulable::Transaction(tx) => {
494 if tx.transaction_data().has_funds_withdrawals() {
495 tx_with_withdraws.push((tx, env));
496 } else {
497 ordinary_txns.push((tx, env));
498 }
499 }
500 s @ Schedulable::RandomnessStateUpdate(..) => {
501 tx_with_keys.push((s.key(), env));
502 }
503 Schedulable::AccumulatorSettlement(_, _) => {
504 unreachable!("handled by SettlementScheduler");
505 }
506 Schedulable::ConsensusCommitPrologue(_, _, _) => {
507 unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
511 }
512 }
513 }
514
515 self.enqueue_transactions(ordinary_txns, epoch_store);
516 self.schedule_tx_keys(tx_with_keys, epoch_store);
517 self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
518 }
519
520 pub fn enqueue_transactions(
521 &self,
522 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
523 epoch_store: &Arc<AuthorityPerEpochStore>,
524 ) {
525 let certs: Vec<_> = certs
527 .into_iter()
528 .filter_map(|cert| {
529 if cert.0.epoch() == epoch_store.epoch() {
530 #[cfg(debug_assertions)]
531 self.assert_cert_not_executed_previous_epochs(&cert.0);
532
533 Some(cert)
534 } else {
535 debug_fatal!(
536 "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
537 epoch_store.epoch(),
538 cert.0.epoch()
539 );
540 None
541 }
542 })
543 .collect();
544 let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
545 let executed = self
546 .transaction_cache_read
547 .multi_get_executed_effects_digests(&digests);
548 let mut already_executed_certs_num = 0;
549 let pending_certs =
550 certs
551 .into_iter()
552 .zip(executed)
553 .filter_map(|((cert, execution_env), executed)| {
554 if executed.is_none() {
555 Some((cert, execution_env))
556 } else {
557 already_executed_certs_num += 1;
558 None
559 }
560 });
561
562 for (cert, execution_env) in pending_certs {
563 let scheduler = self.clone();
564 let epoch_store = epoch_store.clone();
565 spawn_monitored_task!(
566 epoch_store.within_alive_epoch(scheduler.schedule_transaction(
567 cert,
568 execution_env,
569 &epoch_store,
570 ))
571 );
572 }
573
574 self.metrics
575 .transaction_manager_num_enqueued_certificates
576 .with_label_values(&["already_executed"])
577 .inc_by(already_executed_certs_num);
578 }
579
580 pub fn settle_address_funds(&self, settlement: FundsSettlement) {
581 self.address_funds_withdraw_scheduler
582 .lock()
583 .as_ref()
584 .expect("Funds withdraw scheduler must be enabled if there are settlements")
585 .settle_funds(settlement);
586 }
587
588 pub fn reconfigure(
591 &self,
592 new_epoch_store: &Arc<AuthorityPerEpochStore>,
593 account_funds_read: &Arc<dyn AccountFundsRead>,
594 ) {
595 let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
596 new_epoch_store,
597 &self.object_cache_read,
598 account_funds_read.clone(),
599 self.funds_withdraw_scheduler_type,
600 &self.address_funds_scheduler_metrics,
601 );
602 let mut guard = self.address_funds_withdraw_scheduler.lock();
603 if let Some(old_scheduler) = guard.as_ref() {
604 old_scheduler.close_epoch();
605 }
606 *guard = address_funds_withdraw_scheduler;
607 drop(guard);
608 }
609
610 pub fn check_execution_overload(
611 &self,
612 overload_config: &AuthorityOverloadConfig,
613 tx_data: &SenderSignedData,
614 ) -> SuiResult {
615 let inflight_queue_len = self.num_pending_certificates();
616 self.overload_tracker
617 .check_execution_overload(overload_config, tx_data, inflight_queue_len)
618 }
619
620 pub fn num_pending_certificates(&self) -> usize {
621 (self
622 .metrics
623 .transaction_manager_num_pending_certificates
624 .get()
625 + self
626 .metrics
627 .transaction_manager_num_executing_certificates
628 .get()) as usize
629 }
630
631 #[cfg(test)]
632 pub async fn check_empty_for_testing(&self) {
633 for _ in 0..10 {
634 if self.num_pending_certificates() == 0 {
635 return;
636 }
637 tokio::task::yield_now().await;
638 }
639 assert_eq!(self.num_pending_certificates(), 0);
640 }
641}
642
643#[cfg(test)]
644mod test {
645 use super::{
646 BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
647 PendingCertificate,
648 };
649 use crate::authority::ExecutionEnv;
650 use crate::authority::shared_object_version_manager::AssignedVersions;
651 use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
652 use std::collections::BTreeSet;
653 use std::{time::Duration, vec};
654 use sui_test_transaction_builder::TestTransactionBuilder;
655 use sui_types::base_types::{SuiAddress, random_object_ref};
656 use sui_types::executable_transaction::VerifiedExecutableTransaction;
657 use sui_types::object::Owner;
658 use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
659 use sui_types::transaction::{
660 SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
661 };
662 use sui_types::{
663 SUI_FRAMEWORK_PACKAGE_ID,
664 base_types::{ObjectID, SequenceNumber},
665 crypto::deterministic_random_account_key,
666 object::Object,
667 transaction::{CallArg, ObjectArg},
668 };
669 use tokio::time::Instant;
670 use tokio::{
671 sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
672 time::sleep,
673 };
674
675 #[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
677 state: &AuthorityState,
678 ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
679 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
682 let registry = prometheus::Registry::new();
683 let execution_scheduler = ExecutionScheduler::new(
684 state.get_object_cache_reader().clone(),
685 state.get_account_funds_read().clone(),
686 state.get_transaction_cache_reader().clone(),
687 tx_ready_certificates,
688 &state.epoch_store_for_testing(),
689 FundsWithdrawSchedulerType::default(),
690 state.metrics.clone(),
691 ®istry,
692 );
693
694 (execution_scheduler, rx_ready_certificates)
695 }
696
697 fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
698 let rgp = 100;
701 let (sender, keypair) = deterministic_random_account_key();
702 let transaction =
703 TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
704 .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
705 .build_and_sign(&keypair);
706 VerifiedExecutableTransaction::new_system(
707 VerifiedTransaction::new_unchecked(transaction),
708 0,
709 )
710 }
711
712 #[tokio::test(flavor = "current_thread", start_paused = true)]
713 async fn execution_scheduler_basics() {
714 let (owner, _keypair) = deterministic_random_account_key();
716 let gas_objects: Vec<Object> = (0..10)
717 .map(|_| {
718 let gas_object_id = ObjectID::random();
719 Object::with_id_owner_for_testing(gas_object_id, owner)
720 })
721 .collect();
722 let state = init_state_with_objects(gas_objects.clone()).await;
723
724 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
727 assert!(
729 rx_ready_certificates
730 .try_recv()
731 .is_err_and(|err| err == TryRecvError::Empty)
732 );
733 assert_eq!(execution_scheduler.num_pending_certificates(), 0);
735
736 execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
738 assert!(
740 rx_ready_certificates
741 .try_recv()
742 .is_err_and(|err| err == TryRecvError::Empty)
743 );
744
745 let transaction = make_transaction(gas_objects[0].clone(), vec![]);
747 let tx_start_time = Instant::now();
748 execution_scheduler.enqueue_transactions(
749 vec![(transaction.clone(), ExecutionEnv::new())],
750 &state.epoch_store_for_testing(),
751 );
752 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
754
755 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
757 assert!(
758 pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
759 );
760
761 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
762
763 drop(pending_certificate);
765
766 execution_scheduler.check_empty_for_testing().await;
768
769 let gas_object_new = Object::with_id_owner_version_for_testing(
771 ObjectID::random(),
772 0.into(),
773 Owner::AddressOwner(owner),
774 );
775 let transaction = make_transaction(gas_object_new.clone(), vec![]);
776 let tx_start_time = Instant::now();
777 execution_scheduler.enqueue_transactions(
778 vec![(transaction.clone(), ExecutionEnv::new())],
779 &state.epoch_store_for_testing(),
780 );
781 sleep(Duration::from_secs(1)).await;
783 assert!(
784 rx_ready_certificates
785 .try_recv()
786 .is_err_and(|err| err == TryRecvError::Empty)
787 );
788
789 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
790
791 execution_scheduler.enqueue_transactions(
793 vec![(transaction.clone(), ExecutionEnv::new())],
794 &state.epoch_store_for_testing(),
795 );
796 sleep(Duration::from_secs(1)).await;
797 assert!(
798 rx_ready_certificates
799 .try_recv()
800 .is_err_and(|err| err == TryRecvError::Empty)
801 );
802
803 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
804
805 state
807 .get_cache_writer()
808 .write_object_entry_for_test(gas_object_new);
809 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
812 let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
813 assert_eq!(
814 pending_certificate.certificate.digest(),
815 pending_certificate2.certificate.digest()
816 );
817
818 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
821 assert!(
822 pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
823 >= Duration::from_secs(2)
824 );
825
826 drop(pending_certificate);
828 drop(pending_certificate2);
829
830 execution_scheduler.check_empty_for_testing().await;
832 }
833
834 #[tokio::test(flavor = "current_thread", start_paused = true)]
843 async fn execution_scheduler_object_dependency() {
844 telemetry_subscribers::init_for_testing();
845 let (owner, _keypair) = deterministic_random_account_key();
847 let gas_objects: Vec<Object> = (0..10)
848 .map(|_| {
849 let gas_object_id = ObjectID::random();
850 Object::with_id_owner_for_testing(gas_object_id, owner)
851 })
852 .collect();
853 let shared_object = Object::shared_for_testing();
854 let initial_shared_version = shared_object.owner().start_version().unwrap();
855 let shared_object_2 = Object::shared_for_testing();
856 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
857
858 let state = init_state_with_objects(
859 [
860 gas_objects.clone(),
861 vec![shared_object.clone(), shared_object_2.clone()],
862 ]
863 .concat(),
864 )
865 .await;
866
867 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
870 assert!(rx_ready_certificates.try_recv().is_err());
872
873 let shared_version = 1000.into();
875 let shared_object_arg_read = ObjectArg::SharedObject {
876 id: shared_object.id(),
877 initial_shared_version,
878 mutability: SharedObjectMutability::Immutable,
879 };
880 let transaction_read_0 = make_transaction(
881 gas_objects[0].clone(),
882 vec![CallArg::Object(shared_object_arg_read)],
883 );
884 let transaction_read_1 = make_transaction(
885 gas_objects[1].clone(),
886 vec![CallArg::Object(shared_object_arg_read)],
887 );
888 let tx_read_0_assigned_versions = vec![(
889 (
890 shared_object.id(),
891 shared_object.owner().start_version().unwrap(),
892 ),
893 shared_version,
894 )];
895 let tx_read_1_assigned_versions = vec![(
896 (
897 shared_object.id(),
898 shared_object.owner().start_version().unwrap(),
899 ),
900 shared_version,
901 )];
902
903 let shared_object_arg_default = ObjectArg::SharedObject {
905 id: shared_object.id(),
906 initial_shared_version,
907 mutability: SharedObjectMutability::Mutable,
908 };
909 let transaction_default = make_transaction(
910 gas_objects[2].clone(),
911 vec![CallArg::Object(shared_object_arg_default)],
912 );
913 let tx_default_assigned_versions = vec![(
914 (
915 shared_object.id(),
916 shared_object.owner().start_version().unwrap(),
917 ),
918 shared_version,
919 )];
920
921 let shared_version_2 = 1000.into();
923 let shared_object_arg_read_2 = ObjectArg::SharedObject {
924 id: shared_object_2.id(),
925 initial_shared_version: initial_shared_version_2,
926 mutability: SharedObjectMutability::Immutable,
927 };
928 let transaction_read_2 = make_transaction(
929 gas_objects[3].clone(),
930 vec![
931 CallArg::Object(shared_object_arg_default),
932 CallArg::Object(shared_object_arg_read_2),
933 ],
934 );
935 let tx_read_2_assigned_versions = vec![
936 (
937 (
938 shared_object.id(),
939 shared_object.owner().start_version().unwrap(),
940 ),
941 shared_version,
942 ),
943 (
944 (
945 shared_object_2.id(),
946 shared_object_2.owner().start_version().unwrap(),
947 ),
948 shared_version_2,
949 ),
950 ];
951
952 execution_scheduler.enqueue_transactions(
953 vec![
954 (
955 transaction_read_0.clone(),
956 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
957 tx_read_0_assigned_versions,
958 None,
959 )),
960 ),
961 (
962 transaction_read_1.clone(),
963 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
964 tx_read_1_assigned_versions,
965 None,
966 )),
967 ),
968 (
969 transaction_default.clone(),
970 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
971 tx_default_assigned_versions,
972 None,
973 )),
974 ),
975 (
976 transaction_read_2.clone(),
977 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
978 tx_read_2_assigned_versions,
979 None,
980 )),
981 ),
982 ],
983 &state.epoch_store_for_testing(),
984 );
985
986 sleep(Duration::from_secs(1)).await;
988 assert!(rx_ready_certificates.try_recv().is_err());
989
990 assert_eq!(execution_scheduler.num_pending_certificates(), 4);
991
992 let mut new_shared_object = shared_object.clone();
994 new_shared_object
995 .data
996 .try_as_move_mut()
997 .unwrap()
998 .increment_version_to(shared_version_2);
999 state
1000 .get_cache_writer()
1001 .write_object_entry_for_test(new_shared_object);
1002
1003 let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1005 let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1006 let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1007 {
1008 let mut want_digests = vec![
1009 transaction_read_0.digest(),
1010 transaction_read_1.digest(),
1011 transaction_default.digest(),
1012 ];
1013 want_digests.sort();
1014 let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1015 got_digests.sort();
1016 assert_eq!(want_digests, got_digests);
1017 }
1018
1019 sleep(Duration::from_secs(1)).await;
1020 assert!(rx_ready_certificates.try_recv().is_err());
1021
1022 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1023
1024 let mut new_shared_object_2 = shared_object_2.clone();
1026 new_shared_object_2
1027 .data
1028 .try_as_move_mut()
1029 .unwrap()
1030 .increment_version_to(shared_version_2);
1031 state
1032 .get_cache_writer()
1033 .write_object_entry_for_test(new_shared_object_2);
1034
1035 let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1037 assert_eq!(transaction_read_2.digest(), tx_3.digest());
1038
1039 sleep(Duration::from_secs(1)).await;
1040 assert!(rx_ready_certificates.try_recv().is_err());
1041
1042 execution_scheduler.check_empty_for_testing().await;
1043 }
1044
1045 #[tokio::test(flavor = "current_thread", start_paused = true)]
1046 async fn execution_scheduler_receiving_notify_commit() {
1047 telemetry_subscribers::init_for_testing();
1048 let (owner, _keypair) = deterministic_random_account_key();
1050 let gas_objects: Vec<Object> = (0..10)
1051 .map(|_| {
1052 let gas_object_id = ObjectID::random();
1053 Object::with_id_owner_for_testing(gas_object_id, owner)
1054 })
1055 .collect();
1056 let state = init_state_with_objects(gas_objects.clone()).await;
1057
1058 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1061 assert!(rx_ready_certificates.try_recv().is_err());
1063 execution_scheduler.check_empty_for_testing().await;
1065
1066 let obj_id = ObjectID::random();
1067 let object_arguments: Vec<_> = (0..10)
1068 .map(|i| {
1069 let object = Object::with_id_owner_version_for_testing(
1070 obj_id,
1071 i.into(),
1072 Owner::AddressOwner(owner),
1073 );
1074 let object_arg = if i % 2 == 0 || i == 3 {
1081 ObjectArg::Receiving(object.compute_object_reference())
1082 } else {
1083 ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1084 };
1085 let txn =
1086 make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1087 (object, txn)
1088 })
1089 .collect();
1090
1091 for (i, (_, txn)) in object_arguments.iter().enumerate() {
1092 execution_scheduler.enqueue_transactions(
1095 vec![(txn.clone(), ExecutionEnv::new())],
1096 &state.epoch_store_for_testing(),
1097 );
1098 sleep(Duration::from_secs(1)).await;
1099 assert!(rx_ready_certificates.try_recv().is_err());
1100 assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1101 }
1102
1103 let len = object_arguments.len();
1106 for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1107 state
1110 .get_cache_writer()
1111 .write_object_entry_for_test(object.clone());
1112
1113 rx_ready_certificates.recv().await.unwrap();
1116
1117 sleep(Duration::from_secs(1)).await;
1120 assert!(rx_ready_certificates.try_recv().is_err());
1121
1122 drop(txn);
1124
1125 assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1128 }
1129
1130 execution_scheduler.check_empty_for_testing().await;
1132 }
1133
1134 #[tokio::test(flavor = "current_thread", start_paused = true)]
1135 async fn execution_scheduler_receiving_object_ready_notifications() {
1136 telemetry_subscribers::init_for_testing();
1137 let (owner, _keypair) = deterministic_random_account_key();
1139 let gas_objects: Vec<Object> = (0..10)
1140 .map(|_| {
1141 let gas_object_id = ObjectID::random();
1142 Object::with_id_owner_for_testing(gas_object_id, owner)
1143 })
1144 .collect();
1145 let state = init_state_with_objects(gas_objects.clone()).await;
1146
1147 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1150 assert!(rx_ready_certificates.try_recv().is_err());
1152 execution_scheduler.check_empty_for_testing().await;
1154
1155 let obj_id = ObjectID::random();
1156 let receiving_object_new0 =
1157 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1158 let receiving_object_new1 =
1159 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1160 let receiving_object_arg0 =
1161 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1162 let receive_object_transaction0 = make_transaction(
1163 gas_objects[0].clone(),
1164 vec![CallArg::Object(receiving_object_arg0)],
1165 );
1166
1167 let receiving_object_arg1 =
1168 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1169 let receive_object_transaction1 = make_transaction(
1170 gas_objects[0].clone(),
1171 vec![CallArg::Object(receiving_object_arg1)],
1172 );
1173
1174 execution_scheduler.enqueue_transactions(
1176 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1177 &state.epoch_store_for_testing(),
1178 );
1179 sleep(Duration::from_secs(1)).await;
1180 assert!(rx_ready_certificates.try_recv().is_err());
1181 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1182
1183 execution_scheduler.enqueue_transactions(
1185 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1186 &state.epoch_store_for_testing(),
1187 );
1188 sleep(Duration::from_secs(1)).await;
1189 assert!(rx_ready_certificates.try_recv().is_err());
1190 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1191
1192 execution_scheduler.enqueue_transactions(
1194 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1195 &state.epoch_store_for_testing(),
1196 );
1197 sleep(Duration::from_secs(1)).await;
1198 assert!(rx_ready_certificates.try_recv().is_err());
1199 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1200
1201 state
1203 .get_cache_writer()
1204 .write_object_entry_for_test(receiving_object_new0.clone());
1205
1206 rx_ready_certificates.recv().await.unwrap();
1209
1210 state
1212 .get_cache_writer()
1213 .write_object_entry_for_test(receiving_object_new1.clone());
1214
1215 rx_ready_certificates.recv().await.unwrap();
1218 }
1219
1220 #[tokio::test(flavor = "current_thread", start_paused = true)]
1221 async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1222 telemetry_subscribers::init_for_testing();
1223 let (owner, _keypair) = deterministic_random_account_key();
1225 let gas_objects: Vec<Object> = (0..10)
1226 .map(|_| {
1227 let gas_object_id = ObjectID::random();
1228 Object::with_id_owner_for_testing(gas_object_id, owner)
1229 })
1230 .collect();
1231 let state = init_state_with_objects(gas_objects.clone()).await;
1232
1233 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1236 assert!(rx_ready_certificates.try_recv().is_err());
1238 execution_scheduler.check_empty_for_testing().await;
1240
1241 let obj_id = ObjectID::random();
1242 let receiving_object_new0 =
1243 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1244 let receiving_object_new1 =
1245 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1246 let receiving_object_arg0 =
1247 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1248 let receive_object_transaction0 = make_transaction(
1249 gas_objects[0].clone(),
1250 vec![CallArg::Object(receiving_object_arg0)],
1251 );
1252
1253 let receive_object_transaction01 = make_transaction(
1254 gas_objects[1].clone(),
1255 vec![CallArg::Object(receiving_object_arg0)],
1256 );
1257
1258 let receiving_object_arg1 =
1259 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1260 let receive_object_transaction1 = make_transaction(
1261 gas_objects[0].clone(),
1262 vec![CallArg::Object(receiving_object_arg1)],
1263 );
1264
1265 let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1268 let tx1 = make_transaction(
1269 gas_objects[0].clone(),
1270 vec![CallArg::Object(gas_receiving_arg)],
1271 );
1272
1273 execution_scheduler.enqueue_transactions(
1275 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1276 &state.epoch_store_for_testing(),
1277 );
1278 sleep(Duration::from_secs(1)).await;
1279 assert!(rx_ready_certificates.try_recv().is_err());
1280 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1281
1282 execution_scheduler.enqueue_transactions(
1284 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1285 &state.epoch_store_for_testing(),
1286 );
1287 sleep(Duration::from_secs(1)).await;
1288 assert!(rx_ready_certificates.try_recv().is_err());
1289 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1290
1291 execution_scheduler.enqueue_transactions(
1294 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1295 &state.epoch_store_for_testing(),
1296 );
1297 sleep(Duration::from_secs(1)).await;
1298 assert!(rx_ready_certificates.try_recv().is_err());
1299 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1300
1301 state
1303 .get_cache_writer()
1304 .write_object_entry_for_test(receiving_object_new0.clone());
1305
1306 rx_ready_certificates.recv().await.unwrap();
1309
1310 rx_ready_certificates.recv().await.unwrap();
1311
1312 assert!(rx_ready_certificates.try_recv().is_err());
1314
1315 execution_scheduler.enqueue_transactions(
1318 vec![(tx1.clone(), ExecutionEnv::new())],
1319 &state.epoch_store_for_testing(),
1320 );
1321 sleep(Duration::from_secs(1)).await;
1322 rx_ready_certificates.recv().await.unwrap();
1323
1324 state
1326 .get_cache_writer()
1327 .write_object_entry_for_test(receiving_object_new1.clone());
1328
1329 rx_ready_certificates.recv().await.unwrap();
1332 }
1333
1334 #[tokio::test(flavor = "current_thread", start_paused = true)]
1335 async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1336 telemetry_subscribers::init_for_testing();
1337 let (owner, _keypair) = deterministic_random_account_key();
1339 let mut gas_objects: Vec<Object> = (0..10)
1340 .map(|_| {
1341 let gas_object_id = ObjectID::random();
1342 Object::with_id_owner_for_testing(gas_object_id, owner)
1343 })
1344 .collect();
1345 let receiving_object = Object::with_id_owner_version_for_testing(
1346 ObjectID::random(),
1347 10.into(),
1348 Owner::AddressOwner(owner),
1349 );
1350 gas_objects.push(receiving_object.clone());
1351 let state = init_state_with_objects(gas_objects.clone()).await;
1352
1353 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1356 assert!(rx_ready_certificates.try_recv().is_err());
1358 execution_scheduler.check_empty_for_testing().await;
1360
1361 let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1362 receiving_object.id(),
1363 0.into(),
1364 Owner::AddressOwner(owner),
1365 );
1366 let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1367 receiving_object.id(),
1368 1.into(),
1369 Owner::AddressOwner(owner),
1370 );
1371 let receiving_object_arg0 =
1372 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1373 let receive_object_transaction0 = make_transaction(
1374 gas_objects[0].clone(),
1375 vec![CallArg::Object(receiving_object_arg0)],
1376 );
1377
1378 let receive_object_transaction01 = make_transaction(
1379 gas_objects[1].clone(),
1380 vec![CallArg::Object(receiving_object_arg0)],
1381 );
1382
1383 let receiving_object_arg1 =
1384 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1385 let receive_object_transaction1 = make_transaction(
1386 gas_objects[0].clone(),
1387 vec![CallArg::Object(receiving_object_arg1)],
1388 );
1389
1390 execution_scheduler.enqueue_transactions(
1392 vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1393 &state.epoch_store_for_testing(),
1394 );
1395 execution_scheduler.enqueue_transactions(
1396 vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1397 &state.epoch_store_for_testing(),
1398 );
1399 execution_scheduler.enqueue_transactions(
1400 vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1401 &state.epoch_store_for_testing(),
1402 );
1403 sleep(Duration::from_secs(1)).await;
1404 rx_ready_certificates.recv().await.unwrap();
1405 rx_ready_certificates.recv().await.unwrap();
1406 rx_ready_certificates.recv().await.unwrap();
1407 assert!(rx_ready_certificates.try_recv().is_err());
1408 }
1409
1410 #[tokio::test(flavor = "current_thread", start_paused = true)]
1413 async fn execution_scheduler_with_cancelled_transactions() {
1414 let (owner, _keypair) = deterministic_random_account_key();
1416 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1417 let shared_object_1 = Object::shared_for_testing();
1418 let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1419 let shared_object_2 = Object::shared_for_testing();
1420 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1421 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1422
1423 let state = init_state_with_objects(vec![
1424 gas_object.clone(),
1425 shared_object_1.clone(),
1426 shared_object_2.clone(),
1427 owned_object.clone(),
1428 ])
1429 .await;
1430
1431 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1434 assert!(rx_ready_certificates.try_recv().is_err());
1436
1437 let shared_object_arg_1 = ObjectArg::SharedObject {
1439 id: shared_object_1.id(),
1440 initial_shared_version: initial_shared_version_1,
1441 mutability: SharedObjectMutability::Mutable,
1442 };
1443 let shared_object_arg_2 = ObjectArg::SharedObject {
1444 id: shared_object_2.id(),
1445 initial_shared_version: initial_shared_version_2,
1446 mutability: SharedObjectMutability::Mutable,
1447 };
1448
1449 let owned_version = 2000.into();
1451 let mut owned_ref = owned_object.compute_object_reference();
1452 owned_ref.1 = owned_version;
1453 let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1454
1455 let cancelled_transaction = make_transaction(
1456 gas_object.clone(),
1457 vec![
1458 CallArg::Object(shared_object_arg_1),
1459 CallArg::Object(shared_object_arg_2),
1460 CallArg::Object(owned_object_arg),
1461 ],
1462 );
1463 let assigned_versions = vec![
1464 (
1465 (
1466 shared_object_1.id(),
1467 shared_object_1.owner().start_version().unwrap(),
1468 ),
1469 SequenceNumber::CANCELLED_READ,
1470 ),
1471 (
1472 (
1473 shared_object_2.id(),
1474 shared_object_2.owner().start_version().unwrap(),
1475 ),
1476 SequenceNumber::CONGESTED,
1477 ),
1478 ];
1479
1480 execution_scheduler.enqueue_transactions(
1481 vec![(
1482 cancelled_transaction.clone(),
1483 ExecutionEnv::new()
1484 .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1485 )],
1486 &state.epoch_store_for_testing(),
1487 );
1488
1489 sleep(Duration::from_secs(1)).await;
1491 assert!(rx_ready_certificates.try_recv().is_err());
1492
1493 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1494
1495 let mut new_owned_object = owned_object.clone();
1497 new_owned_object
1498 .data
1499 .try_as_move_mut()
1500 .unwrap()
1501 .increment_version_to(owned_version);
1502 state
1503 .get_cache_writer()
1504 .write_object_entry_for_test(new_owned_object);
1505
1506 let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1508 assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1509
1510 sleep(Duration::from_secs(1)).await;
1511 assert!(rx_ready_certificates.try_recv().is_err());
1512
1513 execution_scheduler.check_empty_for_testing().await;
1514 }
1515
1516 #[test]
1517 fn test_barrier_dependency_builder() {
1518 let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1519 assert!(
1520 non_exclusive_writes
1521 .iter()
1522 .all(|id| !exclusive_writes.contains(id))
1523 );
1524 assert!(
1525 exclusive_writes
1526 .iter()
1527 .all(|id| !non_exclusive_writes.contains(id))
1528 );
1529
1530 let non_exclusive_writes = non_exclusive_writes
1531 .into_iter()
1532 .map(|id| ObjectID::from_single_byte(id as u8));
1533 let exclusive_writes = exclusive_writes
1534 .into_iter()
1535 .map(|id| ObjectID::from_single_byte(id as u8));
1536 let mut builder = ProgrammableTransactionBuilder::new();
1537 for non_exclusive_write in non_exclusive_writes {
1538 builder
1539 .obj(ObjectArg::SharedObject {
1540 id: non_exclusive_write,
1541 initial_shared_version: SequenceNumber::new(),
1542 mutability: SharedObjectMutability::NonExclusiveWrite,
1543 })
1544 .unwrap();
1545 }
1546
1547 for exclusive_write in exclusive_writes {
1548 builder
1549 .obj(ObjectArg::SharedObject {
1550 id: exclusive_write,
1551 initial_shared_version: SequenceNumber::new(),
1552 mutability: SharedObjectMutability::Mutable,
1553 })
1554 .unwrap();
1555 }
1556
1557 let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1558 let tx_data =
1559 TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1560 Transaction::from_data_and_signer(tx_data, vec![])
1561 };
1562
1563 {
1565 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1566 let tx1 = make_transaction(vec![1], vec![]);
1567 let tx2 = make_transaction(vec![], vec![1]);
1568
1569 let tx1_deps =
1570 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1571 let tx2_deps =
1572 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1573 assert!(tx1_deps.is_empty());
1574 assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1575 }
1576
1577 {
1580 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1581 let tx1 = make_transaction(vec![1, 2], vec![]);
1582 let tx2 = make_transaction(vec![], vec![1]);
1583 let tx3 = make_transaction(vec![], vec![2]);
1584
1585 let tx1_deps =
1586 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1587 let tx2_deps =
1588 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1589 let tx3_deps =
1590 barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1591 assert!(tx1_deps.is_empty());
1592 assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1593 assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1594 }
1595
1596 {
1598 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1599 let tx1 = make_transaction(vec![1], vec![]);
1600 let tx2 = make_transaction(vec![2], vec![]);
1601 let tx3 = make_transaction(vec![], vec![1, 2]);
1602
1603 let tx1_deps =
1604 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1605 let tx2_deps =
1606 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1607 let tx3_deps =
1608 barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1609 assert!(tx1_deps.is_empty());
1610 assert!(tx2_deps.is_empty());
1611 assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1612 }
1613
1614 {
1616 let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1617 let tx1 = make_transaction(vec![1], vec![]);
1618 let tx2 = make_transaction(vec![], vec![1]);
1619 let tx3 = make_transaction(vec![], vec![1]);
1620
1621 let tx1_deps =
1622 barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1623 let tx2_deps =
1624 barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1625 let tx3_deps =
1626 barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1627 assert!(tx1_deps.is_empty());
1628 assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1629 assert!(tx3_deps.is_empty());
1630 }
1631 }
1632}