1use crate::{
5 accumulators::funds_read::AccountFundsRead,
6 authority::{
7 AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8 shared_object_version_manager::Schedulable,
9 },
10 execution_cache::{ObjectCacheRead, TransactionCacheRead},
11 execution_scheduler::{
12 ExecutingGuard, PendingCertificateStats,
13 funds_withdraw_scheduler::{
14 FundsSettlement, ObjectFundsWithdrawSchedulerTrait, ObjectFundsWithdrawStatus,
15 ScheduleStatus, TxFundsWithdraw, naive_scheduler::NaiveObjectFundsWithdrawScheduler,
16 scheduler::FundsWithdrawScheduler,
17 },
18 },
19};
20use futures::stream::{FuturesUnordered, StreamExt};
21use mysten_common::{assert_reachable, debug_fatal};
22use mysten_metrics::spawn_monitored_task;
23use parking_lot::Mutex;
24use std::{
25 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26 sync::Arc,
27};
28use sui_config::node::AuthorityOverloadConfig;
29use sui_types::{
30 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31 base_types::{FullObjectID, ObjectID, SequenceNumber},
32 digests::TransactionDigest,
33 effects::{AccumulatorOperation, AccumulatorValue, TransactionEffects, TransactionEffectsAPI},
34 error::SuiResult,
35 executable_transaction::VerifiedExecutableTransaction,
36 execution_params::FundsWithdrawStatus,
37 storage::InputKey,
38 transaction::{
39 SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
40 TransactionDataAPI, TransactionKey,
41 },
42};
43use tokio::sync::mpsc::UnboundedSender;
44use tokio::time::Instant;
45use tracing::{debug, error, instrument};
46
47use super::{PendingCertificate, overload_tracker::OverloadTracker};
48
49pub(crate) struct BarrierDependencyBuilder {
51 dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
52}
53
54impl BarrierDependencyBuilder {
55 pub fn new() -> Self {
56 Self {
57 dep_state: Default::default(),
58 }
59 }
60
61 pub fn process_tx(
67 &mut self,
68 tx_digest: TransactionDigest,
69 tx: &TransactionData,
70 ) -> BTreeSet<TransactionDigest> {
71 let mut barrier_deps = BTreeSet::new();
72 for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
73 match mutability {
74 SharedObjectMutability::NonExclusiveWrite => {
75 self.dep_state.entry(id).or_default().insert(tx_digest);
76 }
77 SharedObjectMutability::Mutable => {
78 if let Some(deps) = self.dep_state.remove(&id) {
81 barrier_deps.extend(deps);
82 }
83 }
84 SharedObjectMutability::Immutable => (),
85 }
86 }
87 barrier_deps
88 }
89}
90
91#[derive(Clone)]
92pub struct ExecutionScheduler {
93 object_cache_read: Arc<dyn ObjectCacheRead>,
94 transaction_cache_read: Arc<dyn TransactionCacheRead>,
95 overload_tracker: Arc<OverloadTracker>,
96 tx_ready_certificates: UnboundedSender<PendingCertificate>,
97 address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
98 object_funds_withdraw_scheduler: Arc<Mutex<Option<Box<dyn ObjectFundsWithdrawSchedulerTrait>>>>,
99 metrics: Arc<AuthorityMetrics>,
100}
101
102struct PendingGuard<'a> {
103 scheduler: &'a ExecutionScheduler,
104 cert: &'a VerifiedExecutableTransaction,
105}
106
107impl<'a> PendingGuard<'a> {
108 pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
109 scheduler
110 .metrics
111 .transaction_manager_num_pending_certificates
112 .inc();
113 scheduler
114 .overload_tracker
115 .add_pending_certificate(cert.data());
116 Self { scheduler, cert }
117 }
118}
119
120impl Drop for PendingGuard<'_> {
121 fn drop(&mut self) {
122 self.scheduler
123 .metrics
124 .transaction_manager_num_pending_certificates
125 .dec();
126 self.scheduler
127 .overload_tracker
128 .remove_pending_certificate(self.cert.data());
129 }
130}
131
132impl ExecutionScheduler {
133 pub fn new(
134 object_cache_read: Arc<dyn ObjectCacheRead>,
135 account_funds_read: Arc<dyn AccountFundsRead>,
136 transaction_cache_read: Arc<dyn TransactionCacheRead>,
137 tx_ready_certificates: UnboundedSender<PendingCertificate>,
138 epoch_store: &Arc<AuthorityPerEpochStore>,
139 metrics: Arc<AuthorityMetrics>,
140 ) -> Self {
141 tracing::info!("Creating new ExecutionScheduler");
142 let (address_funds_withdraw_scheduler, object_funds_withdraw_scheduler) =
143 Self::initialize_funds_withdraw_scheduler(
144 epoch_store,
145 &object_cache_read,
146 account_funds_read,
147 );
148 Self {
149 object_cache_read,
150 transaction_cache_read,
151 overload_tracker: Arc::new(OverloadTracker::new()),
152 tx_ready_certificates,
153 address_funds_withdraw_scheduler: Arc::new(Mutex::new(
154 address_funds_withdraw_scheduler,
155 )),
156 object_funds_withdraw_scheduler: Arc::new(Mutex::new(object_funds_withdraw_scheduler)),
157 metrics,
158 }
159 }
160
161 fn initialize_funds_withdraw_scheduler(
162 epoch_store: &Arc<AuthorityPerEpochStore>,
163 object_cache_read: &Arc<dyn ObjectCacheRead>,
164 account_funds_read: Arc<dyn AccountFundsRead>,
165 ) -> (
166 Option<FundsWithdrawScheduler>,
167 Option<Box<dyn ObjectFundsWithdrawSchedulerTrait>>,
168 ) {
169 let withdraw_scheduler_enabled =
170 epoch_store.is_validator() && epoch_store.accumulators_enabled();
171 if !withdraw_scheduler_enabled {
172 return (None, None);
173 }
174 let starting_accumulator_version = object_cache_read
175 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
176 .expect("Accumulator root object must be present if funds accumulator is enabled")
177 .version();
178 let address_funds_withdraw_scheduler =
179 FundsWithdrawScheduler::new(account_funds_read.clone(), starting_accumulator_version);
180 let object_funds_withdraw_scheduler =
181 if epoch_store.protocol_config().enable_object_funds_withdraw() {
182 let scheduler: Box<dyn ObjectFundsWithdrawSchedulerTrait> =
183 Box::new(NaiveObjectFundsWithdrawScheduler::new(
184 account_funds_read,
185 starting_accumulator_version,
186 ));
187 Some(scheduler)
188 } else {
189 None
190 };
191 (
192 Some(address_funds_withdraw_scheduler),
193 object_funds_withdraw_scheduler,
194 )
195 }
196
197 #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
198 async fn schedule_transaction(
199 self,
200 cert: VerifiedExecutableTransaction,
201 execution_env: ExecutionEnv,
202 epoch_store: &Arc<AuthorityPerEpochStore>,
203 ) {
204 let enqueue_time = Instant::now();
205 let tx_digest = cert.digest();
206 let digests = [*tx_digest];
207
208 let tx_data = cert.transaction_data();
209 let input_object_kinds = tx_data
210 .input_objects()
211 .expect("input_objects() cannot fail");
212 let input_object_keys: Vec<_> = epoch_store
213 .get_input_object_keys(
214 &cert.key(),
215 &input_object_kinds,
216 &execution_env.assigned_versions,
217 )
218 .into_iter()
219 .collect();
220
221 let receiving_object_keys: HashSet<_> = tx_data
222 .receiving_objects()
223 .into_iter()
224 .map(|entry| {
225 InputKey::VersionedObject {
226 id: FullObjectID::new(entry.0, None),
228 version: entry.1,
229 }
230 })
231 .collect();
232 let input_and_receiving_keys = [
233 input_object_keys,
234 receiving_object_keys.iter().cloned().collect(),
235 ]
236 .concat();
237
238 let epoch = epoch_store.epoch();
239 debug!(
240 ?tx_digest,
241 "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
242 );
243
244 let availability = self
245 .object_cache_read
246 .multi_input_objects_available_cache_only(&input_and_receiving_keys);
247 let missing_input_keys: Vec<_> = input_and_receiving_keys
251 .into_iter()
252 .zip(availability)
253 .filter_map(|(key, available)| if !available { Some(key) } else { None })
254 .collect();
255
256 let has_missing_barrier_dependencies = self
257 .transaction_cache_read
258 .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
259 .into_iter()
260 .any(|r| r.is_none());
261
262 if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
263 self.metrics
264 .transaction_manager_num_enqueued_certificates
265 .with_label_values(&["ready"])
266 .inc();
267 debug!(?tx_digest, "Input objects already available");
268 self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
269 return;
270 }
271
272 let _pending_guard = PendingGuard::new(&self, &cert);
273 self.metrics
274 .transaction_manager_num_enqueued_certificates
275 .with_label_values(&["pending"])
276 .inc();
277
278 if !execution_env.barrier_dependencies.is_empty() {
279 debug!(
280 "waiting for barrier dependencies to be executed: {:?}",
281 execution_env.barrier_dependencies
282 );
283 self.transaction_cache_read
284 .notify_read_executed_effects_digests(
285 "wait_for_barrier_dependencies",
286 &execution_env.barrier_dependencies,
287 )
288 .await;
289 }
290
291 tokio::select! {
292 _ = self.object_cache_read
293 .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
294 => {
295 self.metrics
296 .transaction_manager_transaction_queue_age_s
297 .observe(enqueue_time.elapsed().as_secs_f64());
298 debug!(?tx_digest, "Input objects available");
299 self.send_transaction_for_execution(
301 &cert,
302 execution_env,
303 enqueue_time,
304 );
305 }
306 _ = self.transaction_cache_read.notify_read_executed_effects_digests(
307 "ExecutionScheduler::notify_read_executed_effects_digests",
308 &digests,
309 ) => {
310 debug!(?tx_digest, "Transaction already executed");
311 }
312 };
313 }
314
315 fn send_transaction_for_execution(
316 &self,
317 cert: &VerifiedExecutableTransaction,
318 execution_env: ExecutionEnv,
319 enqueue_time: Instant,
320 ) {
321 let pending_cert = PendingCertificate {
322 certificate: cert.clone(),
323 execution_env,
324 stats: PendingCertificateStats {
325 enqueue_time,
326 ready_time: Some(Instant::now()),
327 },
328 executing_guard: Some(ExecutingGuard::new(
329 self.metrics
330 .transaction_manager_num_executing_certificates
331 .clone(),
332 )),
333 };
334 let _ = self.tx_ready_certificates.send(pending_cert);
335 }
336
337 fn schedule_funds_withdraws(
338 &self,
339 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
340 epoch_store: &Arc<AuthorityPerEpochStore>,
341 ) {
342 if certs.is_empty() {
343 return;
344 }
345 let mut withdraws = BTreeMap::new();
346 let mut prev_version = None;
347 for (cert, env) in &certs {
348 let tx_withdraws = cert
349 .transaction_data()
350 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
351 assert!(!tx_withdraws.is_empty());
352 let accumulator_version = env
353 .assigned_versions
354 .accumulator_version
355 .expect("accumulator_version must be set when there are withdraws");
356 if let Some(prev_version) = prev_version {
357 assert!(prev_version <= accumulator_version);
359 }
360 prev_version = Some(accumulator_version);
361 let tx_digest = *cert.digest();
362 withdraws
363 .entry(accumulator_version)
364 .or_insert(Vec::new())
365 .push(TxFundsWithdraw {
366 tx_digest,
367 reservations: tx_withdraws,
368 });
369 }
370 let mut receivers = FuturesUnordered::new();
371 {
372 let guard = self.address_funds_withdraw_scheduler.lock();
373 let withdraw_scheduler = guard
374 .as_ref()
375 .expect("Funds withdraw scheduler must be enabled if there are withdraws");
376 for (version, tx_withdraws) in withdraws {
377 receivers.extend(withdraw_scheduler.schedule_withdraws(version, tx_withdraws));
378 }
379 }
381 let scheduler = self.clone();
382 let epoch_store = epoch_store.clone();
383 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
384 let mut cert_map = HashMap::new();
385 for (cert, env) in certs {
386 cert_map.insert(*cert.digest(), (cert, env));
387 }
388 while let Some(result) = receivers.next().await {
389 match result {
390 Ok(result) => match result.status {
391 ScheduleStatus::InsufficientFunds => {
392 assert_reachable!("tx cancelled, insufficient funds");
393 let tx_digest = result.tx_digest;
394 debug!(
395 ?tx_digest,
396 "Funds withdraw scheduling result: Insufficient funds"
397 );
398 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
399 let env = env.with_insufficient_funds();
400 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
401 }
402 ScheduleStatus::SufficientFunds => {
403 assert_reachable!("tx scheduled, sufficient funds");
404 let tx_digest = result.tx_digest;
405 debug!(?tx_digest, "Funds withdraw scheduling result: Success");
406 let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
407 scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
408 }
409 ScheduleStatus::SkipSchedule => {
410 assert_reachable!("tx withdrawal scheduling skipped");
411 let tx_digest = result.tx_digest;
412 debug!(?tx_digest, "Skip scheduling funds withdraw");
413 }
414 },
415 Err(e) => {
416 error!("Withdraw scheduler stopped: {:?}", e);
417 }
418 }
419 }
420 }));
421 }
422
423 fn schedule_settlement_transactions(
424 &self,
425 settlement_txns: Vec<(TransactionKey, ExecutionEnv)>,
426 epoch_store: &Arc<AuthorityPerEpochStore>,
427 ) {
428 if !settlement_txns.is_empty() {
429 let scheduler = self.clone();
430 let epoch_store = epoch_store.clone();
431
432 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
433 let mut futures: FuturesUnordered<_> = settlement_txns
434 .into_iter()
435 .map(|(key, env)| {
436 let epoch_store = epoch_store.clone();
437 async move {
438 (
439 key,
440 epoch_store.wait_for_settlement_transactions(key).await,
441 env,
442 )
443 }
444 })
445 .collect();
446
447 while let Some((settlement_key, txns, env)) = futures.next().await {
448 let mut barrier_deps = BarrierDependencyBuilder::new();
449 let txns = txns
450 .into_iter()
451 .map(|tx| {
452 let deps = barrier_deps.process_tx(*tx.digest(), tx.transaction_data());
453 let env = env.clone().with_barrier_dependencies(deps);
454 (tx, env)
455 })
456 .collect::<Vec<_>>();
457
458 scheduler.enqueue_transactions(txns, &epoch_store);
459
460 let scheduler = scheduler.clone();
462 let epoch_store = epoch_store.clone();
463 let env = env.clone();
464 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
465 let barrier_tx = epoch_store
466 .wait_for_barrier_transaction(settlement_key)
467 .await;
468 let deps = barrier_deps
469 .process_tx(*barrier_tx.digest(), barrier_tx.transaction_data());
470 let env = env.with_barrier_dependencies(deps);
471 scheduler.enqueue_transactions(vec![(barrier_tx, env)], &epoch_store);
472 }));
473 }
474 }));
475 }
476 }
477
478 fn schedule_tx_keys(
479 &self,
480 tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
481 epoch_store: &Arc<AuthorityPerEpochStore>,
482 ) {
483 if tx_with_keys.is_empty() {
484 return;
485 }
486
487 let scheduler = self.clone();
488 let epoch_store = epoch_store.clone();
489 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
490 let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
491 let digests = epoch_store
492 .notify_read_tx_key_to_digest(&tx_keys)
493 .await
494 .expect("db error");
495 let transactions = scheduler
496 .transaction_cache_read
497 .multi_get_transaction_blocks(&digests)
498 .into_iter()
499 .map(|tx| {
500 let tx = tx.expect("tx must exist").as_ref().clone();
501 VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
502 })
503 .zip(tx_with_keys.into_iter().map(|(_, env)| env))
504 .collect::<Vec<_>>();
505 scheduler.enqueue_transactions(transactions, &epoch_store);
506 }));
507 }
508
509 #[cfg(debug_assertions)]
512 fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
513 let epoch = cert.epoch();
514 let digest = *cert.digest();
515 let digests = [digest];
516 let executed = self
517 .transaction_cache_read
518 .multi_get_executed_effects(&digests)
519 .pop()
520 .unwrap();
521 if let Some(executed) = executed {
524 use sui_types::effects::TransactionEffectsAPI;
525
526 assert_eq!(
527 executed.executed_epoch(),
528 epoch,
529 "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
530 digest,
531 executed.executed_epoch(),
532 epoch
533 );
534 }
535 }
536}
537
538impl ExecutionScheduler {
539 pub fn enqueue(
540 &self,
541 certs: Vec<(Schedulable, ExecutionEnv)>,
542 epoch_store: &Arc<AuthorityPerEpochStore>,
543 ) {
544 let mut ordinary_txns = Vec::with_capacity(certs.len());
546 let mut tx_with_keys = Vec::new();
547 let mut tx_with_withdraws = Vec::new();
548 let mut settlement_txns = Vec::new();
549
550 for (schedulable, env) in certs {
551 match schedulable {
552 Schedulable::Transaction(tx) => {
553 if tx.transaction_data().has_funds_withdrawals() {
554 tx_with_withdraws.push((tx, env));
555 } else {
556 ordinary_txns.push((tx, env));
557 }
558 }
559 s @ Schedulable::RandomnessStateUpdate(..) => {
560 tx_with_keys.push((s.key(), env));
561 }
562 Schedulable::AccumulatorSettlement(_, _) => {
563 settlement_txns.push((schedulable.key(), env));
564 }
565 Schedulable::ConsensusCommitPrologue(_, _, _) => {
566 unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
570 }
571 }
572 }
573
574 self.enqueue_transactions(ordinary_txns, epoch_store);
575 self.schedule_tx_keys(tx_with_keys, epoch_store);
576 self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
577 self.schedule_settlement_transactions(settlement_txns, epoch_store);
578 }
579
580 pub fn enqueue_transactions(
581 &self,
582 certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
583 epoch_store: &Arc<AuthorityPerEpochStore>,
584 ) {
585 let certs: Vec<_> = certs
587 .into_iter()
588 .filter_map(|cert| {
589 if cert.0.epoch() == epoch_store.epoch() {
590 #[cfg(debug_assertions)]
591 self.assert_cert_not_executed_previous_epochs(&cert.0);
592
593 Some(cert)
594 } else {
595 debug_fatal!(
596 "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
597 epoch_store.epoch(),
598 cert.0.epoch()
599 );
600 None
601 }
602 })
603 .collect();
604 let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
605 let executed = self
606 .transaction_cache_read
607 .multi_get_executed_effects_digests(&digests);
608 let mut already_executed_certs_num = 0;
609 let pending_certs =
610 certs
611 .into_iter()
612 .zip(executed)
613 .filter_map(|((cert, execution_env), executed)| {
614 if executed.is_none() {
615 Some((cert, execution_env))
616 } else {
617 already_executed_certs_num += 1;
618 None
619 }
620 });
621
622 for (cert, execution_env) in pending_certs {
623 let scheduler = self.clone();
624 let epoch_store = epoch_store.clone();
625 spawn_monitored_task!(
626 epoch_store.within_alive_epoch(scheduler.schedule_transaction(
627 cert,
628 execution_env,
629 &epoch_store,
630 ))
631 );
632 }
633
634 self.metrics
635 .transaction_manager_num_enqueued_certificates
636 .with_label_values(&["already_executed"])
637 .inc_by(already_executed_certs_num);
638 }
639
640 pub fn settle_address_funds(&self, settlement: FundsSettlement) {
641 self.address_funds_withdraw_scheduler
642 .lock()
643 .as_ref()
644 .expect("Funds withdraw scheduler must be enabled if there are settlements")
645 .settle_funds(settlement);
646 }
647
648 pub fn settle_object_funds(&self, next_accumulator_version: SequenceNumber) {
649 if let Some(object_funds_withdraw_scheduler) =
650 self.object_funds_withdraw_scheduler.lock().as_ref()
651 {
652 object_funds_withdraw_scheduler.settle_accumulator_version(next_accumulator_version);
653 }
654 }
655
656 pub fn reconfigure(
659 &self,
660 new_epoch_store: &Arc<AuthorityPerEpochStore>,
661 account_funds_read: &Arc<dyn AccountFundsRead>,
662 ) {
663 let (address_funds_withdraw_scheduler, object_funds_withdraw_scheduler) =
664 Self::initialize_funds_withdraw_scheduler(
665 new_epoch_store,
666 &self.object_cache_read,
667 account_funds_read.clone(),
668 );
669 let mut guard = self.address_funds_withdraw_scheduler.lock();
670 if let Some(old_scheduler) = guard.as_ref() {
671 old_scheduler.close_epoch();
672 }
673 *guard = address_funds_withdraw_scheduler;
674 drop(guard);
675
676 let mut object_guard = self.object_funds_withdraw_scheduler.lock();
677 if let Some(old_scheduler) = object_guard.as_ref() {
678 old_scheduler.close_epoch();
679 }
680 *object_guard = object_funds_withdraw_scheduler;
681 }
682
683 pub fn check_execution_overload(
684 &self,
685 overload_config: &AuthorityOverloadConfig,
686 tx_data: &SenderSignedData,
687 ) -> SuiResult {
688 let inflight_queue_len = self.num_pending_certificates();
689 self.overload_tracker
690 .check_execution_overload(overload_config, tx_data, inflight_queue_len)
691 }
692
693 pub fn num_pending_certificates(&self) -> usize {
694 (self
695 .metrics
696 .transaction_manager_num_pending_certificates
697 .get()
698 + self
699 .metrics
700 .transaction_manager_num_executing_certificates
701 .get()) as usize
702 }
703
704 #[instrument(level = "debug", skip_all, fields(tx_digest = ?certificate.digest()))]
705 pub fn should_commit_object_funds_withdraws(
706 &self,
707 certificate: &VerifiedExecutableTransaction,
708 effects: &TransactionEffects,
709 execution_env: &ExecutionEnv,
710 epoch_store: &Arc<AuthorityPerEpochStore>,
711 ) -> bool {
712 if self.object_funds_withdraw_scheduler.lock().is_none() {
714 return true;
715 }
716
717 if effects.status().is_err() {
718 debug!("Transaction failed, committing effects");
721 return true;
722 }
723 let address_funds_reservations: BTreeSet<_> = certificate
724 .transaction_data()
725 .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier())
726 .into_keys()
727 .collect();
728 let object_withdraws: BTreeMap<_, _> = effects
732 .accumulator_events()
733 .into_iter()
734 .filter_map(|event| {
735 if address_funds_reservations.contains(&event.accumulator_obj) {
736 return None;
737 }
738 if let (AccumulatorOperation::Split, AccumulatorValue::Integer(amount)) =
740 (event.write.operation, event.write.value)
741 {
742 Some((event.accumulator_obj, amount))
743 } else {
744 None
745 }
746 })
747 .collect();
748 if object_withdraws.is_empty() {
750 debug!("No object withdraws, committing effects");
751 return true;
752 }
753 let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version else {
754 return false;
762 };
763 match self
764 .object_funds_withdraw_scheduler
765 .lock()
766 .as_ref()
767 .unwrap()
768 .schedule(object_withdraws, accumulator_version)
769 {
770 ObjectFundsWithdrawStatus::SufficientFunds => {
772 debug!("Object funds sufficient, committing effects");
773 true
774 }
775 ObjectFundsWithdrawStatus::Pending(receiver) => {
780 let scheduler = self.clone();
781 let cert = certificate.clone();
782 let mut execution_env = execution_env.clone();
783 let epoch_store = epoch_store.clone();
784 tokio::task::spawn(async move {
785 let _ = epoch_store
789 .within_alive_epoch(async move {
790 let tx_digest = cert.digest();
791 match receiver.await {
792 Ok(FundsWithdrawStatus::MaybeSufficient) => {
793 debug!(?tx_digest, "Object funds possibly sufficient");
798 }
799 Ok(FundsWithdrawStatus::Insufficient) => {
800 execution_env = execution_env.with_insufficient_funds();
806 debug!(?tx_digest, "Object funds insufficient");
807 }
808 Err(e) => {
809 error!("Error receiving funds withdraw status: {:?}", e);
810 }
811 }
812 scheduler.send_transaction_for_execution(
813 &cert,
814 execution_env,
815 Instant::now(),
818 );
819 })
820 .await;
821 });
822 false
823 }
824 }
825 }
826
/// Test helper: asserts that no certificate is pending or executing.
#[cfg(test)]
pub fn check_empty_for_testing(&self) {
    let outstanding = self.num_pending_certificates();
    assert_eq!(outstanding, 0);
}
831}
832
833#[cfg(test)]
834mod test {
835 use super::{BarrierDependencyBuilder, ExecutionScheduler, PendingCertificate};
836 use crate::authority::ExecutionEnv;
837 use crate::authority::shared_object_version_manager::AssignedVersions;
838 use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
839 use crate::execution_scheduler::SchedulingSource;
840 use std::collections::BTreeSet;
841 use std::{time::Duration, vec};
842 use sui_test_transaction_builder::TestTransactionBuilder;
843 use sui_types::base_types::{SuiAddress, random_object_ref};
844 use sui_types::executable_transaction::VerifiedExecutableTransaction;
845 use sui_types::object::Owner;
846 use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
847 use sui_types::transaction::{
848 SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
849 };
850 use sui_types::{
851 SUI_FRAMEWORK_PACKAGE_ID,
852 base_types::{ObjectID, SequenceNumber},
853 crypto::deterministic_random_account_key,
854 object::Object,
855 transaction::{CallArg, ObjectArg},
856 };
857 use tokio::time::Instant;
858 use tokio::{
859 sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
860 time::sleep,
861 };
862
863 #[allow(clippy::disallowed_methods)] fn make_execution_scheduler(
865 state: &AuthorityState,
866 ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
867 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
870 let execution_scheduler = ExecutionScheduler::new(
871 state.get_object_cache_reader().clone(),
872 state.get_account_funds_read().clone(),
873 state.get_transaction_cache_reader().clone(),
874 tx_ready_certificates,
875 &state.epoch_store_for_testing(),
876 state.metrics.clone(),
877 );
878
879 (execution_scheduler, rx_ready_certificates)
880 }
881
882 fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
883 let rgp = 100;
886 let (sender, keypair) = deterministic_random_account_key();
887 let transaction =
888 TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
889 .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
890 .build_and_sign(&keypair);
891 VerifiedExecutableTransaction::new_system(
892 VerifiedTransaction::new_unchecked(transaction),
893 0,
894 )
895 }
896
897 #[tokio::test(flavor = "current_thread", start_paused = true)]
898 async fn execution_scheduler_basics() {
899 let (owner, _keypair) = deterministic_random_account_key();
901 let gas_objects: Vec<Object> = (0..10)
902 .map(|_| {
903 let gas_object_id = ObjectID::random();
904 Object::with_id_owner_for_testing(gas_object_id, owner)
905 })
906 .collect();
907 let state = init_state_with_objects(gas_objects.clone()).await;
908
909 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
912 assert!(
914 rx_ready_certificates
915 .try_recv()
916 .is_err_and(|err| err == TryRecvError::Empty)
917 );
918 assert_eq!(execution_scheduler.num_pending_certificates(), 0);
920
921 execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
923 assert!(
925 rx_ready_certificates
926 .try_recv()
927 .is_err_and(|err| err == TryRecvError::Empty)
928 );
929
930 let transaction = make_transaction(gas_objects[0].clone(), vec![]);
932 let tx_start_time = Instant::now();
933 execution_scheduler.enqueue_transactions(
934 vec![(
935 transaction.clone(),
936 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
937 )],
938 &state.epoch_store_for_testing(),
939 );
940 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
942
943 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
945 assert!(
946 pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
947 );
948
949 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
950
951 drop(pending_certificate);
953
954 execution_scheduler.check_empty_for_testing();
956
957 let gas_object_new = Object::with_id_owner_version_for_testing(
959 ObjectID::random(),
960 0.into(),
961 Owner::AddressOwner(owner),
962 );
963 let transaction = make_transaction(gas_object_new.clone(), vec![]);
964 let tx_start_time = Instant::now();
965 execution_scheduler.enqueue_transactions(
966 vec![(
967 transaction.clone(),
968 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
969 )],
970 &state.epoch_store_for_testing(),
971 );
972 sleep(Duration::from_secs(1)).await;
974 assert!(
975 rx_ready_certificates
976 .try_recv()
977 .is_err_and(|err| err == TryRecvError::Empty)
978 );
979
980 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
981
982 execution_scheduler.enqueue_transactions(
984 vec![(
985 transaction.clone(),
986 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
987 )],
988 &state.epoch_store_for_testing(),
989 );
990 sleep(Duration::from_secs(1)).await;
991 assert!(
992 rx_ready_certificates
993 .try_recv()
994 .is_err_and(|err| err == TryRecvError::Empty)
995 );
996
997 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
998
999 state
1001 .get_cache_writer()
1002 .write_object_entry_for_test(gas_object_new);
1003 let pending_certificate = rx_ready_certificates.recv().await.unwrap();
1006 let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
1007 assert_eq!(
1008 pending_certificate.certificate.digest(),
1009 pending_certificate2.certificate.digest()
1010 );
1011
1012 assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
1015 assert!(
1016 pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
1017 >= Duration::from_secs(2)
1018 );
1019
1020 drop(pending_certificate);
1022 drop(pending_certificate2);
1023
1024 execution_scheduler.check_empty_for_testing();
1026 }
1027
1028 #[tokio::test(flavor = "current_thread", start_paused = true)]
1037 async fn execution_scheduler_object_dependency() {
1038 telemetry_subscribers::init_for_testing();
1039 let (owner, _keypair) = deterministic_random_account_key();
1041 let gas_objects: Vec<Object> = (0..10)
1042 .map(|_| {
1043 let gas_object_id = ObjectID::random();
1044 Object::with_id_owner_for_testing(gas_object_id, owner)
1045 })
1046 .collect();
1047 let shared_object = Object::shared_for_testing();
1048 let initial_shared_version = shared_object.owner().start_version().unwrap();
1049 let shared_object_2 = Object::shared_for_testing();
1050 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1051
1052 let state = init_state_with_objects(
1053 [
1054 gas_objects.clone(),
1055 vec![shared_object.clone(), shared_object_2.clone()],
1056 ]
1057 .concat(),
1058 )
1059 .await;
1060
1061 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1064 assert!(rx_ready_certificates.try_recv().is_err());
1066
1067 let shared_version = 1000.into();
1069 let shared_object_arg_read = ObjectArg::SharedObject {
1070 id: shared_object.id(),
1071 initial_shared_version,
1072 mutability: SharedObjectMutability::Immutable,
1073 };
1074 let transaction_read_0 = make_transaction(
1075 gas_objects[0].clone(),
1076 vec![CallArg::Object(shared_object_arg_read)],
1077 );
1078 let transaction_read_1 = make_transaction(
1079 gas_objects[1].clone(),
1080 vec![CallArg::Object(shared_object_arg_read)],
1081 );
1082 let tx_read_0_assigned_versions = vec![(
1083 (
1084 shared_object.id(),
1085 shared_object.owner().start_version().unwrap(),
1086 ),
1087 shared_version,
1088 )];
1089 let tx_read_1_assigned_versions = vec![(
1090 (
1091 shared_object.id(),
1092 shared_object.owner().start_version().unwrap(),
1093 ),
1094 shared_version,
1095 )];
1096
1097 let shared_object_arg_default = ObjectArg::SharedObject {
1099 id: shared_object.id(),
1100 initial_shared_version,
1101 mutability: SharedObjectMutability::Mutable,
1102 };
1103 let transaction_default = make_transaction(
1104 gas_objects[2].clone(),
1105 vec![CallArg::Object(shared_object_arg_default)],
1106 );
1107 let tx_default_assigned_versions = vec![(
1108 (
1109 shared_object.id(),
1110 shared_object.owner().start_version().unwrap(),
1111 ),
1112 shared_version,
1113 )];
1114
1115 let shared_version_2 = 1000.into();
1117 let shared_object_arg_read_2 = ObjectArg::SharedObject {
1118 id: shared_object_2.id(),
1119 initial_shared_version: initial_shared_version_2,
1120 mutability: SharedObjectMutability::Immutable,
1121 };
1122 let transaction_read_2 = make_transaction(
1123 gas_objects[3].clone(),
1124 vec![
1125 CallArg::Object(shared_object_arg_default),
1126 CallArg::Object(shared_object_arg_read_2),
1127 ],
1128 );
1129 let tx_read_2_assigned_versions = vec![
1130 (
1131 (
1132 shared_object.id(),
1133 shared_object.owner().start_version().unwrap(),
1134 ),
1135 shared_version,
1136 ),
1137 (
1138 (
1139 shared_object_2.id(),
1140 shared_object_2.owner().start_version().unwrap(),
1141 ),
1142 shared_version_2,
1143 ),
1144 ];
1145
1146 execution_scheduler.enqueue_transactions(
1147 vec![
1148 (
1149 transaction_read_0.clone(),
1150 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1151 tx_read_0_assigned_versions,
1152 None,
1153 )),
1154 ),
1155 (
1156 transaction_read_1.clone(),
1157 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1158 tx_read_1_assigned_versions,
1159 None,
1160 )),
1161 ),
1162 (
1163 transaction_default.clone(),
1164 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1165 tx_default_assigned_versions,
1166 None,
1167 )),
1168 ),
1169 (
1170 transaction_read_2.clone(),
1171 ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1172 tx_read_2_assigned_versions,
1173 None,
1174 )),
1175 ),
1176 ],
1177 &state.epoch_store_for_testing(),
1178 );
1179
1180 sleep(Duration::from_secs(1)).await;
1182 assert!(rx_ready_certificates.try_recv().is_err());
1183
1184 assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1185
1186 let mut new_shared_object = shared_object.clone();
1188 new_shared_object
1189 .data
1190 .try_as_move_mut()
1191 .unwrap()
1192 .increment_version_to(shared_version_2);
1193 state
1194 .get_cache_writer()
1195 .write_object_entry_for_test(new_shared_object);
1196
1197 let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1199 let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1200 let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1201 {
1202 let mut want_digests = vec![
1203 transaction_read_0.digest(),
1204 transaction_read_1.digest(),
1205 transaction_default.digest(),
1206 ];
1207 want_digests.sort();
1208 let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1209 got_digests.sort();
1210 assert_eq!(want_digests, got_digests);
1211 }
1212
1213 sleep(Duration::from_secs(1)).await;
1214 assert!(rx_ready_certificates.try_recv().is_err());
1215
1216 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1217
1218 let mut new_shared_object_2 = shared_object_2.clone();
1220 new_shared_object_2
1221 .data
1222 .try_as_move_mut()
1223 .unwrap()
1224 .increment_version_to(shared_version_2);
1225 state
1226 .get_cache_writer()
1227 .write_object_entry_for_test(new_shared_object_2);
1228
1229 let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1231 assert_eq!(transaction_read_2.digest(), tx_3.digest());
1232
1233 sleep(Duration::from_secs(1)).await;
1234 assert!(rx_ready_certificates.try_recv().is_err());
1235
1236 execution_scheduler.check_empty_for_testing();
1237 }
1238
1239 #[tokio::test(flavor = "current_thread", start_paused = true)]
1240 async fn execution_scheduler_receiving_notify_commit() {
1241 telemetry_subscribers::init_for_testing();
1242 let (owner, _keypair) = deterministic_random_account_key();
1244 let gas_objects: Vec<Object> = (0..10)
1245 .map(|_| {
1246 let gas_object_id = ObjectID::random();
1247 Object::with_id_owner_for_testing(gas_object_id, owner)
1248 })
1249 .collect();
1250 let state = init_state_with_objects(gas_objects.clone()).await;
1251
1252 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1255 assert!(rx_ready_certificates.try_recv().is_err());
1257 execution_scheduler.check_empty_for_testing();
1259
1260 let obj_id = ObjectID::random();
1261 let object_arguments: Vec<_> = (0..10)
1262 .map(|i| {
1263 let object = Object::with_id_owner_version_for_testing(
1264 obj_id,
1265 i.into(),
1266 Owner::AddressOwner(owner),
1267 );
1268 let object_arg = if i % 2 == 0 || i == 3 {
1275 ObjectArg::Receiving(object.compute_object_reference())
1276 } else {
1277 ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1278 };
1279 let txn =
1280 make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1281 (object, txn)
1282 })
1283 .collect();
1284
1285 for (i, (_, txn)) in object_arguments.iter().enumerate() {
1286 execution_scheduler.enqueue_transactions(
1289 vec![(
1290 txn.clone(),
1291 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1292 )],
1293 &state.epoch_store_for_testing(),
1294 );
1295 sleep(Duration::from_secs(1)).await;
1296 assert!(rx_ready_certificates.try_recv().is_err());
1297 assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1298 }
1299
1300 let len = object_arguments.len();
1303 for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1304 state
1307 .get_cache_writer()
1308 .write_object_entry_for_test(object.clone());
1309
1310 rx_ready_certificates.recv().await.unwrap();
1313
1314 sleep(Duration::from_secs(1)).await;
1317 assert!(rx_ready_certificates.try_recv().is_err());
1318
1319 drop(txn);
1321
1322 assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1325 }
1326
1327 execution_scheduler.check_empty_for_testing();
1329 }
1330
1331 #[tokio::test(flavor = "current_thread", start_paused = true)]
1332 async fn execution_scheduler_receiving_object_ready_notifications() {
1333 telemetry_subscribers::init_for_testing();
1334 let (owner, _keypair) = deterministic_random_account_key();
1336 let gas_objects: Vec<Object> = (0..10)
1337 .map(|_| {
1338 let gas_object_id = ObjectID::random();
1339 Object::with_id_owner_for_testing(gas_object_id, owner)
1340 })
1341 .collect();
1342 let state = init_state_with_objects(gas_objects.clone()).await;
1343
1344 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1347 assert!(rx_ready_certificates.try_recv().is_err());
1349 execution_scheduler.check_empty_for_testing();
1351
1352 let obj_id = ObjectID::random();
1353 let receiving_object_new0 =
1354 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1355 let receiving_object_new1 =
1356 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1357 let receiving_object_arg0 =
1358 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1359 let receive_object_transaction0 = make_transaction(
1360 gas_objects[0].clone(),
1361 vec![CallArg::Object(receiving_object_arg0)],
1362 );
1363
1364 let receiving_object_arg1 =
1365 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1366 let receive_object_transaction1 = make_transaction(
1367 gas_objects[0].clone(),
1368 vec![CallArg::Object(receiving_object_arg1)],
1369 );
1370
1371 execution_scheduler.enqueue_transactions(
1373 vec![(
1374 receive_object_transaction0.clone(),
1375 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1376 )],
1377 &state.epoch_store_for_testing(),
1378 );
1379 sleep(Duration::from_secs(1)).await;
1380 assert!(rx_ready_certificates.try_recv().is_err());
1381 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1382
1383 execution_scheduler.enqueue_transactions(
1385 vec![(
1386 receive_object_transaction1.clone(),
1387 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1388 )],
1389 &state.epoch_store_for_testing(),
1390 );
1391 sleep(Duration::from_secs(1)).await;
1392 assert!(rx_ready_certificates.try_recv().is_err());
1393 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1394
1395 execution_scheduler.enqueue_transactions(
1397 vec![(
1398 receive_object_transaction0.clone(),
1399 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1400 )],
1401 &state.epoch_store_for_testing(),
1402 );
1403 sleep(Duration::from_secs(1)).await;
1404 assert!(rx_ready_certificates.try_recv().is_err());
1405 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1406
1407 state
1409 .get_cache_writer()
1410 .write_object_entry_for_test(receiving_object_new0.clone());
1411
1412 rx_ready_certificates.recv().await.unwrap();
1415
1416 state
1418 .get_cache_writer()
1419 .write_object_entry_for_test(receiving_object_new1.clone());
1420
1421 rx_ready_certificates.recv().await.unwrap();
1424 }
1425
1426 #[tokio::test(flavor = "current_thread", start_paused = true)]
1427 async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1428 telemetry_subscribers::init_for_testing();
1429 let (owner, _keypair) = deterministic_random_account_key();
1431 let gas_objects: Vec<Object> = (0..10)
1432 .map(|_| {
1433 let gas_object_id = ObjectID::random();
1434 Object::with_id_owner_for_testing(gas_object_id, owner)
1435 })
1436 .collect();
1437 let state = init_state_with_objects(gas_objects.clone()).await;
1438
1439 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1442 assert!(rx_ready_certificates.try_recv().is_err());
1444 execution_scheduler.check_empty_for_testing();
1446
1447 let obj_id = ObjectID::random();
1448 let receiving_object_new0 =
1449 Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1450 let receiving_object_new1 =
1451 Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1452 let receiving_object_arg0 =
1453 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1454 let receive_object_transaction0 = make_transaction(
1455 gas_objects[0].clone(),
1456 vec![CallArg::Object(receiving_object_arg0)],
1457 );
1458
1459 let receive_object_transaction01 = make_transaction(
1460 gas_objects[1].clone(),
1461 vec![CallArg::Object(receiving_object_arg0)],
1462 );
1463
1464 let receiving_object_arg1 =
1465 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1466 let receive_object_transaction1 = make_transaction(
1467 gas_objects[0].clone(),
1468 vec![CallArg::Object(receiving_object_arg1)],
1469 );
1470
1471 let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1474 let tx1 = make_transaction(
1475 gas_objects[0].clone(),
1476 vec![CallArg::Object(gas_receiving_arg)],
1477 );
1478
1479 execution_scheduler.enqueue_transactions(
1481 vec![(
1482 receive_object_transaction0.clone(),
1483 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1484 )],
1485 &state.epoch_store_for_testing(),
1486 );
1487 sleep(Duration::from_secs(1)).await;
1488 assert!(rx_ready_certificates.try_recv().is_err());
1489 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1490
1491 execution_scheduler.enqueue_transactions(
1493 vec![(
1494 receive_object_transaction1.clone(),
1495 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1496 )],
1497 &state.epoch_store_for_testing(),
1498 );
1499 sleep(Duration::from_secs(1)).await;
1500 assert!(rx_ready_certificates.try_recv().is_err());
1501 assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1502
1503 execution_scheduler.enqueue_transactions(
1506 vec![(
1507 receive_object_transaction01.clone(),
1508 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1509 )],
1510 &state.epoch_store_for_testing(),
1511 );
1512 sleep(Duration::from_secs(1)).await;
1513 assert!(rx_ready_certificates.try_recv().is_err());
1514 assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1515
1516 state
1518 .get_cache_writer()
1519 .write_object_entry_for_test(receiving_object_new0.clone());
1520
1521 rx_ready_certificates.recv().await.unwrap();
1524
1525 rx_ready_certificates.recv().await.unwrap();
1526
1527 assert!(rx_ready_certificates.try_recv().is_err());
1529
1530 execution_scheduler.enqueue_transactions(
1533 vec![(
1534 tx1.clone(),
1535 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1536 )],
1537 &state.epoch_store_for_testing(),
1538 );
1539 sleep(Duration::from_secs(1)).await;
1540 rx_ready_certificates.recv().await.unwrap();
1541
1542 state
1544 .get_cache_writer()
1545 .write_object_entry_for_test(receiving_object_new1.clone());
1546
1547 rx_ready_certificates.recv().await.unwrap();
1550 }
1551
1552 #[tokio::test(flavor = "current_thread", start_paused = true)]
1553 async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1554 telemetry_subscribers::init_for_testing();
1555 let (owner, _keypair) = deterministic_random_account_key();
1557 let mut gas_objects: Vec<Object> = (0..10)
1558 .map(|_| {
1559 let gas_object_id = ObjectID::random();
1560 Object::with_id_owner_for_testing(gas_object_id, owner)
1561 })
1562 .collect();
1563 let receiving_object = Object::with_id_owner_version_for_testing(
1564 ObjectID::random(),
1565 10.into(),
1566 Owner::AddressOwner(owner),
1567 );
1568 gas_objects.push(receiving_object.clone());
1569 let state = init_state_with_objects(gas_objects.clone()).await;
1570
1571 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1574 assert!(rx_ready_certificates.try_recv().is_err());
1576 execution_scheduler.check_empty_for_testing();
1578
1579 let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1580 receiving_object.id(),
1581 0.into(),
1582 Owner::AddressOwner(owner),
1583 );
1584 let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1585 receiving_object.id(),
1586 1.into(),
1587 Owner::AddressOwner(owner),
1588 );
1589 let receiving_object_arg0 =
1590 ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1591 let receive_object_transaction0 = make_transaction(
1592 gas_objects[0].clone(),
1593 vec![CallArg::Object(receiving_object_arg0)],
1594 );
1595
1596 let receive_object_transaction01 = make_transaction(
1597 gas_objects[1].clone(),
1598 vec![CallArg::Object(receiving_object_arg0)],
1599 );
1600
1601 let receiving_object_arg1 =
1602 ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1603 let receive_object_transaction1 = make_transaction(
1604 gas_objects[0].clone(),
1605 vec![CallArg::Object(receiving_object_arg1)],
1606 );
1607
1608 execution_scheduler.enqueue_transactions(
1610 vec![(
1611 receive_object_transaction0.clone(),
1612 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1613 )],
1614 &state.epoch_store_for_testing(),
1615 );
1616 execution_scheduler.enqueue_transactions(
1617 vec![(
1618 receive_object_transaction01.clone(),
1619 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1620 )],
1621 &state.epoch_store_for_testing(),
1622 );
1623 execution_scheduler.enqueue_transactions(
1624 vec![(
1625 receive_object_transaction1.clone(),
1626 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1627 )],
1628 &state.epoch_store_for_testing(),
1629 );
1630 sleep(Duration::from_secs(1)).await;
1631 rx_ready_certificates.recv().await.unwrap();
1632 rx_ready_certificates.recv().await.unwrap();
1633 rx_ready_certificates.recv().await.unwrap();
1634 assert!(rx_ready_certificates.try_recv().is_err());
1635 }
1636
1637 #[tokio::test(flavor = "current_thread", start_paused = true)]
1640 async fn execution_scheduler_with_cancelled_transactions() {
1641 let (owner, _keypair) = deterministic_random_account_key();
1643 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1644 let shared_object_1 = Object::shared_for_testing();
1645 let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1646 let shared_object_2 = Object::shared_for_testing();
1647 let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1648 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1649
1650 let state = init_state_with_objects(vec![
1651 gas_object.clone(),
1652 shared_object_1.clone(),
1653 shared_object_2.clone(),
1654 owned_object.clone(),
1655 ])
1656 .await;
1657
1658 let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1661 assert!(rx_ready_certificates.try_recv().is_err());
1663
1664 let shared_object_arg_1 = ObjectArg::SharedObject {
1666 id: shared_object_1.id(),
1667 initial_shared_version: initial_shared_version_1,
1668 mutability: SharedObjectMutability::Mutable,
1669 };
1670 let shared_object_arg_2 = ObjectArg::SharedObject {
1671 id: shared_object_2.id(),
1672 initial_shared_version: initial_shared_version_2,
1673 mutability: SharedObjectMutability::Mutable,
1674 };
1675
1676 let owned_version = 2000.into();
1678 let mut owned_ref = owned_object.compute_object_reference();
1679 owned_ref.1 = owned_version;
1680 let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1681
1682 let cancelled_transaction = make_transaction(
1683 gas_object.clone(),
1684 vec![
1685 CallArg::Object(shared_object_arg_1),
1686 CallArg::Object(shared_object_arg_2),
1687 CallArg::Object(owned_object_arg),
1688 ],
1689 );
1690 let assigned_versions = vec![
1691 (
1692 (
1693 shared_object_1.id(),
1694 shared_object_1.owner().start_version().unwrap(),
1695 ),
1696 SequenceNumber::CANCELLED_READ,
1697 ),
1698 (
1699 (
1700 shared_object_2.id(),
1701 shared_object_2.owner().start_version().unwrap(),
1702 ),
1703 SequenceNumber::CONGESTED,
1704 ),
1705 ];
1706
1707 execution_scheduler.enqueue_transactions(
1708 vec![(
1709 cancelled_transaction.clone(),
1710 ExecutionEnv::new()
1711 .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1712 )],
1713 &state.epoch_store_for_testing(),
1714 );
1715
1716 sleep(Duration::from_secs(1)).await;
1718 assert!(rx_ready_certificates.try_recv().is_err());
1719
1720 assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1721
1722 let mut new_owned_object = owned_object.clone();
1724 new_owned_object
1725 .data
1726 .try_as_move_mut()
1727 .unwrap()
1728 .increment_version_to(owned_version);
1729 state
1730 .get_cache_writer()
1731 .write_object_entry_for_test(new_owned_object);
1732
1733 let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1735 assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1736
1737 sleep(Duration::from_secs(1)).await;
1738 assert!(rx_ready_certificates.try_recv().is_err());
1739
1740 execution_scheduler.check_empty_for_testing();
1741 }
1742
/// Exercises `BarrierDependencyBuilder::process_tx` on short sequences of transactions that
/// touch shared objects through non-exclusive writes or exclusive (`Mutable`) writes, and
/// checks which earlier transactions each one is reported to depend on.
#[test]
fn test_barrier_dependency_builder() {
    // Builds a transaction with one shared-object input per id: all `NonExclusiveWrite`
    // inputs first, then all `Mutable` inputs. The two id lists must be disjoint.
    let tx_with_writes = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
        assert!(
            non_exclusive_writes
                .iter()
                .all(|id| !exclusive_writes.contains(id))
        );
        assert!(
            exclusive_writes
                .iter()
                .all(|id| !non_exclusive_writes.contains(id))
        );

        let shared_inputs = non_exclusive_writes
            .into_iter()
            .map(|id| (id, SharedObjectMutability::NonExclusiveWrite))
            .chain(
                exclusive_writes
                    .into_iter()
                    .map(|id| (id, SharedObjectMutability::Mutable)),
            );

        let mut builder = ProgrammableTransactionBuilder::new();
        for (id, mutability) in shared_inputs {
            builder
                .obj(ObjectArg::SharedObject {
                    id: ObjectID::from_single_byte(id as u8),
                    initial_shared_version: SequenceNumber::new(),
                    mutability,
                })
                .unwrap();
        }

        let kind = TransactionKind::ProgrammableTransaction(builder.finish());
        let data = TransactionData::new(kind, SuiAddress::default(), random_object_ref(), 1, 1);
        Transaction::from_data_and_signer(data, vec![])
    };

    {
        // A non-exclusive write followed by an exclusive write to the same object:
        // the exclusive write depends on the non-exclusive one.
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let tx1 = tx_with_writes(vec![1], vec![]);
        let tx2 = tx_with_writes(vec![], vec![1]);

        let [tx1_deps, tx2_deps] = [&tx1, &tx2]
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()));

        assert!(tx1_deps.is_empty());
        assert_eq!(
            tx2_deps.into_iter().collect::<Vec<_>>(),
            vec![*tx1.digest()]
        );
    }

    {
        // One transaction writes non-exclusively to two objects; a separate exclusive write
        // to each of those objects depends on it.
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let tx1 = tx_with_writes(vec![1, 2], vec![]);
        let tx2 = tx_with_writes(vec![], vec![1]);
        let tx3 = tx_with_writes(vec![], vec![2]);

        let [tx1_deps, tx2_deps, tx3_deps] = [&tx1, &tx2, &tx3]
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()));

        assert!(tx1_deps.is_empty());
        assert_eq!(
            tx2_deps.into_iter().collect::<Vec<_>>(),
            vec![*tx1.digest()]
        );
        assert_eq!(
            tx3_deps.into_iter().collect::<Vec<_>>(),
            vec![*tx1.digest()]
        );
    }

    {
        // Two non-exclusive writes to different objects, then one exclusive write to both:
        // the exclusive write depends on both earlier transactions.
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let tx1 = tx_with_writes(vec![1], vec![]);
        let tx2 = tx_with_writes(vec![2], vec![]);
        let tx3 = tx_with_writes(vec![], vec![1, 2]);

        let [tx1_deps, tx2_deps, tx3_deps] = [&tx1, &tx2, &tx3]
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()));

        assert!(tx1_deps.is_empty());
        assert!(tx2_deps.is_empty());
        assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
    }

    {
        // A non-exclusive write followed by two exclusive writes to the same object:
        // only the first exclusive write depends on the non-exclusive one.
        let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
        let tx1 = tx_with_writes(vec![1], vec![]);
        let tx2 = tx_with_writes(vec![], vec![1]);
        let tx3 = tx_with_writes(vec![], vec![1]);

        let [tx1_deps, tx2_deps, tx3_deps] = [&tx1, &tx2, &tx3]
            .map(|tx| barrier_dependency_builder.process_tx(*tx.digest(), tx.transaction_data()));

        assert!(tx1_deps.is_empty());
        assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
        assert!(tx3_deps.is_empty());
    }
}
1859}