sui_core/execution_scheduler/
execution_scheduler_impl.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    accumulators::funds_read::AccountFundsRead,
6    authority::{
7        AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8        shared_object_version_manager::Schedulable,
9    },
10    execution_cache::{ObjectCacheRead, TransactionCacheRead},
11    execution_scheduler::{
12        ExecutingGuard, PendingCertificateStats,
13        funds_withdraw_scheduler::{
14            FundsSettlement, ObjectFundsWithdrawSchedulerTrait, ObjectFundsWithdrawStatus,
15            ScheduleStatus, TxFundsWithdraw, naive_scheduler::NaiveObjectFundsWithdrawScheduler,
16            scheduler::FundsWithdrawScheduler,
17        },
18    },
19};
20use futures::stream::{FuturesUnordered, StreamExt};
21use mysten_common::{assert_reachable, debug_fatal};
22use mysten_metrics::spawn_monitored_task;
23use parking_lot::Mutex;
24use std::{
25    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26    sync::Arc,
27};
28use sui_config::node::AuthorityOverloadConfig;
29use sui_types::{
30    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31    base_types::{FullObjectID, ObjectID, SequenceNumber},
32    digests::TransactionDigest,
33    effects::{AccumulatorOperation, AccumulatorValue, TransactionEffects, TransactionEffectsAPI},
34    error::SuiResult,
35    executable_transaction::VerifiedExecutableTransaction,
36    execution_params::FundsWithdrawStatus,
37    storage::InputKey,
38    transaction::{
39        SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
40        TransactionDataAPI, TransactionKey,
41    },
42};
43use tokio::sync::mpsc::UnboundedSender;
44use tokio::time::Instant;
45use tracing::{debug, error, instrument};
46
47use super::{PendingCertificate, overload_tracker::OverloadTracker};
48
/// Utility struct for collecting barrier dependencies.
///
/// Transactions are fed in scheduling order through
/// [`BarrierDependencyBuilder::process_tx`]; the builder remembers which
/// transactions performed a non-exclusive write to each shared object so that a
/// later exclusive (`Mutable`) access to the same object can depend on them.
pub(crate) struct BarrierDependencyBuilder {
    // Per object id: digests of the transactions that made a non-exclusive write
    // to that object and have not yet been handed to a barrier transaction.
    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
}
53
54impl BarrierDependencyBuilder {
55    pub fn new() -> Self {
56        Self {
57            dep_state: Default::default(),
58        }
59    }
60
61    /// process_tx must be called for each transaction in scheduling order. If the
62    /// transaction has a non-exclusive write to an object, the transaction digest is
63    /// stored to become a dependency of the eventual barrier transaction. If a
64    /// transaction has an exclusive write to an object, all pending non-exclusive write
65    /// transactions for that object are added to the barrier dependencies.
66    pub fn process_tx(
67        &mut self,
68        tx_digest: TransactionDigest,
69        tx: &TransactionData,
70    ) -> BTreeSet<TransactionDigest> {
71        let mut barrier_deps = BTreeSet::new();
72        for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
73            match mutability {
74                SharedObjectMutability::NonExclusiveWrite => {
75                    self.dep_state.entry(id).or_default().insert(tx_digest);
76                }
77                SharedObjectMutability::Mutable => {
78                    // If there were preceding non-exclusive writes to this object id, this
79                    // transaction is a barrier and must wait for them to finish.
80                    if let Some(deps) = self.dep_state.remove(&id) {
81                        barrier_deps.extend(deps);
82                    }
83                }
84                SharedObjectMutability::Immutable => (),
85            }
86        }
87        barrier_deps
88    }
89}
90
/// Schedules transactions for execution once their inputs are available.
///
/// Every field is either an `Arc` or a channel sender, so a clone of the
/// scheduler refers to the same underlying caches, trackers and channel.
#[derive(Clone)]
pub struct ExecutionScheduler {
    // Used to check for, and wait on, the availability of input objects.
    object_cache_read: Arc<dyn ObjectCacheRead>,
    // Used to look up, and wait on, executed effects digests of transactions.
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    // Tracks pending certificates for the execution overload check.
    overload_tracker: Arc<OverloadTracker>,
    // Channel on which certificates that are ready to execute are sent.
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    // `None` when the funds withdraw scheduler is not enabled; swapped out in
    // `reconfigure`.
    address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
    // `None` when object funds withdraw is not enabled; swapped out in `reconfigure`.
    object_funds_withdraw_scheduler: Arc<Mutex<Option<Box<dyn ObjectFundsWithdrawSchedulerTrait>>>>,
    metrics: Arc<AuthorityMetrics>,
}
101
/// Guard that accounts for a certificate as "pending" for as long as it lives.
///
/// Constructing the guard increments the pending-certificates gauge and registers
/// the certificate with the overload tracker; dropping it reverses both.
struct PendingGuard<'a> {
    // Scheduler whose metrics and overload tracker are updated.
    scheduler: &'a ExecutionScheduler,
    // Certificate being tracked as pending.
    cert: &'a VerifiedExecutableTransaction,
}
106
107impl<'a> PendingGuard<'a> {
108    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
109        scheduler
110            .metrics
111            .transaction_manager_num_pending_certificates
112            .inc();
113        scheduler
114            .overload_tracker
115            .add_pending_certificate(cert.data());
116        Self { scheduler, cert }
117    }
118}
119
120impl Drop for PendingGuard<'_> {
121    fn drop(&mut self) {
122        self.scheduler
123            .metrics
124            .transaction_manager_num_pending_certificates
125            .dec();
126        self.scheduler
127            .overload_tracker
128            .remove_pending_certificate(self.cert.data());
129    }
130}
131
132impl ExecutionScheduler {
133    pub fn new(
134        object_cache_read: Arc<dyn ObjectCacheRead>,
135        account_funds_read: Arc<dyn AccountFundsRead>,
136        transaction_cache_read: Arc<dyn TransactionCacheRead>,
137        tx_ready_certificates: UnboundedSender<PendingCertificate>,
138        epoch_store: &Arc<AuthorityPerEpochStore>,
139        metrics: Arc<AuthorityMetrics>,
140    ) -> Self {
141        tracing::info!("Creating new ExecutionScheduler");
142        let (address_funds_withdraw_scheduler, object_funds_withdraw_scheduler) =
143            Self::initialize_funds_withdraw_scheduler(
144                epoch_store,
145                &object_cache_read,
146                account_funds_read,
147            );
148        Self {
149            object_cache_read,
150            transaction_cache_read,
151            overload_tracker: Arc::new(OverloadTracker::new()),
152            tx_ready_certificates,
153            address_funds_withdraw_scheduler: Arc::new(Mutex::new(
154                address_funds_withdraw_scheduler,
155            )),
156            object_funds_withdraw_scheduler: Arc::new(Mutex::new(object_funds_withdraw_scheduler)),
157            metrics,
158        }
159    }
160
161    fn initialize_funds_withdraw_scheduler(
162        epoch_store: &Arc<AuthorityPerEpochStore>,
163        object_cache_read: &Arc<dyn ObjectCacheRead>,
164        account_funds_read: Arc<dyn AccountFundsRead>,
165    ) -> (
166        Option<FundsWithdrawScheduler>,
167        Option<Box<dyn ObjectFundsWithdrawSchedulerTrait>>,
168    ) {
169        let withdraw_scheduler_enabled =
170            epoch_store.is_validator() && epoch_store.accumulators_enabled();
171        if !withdraw_scheduler_enabled {
172            return (None, None);
173        }
174        let starting_accumulator_version = object_cache_read
175            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
176            .expect("Accumulator root object must be present if funds accumulator is enabled")
177            .version();
178        let address_funds_withdraw_scheduler =
179            FundsWithdrawScheduler::new(account_funds_read.clone(), starting_accumulator_version);
180        let object_funds_withdraw_scheduler =
181            if epoch_store.protocol_config().enable_object_funds_withdraw() {
182                let scheduler: Box<dyn ObjectFundsWithdrawSchedulerTrait> =
183                    Box::new(NaiveObjectFundsWithdrawScheduler::new(
184                        account_funds_read,
185                        starting_accumulator_version,
186                    ));
187                Some(scheduler)
188            } else {
189                None
190            };
191        (
192            Some(address_funds_withdraw_scheduler),
193            object_funds_withdraw_scheduler,
194        )
195    }
196
197    #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
198    async fn schedule_transaction(
199        self,
200        cert: VerifiedExecutableTransaction,
201        execution_env: ExecutionEnv,
202        epoch_store: &Arc<AuthorityPerEpochStore>,
203    ) {
204        let enqueue_time = Instant::now();
205        let tx_digest = cert.digest();
206        let digests = [*tx_digest];
207
208        let tx_data = cert.transaction_data();
209        let input_object_kinds = tx_data
210            .input_objects()
211            .expect("input_objects() cannot fail");
212        let input_object_keys: Vec<_> = epoch_store
213            .get_input_object_keys(
214                &cert.key(),
215                &input_object_kinds,
216                &execution_env.assigned_versions,
217            )
218            .into_iter()
219            .collect();
220
221        let receiving_object_keys: HashSet<_> = tx_data
222            .receiving_objects()
223            .into_iter()
224            .map(|entry| {
225                InputKey::VersionedObject {
226                    // TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
227                    id: FullObjectID::new(entry.0, None),
228                    version: entry.1,
229                }
230            })
231            .collect();
232        let input_and_receiving_keys = [
233            input_object_keys,
234            receiving_object_keys.iter().cloned().collect(),
235        ]
236        .concat();
237
238        let epoch = epoch_store.epoch();
239        debug!(
240            ?tx_digest,
241            "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
242        );
243
244        let availability = self
245            .object_cache_read
246            .multi_input_objects_available_cache_only(&input_and_receiving_keys);
247        // Most of the times, the transaction's input objects are already available.
248        // We can check the availability of the input objects first, and only wait for the
249        // missing input objects if necessary.
250        let missing_input_keys: Vec<_> = input_and_receiving_keys
251            .into_iter()
252            .zip(availability)
253            .filter_map(|(key, available)| if !available { Some(key) } else { None })
254            .collect();
255
256        let has_missing_barrier_dependencies = self
257            .transaction_cache_read
258            .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
259            .into_iter()
260            .any(|r| r.is_none());
261
262        if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
263            self.metrics
264                .transaction_manager_num_enqueued_certificates
265                .with_label_values(&["ready"])
266                .inc();
267            debug!(?tx_digest, "Input objects already available");
268            self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
269            return;
270        }
271
272        let _pending_guard = PendingGuard::new(&self, &cert);
273        self.metrics
274            .transaction_manager_num_enqueued_certificates
275            .with_label_values(&["pending"])
276            .inc();
277
278        if !execution_env.barrier_dependencies.is_empty() {
279            debug!(
280                "waiting for barrier dependencies to be executed: {:?}",
281                execution_env.barrier_dependencies
282            );
283            self.transaction_cache_read
284                .notify_read_executed_effects_digests(
285                    "wait_for_barrier_dependencies",
286                    &execution_env.barrier_dependencies,
287                )
288                .await;
289        }
290
291        tokio::select! {
292            _ = self.object_cache_read
293                .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
294                => {
295                    self.metrics
296                        .transaction_manager_transaction_queue_age_s
297                        .observe(enqueue_time.elapsed().as_secs_f64());
298                    debug!(?tx_digest, "Input objects available");
299                    // TODO: Eventually we could fold execution_driver into the scheduler.
300                    self.send_transaction_for_execution(
301                        &cert,
302                        execution_env,
303                        enqueue_time,
304                    );
305                }
306            _ = self.transaction_cache_read.notify_read_executed_effects_digests(
307                "ExecutionScheduler::notify_read_executed_effects_digests",
308                &digests,
309            ) => {
310                debug!(?tx_digest, "Transaction already executed");
311            }
312        };
313    }
314
315    fn send_transaction_for_execution(
316        &self,
317        cert: &VerifiedExecutableTransaction,
318        execution_env: ExecutionEnv,
319        enqueue_time: Instant,
320    ) {
321        let pending_cert = PendingCertificate {
322            certificate: cert.clone(),
323            execution_env,
324            stats: PendingCertificateStats {
325                enqueue_time,
326                ready_time: Some(Instant::now()),
327            },
328            executing_guard: Some(ExecutingGuard::new(
329                self.metrics
330                    .transaction_manager_num_executing_certificates
331                    .clone(),
332            )),
333        };
334        let _ = self.tx_ready_certificates.send(pending_cert);
335    }
336
337    fn schedule_funds_withdraws(
338        &self,
339        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
340        epoch_store: &Arc<AuthorityPerEpochStore>,
341    ) {
342        if certs.is_empty() {
343            return;
344        }
345        let mut withdraws = BTreeMap::new();
346        let mut prev_version = None;
347        for (cert, env) in &certs {
348            let tx_withdraws = cert
349                .transaction_data()
350                .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
351            assert!(!tx_withdraws.is_empty());
352            let accumulator_version = env
353                .assigned_versions
354                .accumulator_version
355                .expect("accumulator_version must be set when there are withdraws");
356            if let Some(prev_version) = prev_version {
357                // Transactions must be in order.
358                assert!(prev_version <= accumulator_version);
359            }
360            prev_version = Some(accumulator_version);
361            let tx_digest = *cert.digest();
362            withdraws
363                .entry(accumulator_version)
364                .or_insert(Vec::new())
365                .push(TxFundsWithdraw {
366                    tx_digest,
367                    reservations: tx_withdraws,
368                });
369        }
370        let mut receivers = FuturesUnordered::new();
371        {
372            let guard = self.address_funds_withdraw_scheduler.lock();
373            let withdraw_scheduler = guard
374                .as_ref()
375                .expect("Funds withdraw scheduler must be enabled if there are withdraws");
376            for (version, tx_withdraws) in withdraws {
377                receivers.extend(withdraw_scheduler.schedule_withdraws(version, tx_withdraws));
378            }
379            // guard will be dropped here
380        }
381        let scheduler = self.clone();
382        let epoch_store = epoch_store.clone();
383        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
384            let mut cert_map = HashMap::new();
385            for (cert, env) in certs {
386                cert_map.insert(*cert.digest(), (cert, env));
387            }
388            while let Some(result) = receivers.next().await {
389                match result {
390                    Ok(result) => match result.status {
391                        ScheduleStatus::InsufficientFunds => {
392                            assert_reachable!("tx cancelled, insufficient funds");
393                            let tx_digest = result.tx_digest;
394                            debug!(
395                                ?tx_digest,
396                                "Funds withdraw scheduling result: Insufficient funds"
397                            );
398                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
399                            let env = env.with_insufficient_funds();
400                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
401                        }
402                        ScheduleStatus::SufficientFunds => {
403                            assert_reachable!("tx scheduled, sufficient funds");
404                            let tx_digest = result.tx_digest;
405                            debug!(?tx_digest, "Funds withdraw scheduling result: Success");
406                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
407                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
408                        }
409                        ScheduleStatus::SkipSchedule => {
410                            assert_reachable!("tx withdrawal scheduling skipped");
411                            let tx_digest = result.tx_digest;
412                            debug!(?tx_digest, "Skip scheduling funds withdraw");
413                        }
414                    },
415                    Err(e) => {
416                        error!("Withdraw scheduler stopped: {:?}", e);
417                    }
418                }
419            }
420        }));
421    }
422
423    fn schedule_settlement_transactions(
424        &self,
425        settlement_txns: Vec<(TransactionKey, ExecutionEnv)>,
426        epoch_store: &Arc<AuthorityPerEpochStore>,
427    ) {
428        if !settlement_txns.is_empty() {
429            let scheduler = self.clone();
430            let epoch_store = epoch_store.clone();
431
432            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
433                let mut futures: FuturesUnordered<_> = settlement_txns
434                    .into_iter()
435                    .map(|(key, env)| {
436                        let epoch_store = epoch_store.clone();
437                        async move {
438                            (
439                                key,
440                                epoch_store.wait_for_settlement_transactions(key).await,
441                                env,
442                            )
443                        }
444                    })
445                    .collect();
446
447                while let Some((settlement_key, txns, env)) = futures.next().await {
448                    let mut barrier_deps = BarrierDependencyBuilder::new();
449                    let txns = txns
450                        .into_iter()
451                        .map(|tx| {
452                            let deps = barrier_deps.process_tx(*tx.digest(), tx.transaction_data());
453                            let env = env.clone().with_barrier_dependencies(deps);
454                            (tx, env)
455                        })
456                        .collect::<Vec<_>>();
457
458                    scheduler.enqueue_transactions(txns, &epoch_store);
459
460                    // Spawn a new task to wait for the barrier transaction.
461                    let scheduler = scheduler.clone();
462                    let epoch_store = epoch_store.clone();
463                    let env = env.clone();
464                    spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
465                        let barrier_tx = epoch_store
466                            .wait_for_barrier_transaction(settlement_key)
467                            .await;
468                        let deps = barrier_deps
469                            .process_tx(*barrier_tx.digest(), barrier_tx.transaction_data());
470                        let env = env.with_barrier_dependencies(deps);
471                        scheduler.enqueue_transactions(vec![(barrier_tx, env)], &epoch_store);
472                    }));
473                }
474            }));
475        }
476    }
477
478    fn schedule_tx_keys(
479        &self,
480        tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
481        epoch_store: &Arc<AuthorityPerEpochStore>,
482    ) {
483        if tx_with_keys.is_empty() {
484            return;
485        }
486
487        let scheduler = self.clone();
488        let epoch_store = epoch_store.clone();
489        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
490            let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
491            let digests = epoch_store
492                .notify_read_tx_key_to_digest(&tx_keys)
493                .await
494                .expect("db error");
495            let transactions = scheduler
496                .transaction_cache_read
497                .multi_get_transaction_blocks(&digests)
498                .into_iter()
499                .map(|tx| {
500                    let tx = tx.expect("tx must exist").as_ref().clone();
501                    VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
502                })
503                .zip(tx_with_keys.into_iter().map(|(_, env)| env))
504                .collect::<Vec<_>>();
505            scheduler.enqueue_transactions(transactions, &epoch_store);
506        }));
507    }
508
509    /// When we schedule a certificate, it should be impossible for it to have been executed in a
510    /// previous epoch.
511    #[cfg(debug_assertions)]
512    fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
513        let epoch = cert.epoch();
514        let digest = *cert.digest();
515        let digests = [digest];
516        let executed = self
517            .transaction_cache_read
518            .multi_get_executed_effects(&digests)
519            .pop()
520            .unwrap();
521        // Due to pruning, we may not always have an executed effects for the certificate
522        // even if it was executed. So this is a best-effort check.
523        if let Some(executed) = executed {
524            use sui_types::effects::TransactionEffectsAPI;
525
526            assert_eq!(
527                executed.executed_epoch(),
528                epoch,
529                "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
530                digest,
531                executed.executed_epoch(),
532                epoch
533            );
534        }
535    }
536}
537
538impl ExecutionScheduler {
539    pub fn enqueue(
540        &self,
541        certs: Vec<(Schedulable, ExecutionEnv)>,
542        epoch_store: &Arc<AuthorityPerEpochStore>,
543    ) {
544        // schedule all transactions immediately
545        let mut ordinary_txns = Vec::with_capacity(certs.len());
546        let mut tx_with_keys = Vec::new();
547        let mut tx_with_withdraws = Vec::new();
548        let mut settlement_txns = Vec::new();
549
550        for (schedulable, env) in certs {
551            match schedulable {
552                Schedulable::Transaction(tx) => {
553                    if tx.transaction_data().has_funds_withdrawals() {
554                        tx_with_withdraws.push((tx, env));
555                    } else {
556                        ordinary_txns.push((tx, env));
557                    }
558                }
559                s @ Schedulable::RandomnessStateUpdate(..) => {
560                    tx_with_keys.push((s.key(), env));
561                }
562                Schedulable::AccumulatorSettlement(_, _) => {
563                    settlement_txns.push((schedulable.key(), env));
564                }
565                Schedulable::ConsensusCommitPrologue(_, _, _) => {
566                    // we only use Schedulable::ConsensusCommitPrologue as a temporary placeholder
567                    // during version assignment, by the time we schedule transactions it should be
568                    // converted to Schedulable::Transaction
569                    unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
570                }
571            }
572        }
573
574        self.enqueue_transactions(ordinary_txns, epoch_store);
575        self.schedule_tx_keys(tx_with_keys, epoch_store);
576        self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
577        self.schedule_settlement_transactions(settlement_txns, epoch_store);
578    }
579
580    pub fn enqueue_transactions(
581        &self,
582        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
583        epoch_store: &Arc<AuthorityPerEpochStore>,
584    ) {
585        // Filter out certificates from wrong epoch.
586        let certs: Vec<_> = certs
587            .into_iter()
588            .filter_map(|cert| {
589                if cert.0.epoch() == epoch_store.epoch() {
590                    #[cfg(debug_assertions)]
591                    self.assert_cert_not_executed_previous_epochs(&cert.0);
592
593                    Some(cert)
594                } else {
595                    debug_fatal!(
596                        "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
597                        epoch_store.epoch(),
598                        cert.0.epoch()
599                    );
600                    None
601                }
602            })
603            .collect();
604        let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
605        let executed = self
606            .transaction_cache_read
607            .multi_get_executed_effects_digests(&digests);
608        let mut already_executed_certs_num = 0;
609        let pending_certs =
610            certs
611                .into_iter()
612                .zip(executed)
613                .filter_map(|((cert, execution_env), executed)| {
614                    if executed.is_none() {
615                        Some((cert, execution_env))
616                    } else {
617                        already_executed_certs_num += 1;
618                        None
619                    }
620                });
621
622        for (cert, execution_env) in pending_certs {
623            let scheduler = self.clone();
624            let epoch_store = epoch_store.clone();
625            spawn_monitored_task!(
626                epoch_store.within_alive_epoch(scheduler.schedule_transaction(
627                    cert,
628                    execution_env,
629                    &epoch_store,
630                ))
631            );
632        }
633
634        self.metrics
635            .transaction_manager_num_enqueued_certificates
636            .with_label_values(&["already_executed"])
637            .inc_by(already_executed_certs_num);
638    }
639
640    pub fn settle_address_funds(&self, settlement: FundsSettlement) {
641        self.address_funds_withdraw_scheduler
642            .lock()
643            .as_ref()
644            .expect("Funds withdraw scheduler must be enabled if there are settlements")
645            .settle_funds(settlement);
646    }
647
648    pub fn settle_object_funds(&self, next_accumulator_version: SequenceNumber) {
649        if let Some(object_funds_withdraw_scheduler) =
650            self.object_funds_withdraw_scheduler.lock().as_ref()
651        {
652            object_funds_withdraw_scheduler.settle_accumulator_version(next_accumulator_version);
653        }
654    }
655
656    /// Reconfigure internal state at epoch start. This resets the funds withdraw scheduler
657    /// to the current accumulator root object version.
658    pub fn reconfigure(
659        &self,
660        new_epoch_store: &Arc<AuthorityPerEpochStore>,
661        account_funds_read: &Arc<dyn AccountFundsRead>,
662    ) {
663        let (address_funds_withdraw_scheduler, object_funds_withdraw_scheduler) =
664            Self::initialize_funds_withdraw_scheduler(
665                new_epoch_store,
666                &self.object_cache_read,
667                account_funds_read.clone(),
668            );
669        let mut guard = self.address_funds_withdraw_scheduler.lock();
670        if let Some(old_scheduler) = guard.as_ref() {
671            old_scheduler.close_epoch();
672        }
673        *guard = address_funds_withdraw_scheduler;
674        drop(guard);
675
676        let mut object_guard = self.object_funds_withdraw_scheduler.lock();
677        if let Some(old_scheduler) = object_guard.as_ref() {
678            old_scheduler.close_epoch();
679        }
680        *object_guard = object_funds_withdraw_scheduler;
681    }
682
683    pub fn check_execution_overload(
684        &self,
685        overload_config: &AuthorityOverloadConfig,
686        tx_data: &SenderSignedData,
687    ) -> SuiResult {
688        let inflight_queue_len = self.num_pending_certificates();
689        self.overload_tracker
690            .check_execution_overload(overload_config, tx_data, inflight_queue_len)
691    }
692
693    pub fn num_pending_certificates(&self) -> usize {
694        (self
695            .metrics
696            .transaction_manager_num_pending_certificates
697            .get()
698            + self
699                .metrics
700                .transaction_manager_num_executing_certificates
701                .get()) as usize
702    }
703
704    #[instrument(level = "debug", skip_all, fields(tx_digest = ?certificate.digest()))]
705    pub fn should_commit_object_funds_withdraws(
706        &self,
707        certificate: &VerifiedExecutableTransaction,
708        effects: &TransactionEffects,
709        execution_env: &ExecutionEnv,
710        epoch_store: &Arc<AuthorityPerEpochStore>,
711    ) -> bool {
712        // Object funds withdraw is not enabled on this node.
713        if self.object_funds_withdraw_scheduler.lock().is_none() {
714            return true;
715        }
716
717        if effects.status().is_err() {
718            // This transaction already failed. It does not matter any more
719            // whether it has sufficient object funds or not.
720            debug!("Transaction failed, committing effects");
721            return true;
722        }
723        let address_funds_reservations: BTreeSet<_> = certificate
724            .transaction_data()
725            .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier())
726            .into_keys()
727            .collect();
728        // All withdraws will show up as accumulator events with integer values.
729        // Among them, addresses that do not have funds reservations are object
730        // withdraws.
731        let object_withdraws: BTreeMap<_, _> = effects
732            .accumulator_events()
733            .into_iter()
734            .filter_map(|event| {
735                if address_funds_reservations.contains(&event.accumulator_obj) {
736                    return None;
737                }
738                // Only integer splits are funds withdraws.
739                if let (AccumulatorOperation::Split, AccumulatorValue::Integer(amount)) =
740                    (event.write.operation, event.write.value)
741                {
742                    Some((event.accumulator_obj, amount))
743                } else {
744                    None
745                }
746            })
747            .collect();
748        // If there are no object withdraws, we can skip checking object funds.
749        if object_withdraws.is_empty() {
750            debug!("No object withdraws, committing effects");
751            return true;
752        }
753        let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version else {
754            // Fastpath transactions that perform object funds withdraws
755            // must wait for consensus to assign the accumulator version.
756            // We cannot optimize the scheduling by processing fastpath object withdraws
757            // sooner because these may get reverted, and we don't want them
758            // pollute the scheduler tracking state.
759            // TODO: We could however optimize execution by caching
760            // the execution state to avoid re-execution.
761            return false;
762        };
763        match self
764            .object_funds_withdraw_scheduler
765            .lock()
766            .as_ref()
767            .unwrap()
768            .schedule(object_withdraws, accumulator_version)
769        {
770            // Sufficient funds, we can go ahead and commit the execution results as it is.
771            ObjectFundsWithdrawStatus::SufficientFunds => {
772                debug!("Object funds sufficient, committing effects");
773                true
774            }
775            // Currently insufficient funds. We need to wait until it reach a deterministic state
776            // before we can determine if it is really insufficient (to include potential deposits)
777            // At that time we will have to re-enqueue the transaction for execution again.
778            // Re-enqueue is handled here so the caller does not need to worry about it.
779            ObjectFundsWithdrawStatus::Pending(receiver) => {
780                let scheduler = self.clone();
781                let cert = certificate.clone();
782                let mut execution_env = execution_env.clone();
783                let epoch_store = epoch_store.clone();
784                tokio::task::spawn(async move {
785                    // It is possible that checkpoint executor finished executing
786                    // the current epoch and went ahead with epoch change asynchronously,
787                    // while this is still waiting.
788                    let _ = epoch_store
789                        .within_alive_epoch(async move {
790                            let tx_digest = cert.digest();
791                            match receiver.await {
792                                Ok(FundsWithdrawStatus::MaybeSufficient) => {
793                                    // The withdraw state is now deterministically known,
794                                    // so we can enqueue the transaction again and it will check again
795                                    // whether it is sufficient or not in the next execution.
796                                    // TODO: We should be able to optimize this by avoiding re-execution.
797                                    debug!(?tx_digest, "Object funds possibly sufficient");
798                                }
799                                Ok(FundsWithdrawStatus::Insufficient) => {
800                                    // Re-enqueue with insufficient funds status, so it will be executed
801                                    // in the next execution and fail through early error.
802                                    // FIXME: We need to also track the amount of gas that was used,
803                                    // so that we could charge properly in the next execution when we
804                                    // go through early error. Otherwise we would undercharge.
805                                    execution_env = execution_env.with_insufficient_funds();
806                                    debug!(?tx_digest, "Object funds insufficient");
807                                }
808                                Err(e) => {
809                                    error!("Error receiving funds withdraw status: {:?}", e);
810                                }
811                            }
812                            scheduler.send_transaction_for_execution(
813                                &cert,
814                                execution_env,
815                                // TODO: Should the enqueue_time be the original enqueue time
816                                // of this transaction?
817                                Instant::now(),
818                            );
819                        })
820                        .await;
821                });
822                false
823            }
824        }
825    }
826
827    #[cfg(test)]
828    pub fn check_empty_for_testing(&self) {
829        assert_eq!(self.num_pending_certificates(), 0);
830    }
831}
832
833#[cfg(test)]
834mod test {
835    use super::{BarrierDependencyBuilder, ExecutionScheduler, PendingCertificate};
836    use crate::authority::ExecutionEnv;
837    use crate::authority::shared_object_version_manager::AssignedVersions;
838    use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
839    use crate::execution_scheduler::SchedulingSource;
840    use std::collections::BTreeSet;
841    use std::{time::Duration, vec};
842    use sui_test_transaction_builder::TestTransactionBuilder;
843    use sui_types::base_types::{SuiAddress, random_object_ref};
844    use sui_types::executable_transaction::VerifiedExecutableTransaction;
845    use sui_types::object::Owner;
846    use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
847    use sui_types::transaction::{
848        SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
849    };
850    use sui_types::{
851        SUI_FRAMEWORK_PACKAGE_ID,
852        base_types::{ObjectID, SequenceNumber},
853        crypto::deterministic_random_account_key,
854        object::Object,
855        transaction::{CallArg, ObjectArg},
856    };
857    use tokio::time::Instant;
858    use tokio::{
859        sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
860        time::sleep,
861    };
862
863    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
864    fn make_execution_scheduler(
865        state: &AuthorityState,
866    ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
867        // Create a new execution scheduler instead of reusing the authority's, to examine
868        // execution_scheduler output from rx_ready_certificates.
869        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
870        let execution_scheduler = ExecutionScheduler::new(
871            state.get_object_cache_reader().clone(),
872            state.get_account_funds_read().clone(),
873            state.get_transaction_cache_reader().clone(),
874            tx_ready_certificates,
875            &state.epoch_store_for_testing(),
876            state.metrics.clone(),
877        );
878
879        (execution_scheduler, rx_ready_certificates)
880    }
881
882    fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
883        // Use fake module, function, package and gas prices since they are irrelevant for testing
884        // execution scheduler.
885        let rgp = 100;
886        let (sender, keypair) = deterministic_random_account_key();
887        let transaction =
888            TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
889                .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
890                .build_and_sign(&keypair);
891        VerifiedExecutableTransaction::new_system(
892            VerifiedTransaction::new_unchecked(transaction),
893            0,
894        )
895    }
896
897    #[tokio::test(flavor = "current_thread", start_paused = true)]
898    async fn execution_scheduler_basics() {
899        // Initialize an authority state.
900        let (owner, _keypair) = deterministic_random_account_key();
901        let gas_objects: Vec<Object> = (0..10)
902            .map(|_| {
903                let gas_object_id = ObjectID::random();
904                Object::with_id_owner_for_testing(gas_object_id, owner)
905            })
906            .collect();
907        let state = init_state_with_objects(gas_objects.clone()).await;
908
909        // Create a new execution scheduler instead of reusing the authority's, to examine
910        // execution_scheduler output from rx_ready_certificates.
911        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
912        // scheduler should output no transaction.
913        assert!(
914            rx_ready_certificates
915                .try_recv()
916                .is_err_and(|err| err == TryRecvError::Empty)
917        );
918        // scheduler should be empty at the beginning.
919        assert_eq!(execution_scheduler.num_pending_certificates(), 0);
920
921        // Enqueue empty vec should not crash.
922        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
923        // scheduler should output no transaction.
924        assert!(
925            rx_ready_certificates
926                .try_recv()
927                .is_err_and(|err| err == TryRecvError::Empty)
928        );
929
930        // Enqueue a transaction with existing gas object, empty input.
931        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
932        let tx_start_time = Instant::now();
933        execution_scheduler.enqueue_transactions(
934            vec![(
935                transaction.clone(),
936                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
937            )],
938            &state.epoch_store_for_testing(),
939        );
940        // scheduler should output the transaction eventually.
941        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
942
943        // Tests that pending certificate stats are recorded properly.
944        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
945        assert!(
946            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
947        );
948
949        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
950
951        // Predent we have just executed the transaction.
952        drop(pending_certificate);
953
954        // scheduler should be empty.
955        execution_scheduler.check_empty_for_testing();
956
957        // Enqueue a transaction with a new gas object, empty input.
958        let gas_object_new = Object::with_id_owner_version_for_testing(
959            ObjectID::random(),
960            0.into(),
961            Owner::AddressOwner(owner),
962        );
963        let transaction = make_transaction(gas_object_new.clone(), vec![]);
964        let tx_start_time = Instant::now();
965        execution_scheduler.enqueue_transactions(
966            vec![(
967                transaction.clone(),
968                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
969            )],
970            &state.epoch_store_for_testing(),
971        );
972        // scheduler should output no transaction yet.
973        sleep(Duration::from_secs(1)).await;
974        assert!(
975            rx_ready_certificates
976                .try_recv()
977                .is_err_and(|err| err == TryRecvError::Empty)
978        );
979
980        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
981
982        // Duplicated enqueue is allowed.
983        execution_scheduler.enqueue_transactions(
984            vec![(
985                transaction.clone(),
986                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
987            )],
988            &state.epoch_store_for_testing(),
989        );
990        sleep(Duration::from_secs(1)).await;
991        assert!(
992            rx_ready_certificates
993                .try_recv()
994                .is_err_and(|err| err == TryRecvError::Empty)
995        );
996
997        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
998
999        // Notify scheduler about availability of the gas object.
1000        state
1001            .get_cache_writer()
1002            .write_object_entry_for_test(gas_object_new);
1003        // scheduler should output the transaction eventually.
1004        // We will see both the original and the duplicated transaction.
1005        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
1006        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
1007        assert_eq!(
1008            pending_certificate.certificate.digest(),
1009            pending_certificate2.certificate.digest()
1010        );
1011
1012        // Tests that pending certificate stats are recorded properly. The ready time should be
1013        // 2 seconds apart from the enqueue time.
1014        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
1015        assert!(
1016            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
1017                >= Duration::from_secs(2)
1018        );
1019
1020        // Predent we have just executed the transaction.
1021        drop(pending_certificate);
1022        drop(pending_certificate2);
1023
1024        // scheduler should be empty at the end.
1025        execution_scheduler.check_empty_for_testing();
1026    }
1027
1028    // Tests when objects become available, correct set of transactions can be sent to execute.
1029    // Specifically, we have following setup,
1030    //         shared_object     shared_object_2
1031    //       /    |    \     \    /
1032    //    tx_0  tx_1  tx_2    tx_3
1033    //     r      r     w      r
1034    // And when shared_object is available, tx_0, tx_1, and tx_2 can be executed. And when
1035    // shared_object_2 becomes available, tx_3 can be executed.
1036    #[tokio::test(flavor = "current_thread", start_paused = true)]
1037    async fn execution_scheduler_object_dependency() {
1038        telemetry_subscribers::init_for_testing();
1039        // Initialize an authority state, with gas objects and a shared object.
1040        let (owner, _keypair) = deterministic_random_account_key();
1041        let gas_objects: Vec<Object> = (0..10)
1042            .map(|_| {
1043                let gas_object_id = ObjectID::random();
1044                Object::with_id_owner_for_testing(gas_object_id, owner)
1045            })
1046            .collect();
1047        let shared_object = Object::shared_for_testing();
1048        let initial_shared_version = shared_object.owner().start_version().unwrap();
1049        let shared_object_2 = Object::shared_for_testing();
1050        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1051
1052        let state = init_state_with_objects(
1053            [
1054                gas_objects.clone(),
1055                vec![shared_object.clone(), shared_object_2.clone()],
1056            ]
1057            .concat(),
1058        )
1059        .await;
1060
1061        // Create a new execution scheduler instead of reusing the authority's, to examine
1062        // execution_scheduler output from rx_ready_certificates.
1063        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1064        // scheduler should output no transaction.
1065        assert!(rx_ready_certificates.try_recv().is_err());
1066
1067        // Enqueue two transactions with the same shared object input in read-only mode.
1068        let shared_version = 1000.into();
1069        let shared_object_arg_read = ObjectArg::SharedObject {
1070            id: shared_object.id(),
1071            initial_shared_version,
1072            mutability: SharedObjectMutability::Immutable,
1073        };
1074        let transaction_read_0 = make_transaction(
1075            gas_objects[0].clone(),
1076            vec![CallArg::Object(shared_object_arg_read)],
1077        );
1078        let transaction_read_1 = make_transaction(
1079            gas_objects[1].clone(),
1080            vec![CallArg::Object(shared_object_arg_read)],
1081        );
1082        let tx_read_0_assigned_versions = vec![(
1083            (
1084                shared_object.id(),
1085                shared_object.owner().start_version().unwrap(),
1086            ),
1087            shared_version,
1088        )];
1089        let tx_read_1_assigned_versions = vec![(
1090            (
1091                shared_object.id(),
1092                shared_object.owner().start_version().unwrap(),
1093            ),
1094            shared_version,
1095        )];
1096
1097        // Enqueue one transaction with the same shared object in mutable mode.
1098        let shared_object_arg_default = ObjectArg::SharedObject {
1099            id: shared_object.id(),
1100            initial_shared_version,
1101            mutability: SharedObjectMutability::Mutable,
1102        };
1103        let transaction_default = make_transaction(
1104            gas_objects[2].clone(),
1105            vec![CallArg::Object(shared_object_arg_default)],
1106        );
1107        let tx_default_assigned_versions = vec![(
1108            (
1109                shared_object.id(),
1110                shared_object.owner().start_version().unwrap(),
1111            ),
1112            shared_version,
1113        )];
1114
1115        // Enqueue one transaction with two readonly shared object inputs, `shared_object` and `shared_object_2`.
1116        let shared_version_2 = 1000.into();
1117        let shared_object_arg_read_2 = ObjectArg::SharedObject {
1118            id: shared_object_2.id(),
1119            initial_shared_version: initial_shared_version_2,
1120            mutability: SharedObjectMutability::Immutable,
1121        };
1122        let transaction_read_2 = make_transaction(
1123            gas_objects[3].clone(),
1124            vec![
1125                CallArg::Object(shared_object_arg_default),
1126                CallArg::Object(shared_object_arg_read_2),
1127            ],
1128        );
1129        let tx_read_2_assigned_versions = vec![
1130            (
1131                (
1132                    shared_object.id(),
1133                    shared_object.owner().start_version().unwrap(),
1134                ),
1135                shared_version,
1136            ),
1137            (
1138                (
1139                    shared_object_2.id(),
1140                    shared_object_2.owner().start_version().unwrap(),
1141                ),
1142                shared_version_2,
1143            ),
1144        ];
1145
1146        execution_scheduler.enqueue_transactions(
1147            vec![
1148                (
1149                    transaction_read_0.clone(),
1150                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1151                        tx_read_0_assigned_versions,
1152                        None,
1153                    )),
1154                ),
1155                (
1156                    transaction_read_1.clone(),
1157                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1158                        tx_read_1_assigned_versions,
1159                        None,
1160                    )),
1161                ),
1162                (
1163                    transaction_default.clone(),
1164                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1165                        tx_default_assigned_versions,
1166                        None,
1167                    )),
1168                ),
1169                (
1170                    transaction_read_2.clone(),
1171                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
1172                        tx_read_2_assigned_versions,
1173                        None,
1174                    )),
1175                ),
1176            ],
1177            &state.epoch_store_for_testing(),
1178        );
1179
1180        // scheduler should output no transaction yet.
1181        sleep(Duration::from_secs(1)).await;
1182        assert!(rx_ready_certificates.try_recv().is_err());
1183
1184        assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1185
1186        // Notify scheduler about availability of the first shared object.
1187        let mut new_shared_object = shared_object.clone();
1188        new_shared_object
1189            .data
1190            .try_as_move_mut()
1191            .unwrap()
1192            .increment_version_to(shared_version_2);
1193        state
1194            .get_cache_writer()
1195            .write_object_entry_for_test(new_shared_object);
1196
1197        // scheduler should output the 3 transactions that are only waiting for this object.
1198        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1199        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1200        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1201        {
1202            let mut want_digests = vec![
1203                transaction_read_0.digest(),
1204                transaction_read_1.digest(),
1205                transaction_default.digest(),
1206            ];
1207            want_digests.sort();
1208            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1209            got_digests.sort();
1210            assert_eq!(want_digests, got_digests);
1211        }
1212
1213        sleep(Duration::from_secs(1)).await;
1214        assert!(rx_ready_certificates.try_recv().is_err());
1215
1216        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1217
1218        // Make shared_object_2 available.
1219        let mut new_shared_object_2 = shared_object_2.clone();
1220        new_shared_object_2
1221            .data
1222            .try_as_move_mut()
1223            .unwrap()
1224            .increment_version_to(shared_version_2);
1225        state
1226            .get_cache_writer()
1227            .write_object_entry_for_test(new_shared_object_2);
1228
1229        // Now, the transaction waiting for both shared objects can be executed.
1230        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1231        assert_eq!(transaction_read_2.digest(), tx_3.digest());
1232
1233        sleep(Duration::from_secs(1)).await;
1234        assert!(rx_ready_certificates.try_recv().is_err());
1235
1236        execution_scheduler.check_empty_for_testing();
1237    }
1238
1239    #[tokio::test(flavor = "current_thread", start_paused = true)]
1240    async fn execution_scheduler_receiving_notify_commit() {
1241        telemetry_subscribers::init_for_testing();
1242        // Initialize an authority state.
1243        let (owner, _keypair) = deterministic_random_account_key();
1244        let gas_objects: Vec<Object> = (0..10)
1245            .map(|_| {
1246                let gas_object_id = ObjectID::random();
1247                Object::with_id_owner_for_testing(gas_object_id, owner)
1248            })
1249            .collect();
1250        let state = init_state_with_objects(gas_objects.clone()).await;
1251
1252        // Create a new execution scheduler instead of reusing the authority's, to examine
1253        // execution_scheduler output from rx_ready_certificates.
1254        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1255        // scheduler should output no transaction.
1256        assert!(rx_ready_certificates.try_recv().is_err());
1257        // scheduler should be empty at the beginning.
1258        execution_scheduler.check_empty_for_testing();
1259
1260        let obj_id = ObjectID::random();
1261        let object_arguments: Vec<_> = (0..10)
1262            .map(|i| {
1263                let object = Object::with_id_owner_version_for_testing(
1264                    obj_id,
1265                    i.into(),
1266                    Owner::AddressOwner(owner),
1267                );
1268                // Every other transaction receives the object, and we create a run of multiple receives in
1269                // a row at the beginning to test that the scheduler doesn't get stuck in either configuration of:
1270                // ImmOrOwnedObject => Receiving,
1271                // Receiving => Receiving
1272                // Receiving => ImmOrOwnedObject
1273                // ImmOrOwnedObject => ImmOrOwnedObject is already tested as the default case on mainnet.
1274                let object_arg = if i % 2 == 0 || i == 3 {
1275                    ObjectArg::Receiving(object.compute_object_reference())
1276                } else {
1277                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1278                };
1279                let txn =
1280                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1281                (object, txn)
1282            })
1283            .collect();
1284
1285        for (i, (_, txn)) in object_arguments.iter().enumerate() {
1286            // scheduler should output no transaction yet since waiting on receiving object or
1287            // ImmOrOwnedObject input.
1288            execution_scheduler.enqueue_transactions(
1289                vec![(
1290                    txn.clone(),
1291                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1292                )],
1293                &state.epoch_store_for_testing(),
1294            );
1295            sleep(Duration::from_secs(1)).await;
1296            assert!(rx_ready_certificates.try_recv().is_err());
1297            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1298        }
1299
1300        // Now start to unravel the transactions by notifying that each subsequent
1301        // transaction has been processed.
1302        let len = object_arguments.len();
1303        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1304            // Mark the object as available.
1305            // We should now eventually see the transaction as ready.
1306            state
1307                .get_cache_writer()
1308                .write_object_entry_for_test(object.clone());
1309
1310            // scheduler should output the transaction eventually now that the receiving object has become
1311            // available.
1312            rx_ready_certificates.recv().await.unwrap();
1313
1314            // Only one transaction at a time should become available though. So if we try to get
1315            // another one it should fail.
1316            sleep(Duration::from_secs(1)).await;
1317            assert!(rx_ready_certificates.try_recv().is_err());
1318
1319            // Notify the scheduler that the transaction has been processed.
1320            drop(txn);
1321
1322            // scheduler should now output another transaction to run since it the next version of that object
1323            // has become available.
1324            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1325        }
1326
1327        // After everything scheduler should be empty.
1328        execution_scheduler.check_empty_for_testing();
1329    }
1330
1331    #[tokio::test(flavor = "current_thread", start_paused = true)]
1332    async fn execution_scheduler_receiving_object_ready_notifications() {
1333        telemetry_subscribers::init_for_testing();
1334        // Initialize an authority state.
1335        let (owner, _keypair) = deterministic_random_account_key();
1336        let gas_objects: Vec<Object> = (0..10)
1337            .map(|_| {
1338                let gas_object_id = ObjectID::random();
1339                Object::with_id_owner_for_testing(gas_object_id, owner)
1340            })
1341            .collect();
1342        let state = init_state_with_objects(gas_objects.clone()).await;
1343
1344        // Create a new execution scheduler instead of reusing the authority's, to examine
1345        // execution_scheduler output from rx_ready_certificates.
1346        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1347        // scheduler should output no transaction.
1348        assert!(rx_ready_certificates.try_recv().is_err());
1349        // scheduler should be empty at the beginning.
1350        execution_scheduler.check_empty_for_testing();
1351
1352        let obj_id = ObjectID::random();
1353        let receiving_object_new0 =
1354            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1355        let receiving_object_new1 =
1356            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1357        let receiving_object_arg0 =
1358            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1359        let receive_object_transaction0 = make_transaction(
1360            gas_objects[0].clone(),
1361            vec![CallArg::Object(receiving_object_arg0)],
1362        );
1363
1364        let receiving_object_arg1 =
1365            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1366        let receive_object_transaction1 = make_transaction(
1367            gas_objects[0].clone(),
1368            vec![CallArg::Object(receiving_object_arg1)],
1369        );
1370
1371        // scheduler should output no transaction yet since waiting on receiving object.
1372        execution_scheduler.enqueue_transactions(
1373            vec![(
1374                receive_object_transaction0.clone(),
1375                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1376            )],
1377            &state.epoch_store_for_testing(),
1378        );
1379        sleep(Duration::from_secs(1)).await;
1380        assert!(rx_ready_certificates.try_recv().is_err());
1381        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1382
1383        // scheduler should output no transaction yet since waiting on receiving object.
1384        execution_scheduler.enqueue_transactions(
1385            vec![(
1386                receive_object_transaction1.clone(),
1387                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1388            )],
1389            &state.epoch_store_for_testing(),
1390        );
1391        sleep(Duration::from_secs(1)).await;
1392        assert!(rx_ready_certificates.try_recv().is_err());
1393        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1394
1395        // Duplicate enqueue of receiving object is allowed.
1396        execution_scheduler.enqueue_transactions(
1397            vec![(
1398                receive_object_transaction0.clone(),
1399                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1400            )],
1401            &state.epoch_store_for_testing(),
1402        );
1403        sleep(Duration::from_secs(1)).await;
1404        assert!(rx_ready_certificates.try_recv().is_err());
1405        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1406
1407        // Notify scheduler that the receiving object 0 is available.
1408        state
1409            .get_cache_writer()
1410            .write_object_entry_for_test(receiving_object_new0.clone());
1411
1412        // scheduler should output the transaction eventually now that the receiving object has become
1413        // available.
1414        rx_ready_certificates.recv().await.unwrap();
1415
1416        // Notify scheduler that the receiving object 0 is available.
1417        state
1418            .get_cache_writer()
1419            .write_object_entry_for_test(receiving_object_new1.clone());
1420
1421        // scheduler should output the transaction eventually now that the receiving object has become
1422        // available.
1423        rx_ready_certificates.recv().await.unwrap();
1424    }
1425
1426    #[tokio::test(flavor = "current_thread", start_paused = true)]
1427    async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1428        telemetry_subscribers::init_for_testing();
1429        // Initialize an authority state.
1430        let (owner, _keypair) = deterministic_random_account_key();
1431        let gas_objects: Vec<Object> = (0..10)
1432            .map(|_| {
1433                let gas_object_id = ObjectID::random();
1434                Object::with_id_owner_for_testing(gas_object_id, owner)
1435            })
1436            .collect();
1437        let state = init_state_with_objects(gas_objects.clone()).await;
1438
1439        // Create a new execution scheduler instead of reusing the authority's, to examine
1440        // execution_scheduler output from rx_ready_certificates.
1441        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1442        // scheduler should output no transaction.
1443        assert!(rx_ready_certificates.try_recv().is_err());
1444        // scheduler should be empty at the beginning.
1445        execution_scheduler.check_empty_for_testing();
1446
1447        let obj_id = ObjectID::random();
1448        let receiving_object_new0 =
1449            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1450        let receiving_object_new1 =
1451            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1452        let receiving_object_arg0 =
1453            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1454        let receive_object_transaction0 = make_transaction(
1455            gas_objects[0].clone(),
1456            vec![CallArg::Object(receiving_object_arg0)],
1457        );
1458
1459        let receive_object_transaction01 = make_transaction(
1460            gas_objects[1].clone(),
1461            vec![CallArg::Object(receiving_object_arg0)],
1462        );
1463
1464        let receiving_object_arg1 =
1465            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1466        let receive_object_transaction1 = make_transaction(
1467            gas_objects[0].clone(),
1468            vec![CallArg::Object(receiving_object_arg1)],
1469        );
1470
1471        // Enqueuing a transaction with a receiving object that is available at the time it is enqueued
1472        // should become immediately available.
1473        let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1474        let tx1 = make_transaction(
1475            gas_objects[0].clone(),
1476            vec![CallArg::Object(gas_receiving_arg)],
1477        );
1478
1479        // scheduler should output no transaction yet since waiting on receiving object.
1480        execution_scheduler.enqueue_transactions(
1481            vec![(
1482                receive_object_transaction0.clone(),
1483                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1484            )],
1485            &state.epoch_store_for_testing(),
1486        );
1487        sleep(Duration::from_secs(1)).await;
1488        assert!(rx_ready_certificates.try_recv().is_err());
1489        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1490
1491        // scheduler should output no transaction yet since waiting on receiving object.
1492        execution_scheduler.enqueue_transactions(
1493            vec![(
1494                receive_object_transaction1.clone(),
1495                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1496            )],
1497            &state.epoch_store_for_testing(),
1498        );
1499        sleep(Duration::from_secs(1)).await;
1500        assert!(rx_ready_certificates.try_recv().is_err());
1501        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1502
1503        // Different transaction with a duplicate receiving object reference is allowed.
1504        // Both transaction's will be outputted once the receiving object is available.
1505        execution_scheduler.enqueue_transactions(
1506            vec![(
1507                receive_object_transaction01.clone(),
1508                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1509            )],
1510            &state.epoch_store_for_testing(),
1511        );
1512        sleep(Duration::from_secs(1)).await;
1513        assert!(rx_ready_certificates.try_recv().is_err());
1514        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1515
1516        // Notify scheduler that the receiving object 0 is available.
1517        state
1518            .get_cache_writer()
1519            .write_object_entry_for_test(receiving_object_new0.clone());
1520
1521        // scheduler should output both transactions depending on the receiving object now that the
1522        // transaction's receiving object has become available.
1523        rx_ready_certificates.recv().await.unwrap();
1524
1525        rx_ready_certificates.recv().await.unwrap();
1526
1527        // Only two transactions that were dependent on the receiving object should be output.
1528        assert!(rx_ready_certificates.try_recv().is_err());
1529
1530        // Enqueue a transaction with a receiving object that is available at the time it is enqueued.
1531        // This should be immediately available.
1532        execution_scheduler.enqueue_transactions(
1533            vec![(
1534                tx1.clone(),
1535                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1536            )],
1537            &state.epoch_store_for_testing(),
1538        );
1539        sleep(Duration::from_secs(1)).await;
1540        rx_ready_certificates.recv().await.unwrap();
1541
1542        // Notify scheduler that the receiving object 0 is available.
1543        state
1544            .get_cache_writer()
1545            .write_object_entry_for_test(receiving_object_new1.clone());
1546
1547        // scheduler should output the transaction eventually now that the receiving object has become
1548        // available.
1549        rx_ready_certificates.recv().await.unwrap();
1550    }
1551
1552    #[tokio::test(flavor = "current_thread", start_paused = true)]
1553    async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1554        telemetry_subscribers::init_for_testing();
1555        // Initialize an authority state.
1556        let (owner, _keypair) = deterministic_random_account_key();
1557        let mut gas_objects: Vec<Object> = (0..10)
1558            .map(|_| {
1559                let gas_object_id = ObjectID::random();
1560                Object::with_id_owner_for_testing(gas_object_id, owner)
1561            })
1562            .collect();
1563        let receiving_object = Object::with_id_owner_version_for_testing(
1564            ObjectID::random(),
1565            10.into(),
1566            Owner::AddressOwner(owner),
1567        );
1568        gas_objects.push(receiving_object.clone());
1569        let state = init_state_with_objects(gas_objects.clone()).await;
1570
1571        // Create a new execution scheduler instead of reusing the authority's, to examine
1572        // execution_scheduler output from rx_ready_certificates.
1573        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1574        // scheduler should output no transaction.
1575        assert!(rx_ready_certificates.try_recv().is_err());
1576        // scheduler should be empty at the beginning.
1577        execution_scheduler.check_empty_for_testing();
1578
1579        let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1580            receiving_object.id(),
1581            0.into(),
1582            Owner::AddressOwner(owner),
1583        );
1584        let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1585            receiving_object.id(),
1586            1.into(),
1587            Owner::AddressOwner(owner),
1588        );
1589        let receiving_object_arg0 =
1590            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1591        let receive_object_transaction0 = make_transaction(
1592            gas_objects[0].clone(),
1593            vec![CallArg::Object(receiving_object_arg0)],
1594        );
1595
1596        let receive_object_transaction01 = make_transaction(
1597            gas_objects[1].clone(),
1598            vec![CallArg::Object(receiving_object_arg0)],
1599        );
1600
1601        let receiving_object_arg1 =
1602            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1603        let receive_object_transaction1 = make_transaction(
1604            gas_objects[0].clone(),
1605            vec![CallArg::Object(receiving_object_arg1)],
1606        );
1607
1608        // scheduler should output no transaction yet since waiting on receiving object.
1609        execution_scheduler.enqueue_transactions(
1610            vec![(
1611                receive_object_transaction0.clone(),
1612                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1613            )],
1614            &state.epoch_store_for_testing(),
1615        );
1616        execution_scheduler.enqueue_transactions(
1617            vec![(
1618                receive_object_transaction01.clone(),
1619                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1620            )],
1621            &state.epoch_store_for_testing(),
1622        );
1623        execution_scheduler.enqueue_transactions(
1624            vec![(
1625                receive_object_transaction1.clone(),
1626                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1627            )],
1628            &state.epoch_store_for_testing(),
1629        );
1630        sleep(Duration::from_secs(1)).await;
1631        rx_ready_certificates.recv().await.unwrap();
1632        rx_ready_certificates.recv().await.unwrap();
1633        rx_ready_certificates.recv().await.unwrap();
1634        assert!(rx_ready_certificates.try_recv().is_err());
1635    }
1636
1637    // Tests transaction cancellation logic in execution scheduler. Mainly tests that for cancelled transaction,
1638    // execution scheduler only waits for all non-shared objects to be available before outputting the transaction.
1639    #[tokio::test(flavor = "current_thread", start_paused = true)]
1640    async fn execution_scheduler_with_cancelled_transactions() {
1641        // Initialize an authority state, with gas objects and 3 shared objects.
1642        let (owner, _keypair) = deterministic_random_account_key();
1643        let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1644        let shared_object_1 = Object::shared_for_testing();
1645        let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1646        let shared_object_2 = Object::shared_for_testing();
1647        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1648        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1649
1650        let state = init_state_with_objects(vec![
1651            gas_object.clone(),
1652            shared_object_1.clone(),
1653            shared_object_2.clone(),
1654            owned_object.clone(),
1655        ])
1656        .await;
1657
1658        // Create a new execution scheduler instead of reusing the authority's, to examine
1659        // execution_scheduler output from rx_ready_certificates.
1660        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1661        // scheduler should output no transaction.
1662        assert!(rx_ready_certificates.try_recv().is_err());
1663
1664        // Enqueue one transaction with 2 shared object inputs and 1 owned input.
1665        let shared_object_arg_1 = ObjectArg::SharedObject {
1666            id: shared_object_1.id(),
1667            initial_shared_version: initial_shared_version_1,
1668            mutability: SharedObjectMutability::Mutable,
1669        };
1670        let shared_object_arg_2 = ObjectArg::SharedObject {
1671            id: shared_object_2.id(),
1672            initial_shared_version: initial_shared_version_2,
1673            mutability: SharedObjectMutability::Mutable,
1674        };
1675
1676        // Changes the desired owned object version to a higher version. We will make it available later.
1677        let owned_version = 2000.into();
1678        let mut owned_ref = owned_object.compute_object_reference();
1679        owned_ref.1 = owned_version;
1680        let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1681
1682        let cancelled_transaction = make_transaction(
1683            gas_object.clone(),
1684            vec![
1685                CallArg::Object(shared_object_arg_1),
1686                CallArg::Object(shared_object_arg_2),
1687                CallArg::Object(owned_object_arg),
1688            ],
1689        );
1690        let assigned_versions = vec![
1691            (
1692                (
1693                    shared_object_1.id(),
1694                    shared_object_1.owner().start_version().unwrap(),
1695                ),
1696                SequenceNumber::CANCELLED_READ,
1697            ),
1698            (
1699                (
1700                    shared_object_2.id(),
1701                    shared_object_2.owner().start_version().unwrap(),
1702                ),
1703                SequenceNumber::CONGESTED,
1704            ),
1705        ];
1706
1707        execution_scheduler.enqueue_transactions(
1708            vec![(
1709                cancelled_transaction.clone(),
1710                ExecutionEnv::new()
1711                    .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1712            )],
1713            &state.epoch_store_for_testing(),
1714        );
1715
1716        // scheduler should output no transaction yet.
1717        sleep(Duration::from_secs(1)).await;
1718        assert!(rx_ready_certificates.try_recv().is_err());
1719
1720        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1721
1722        // Notify scheduler about availability of the owned object.
1723        let mut new_owned_object = owned_object.clone();
1724        new_owned_object
1725            .data
1726            .try_as_move_mut()
1727            .unwrap()
1728            .increment_version_to(owned_version);
1729        state
1730            .get_cache_writer()
1731            .write_object_entry_for_test(new_owned_object);
1732
1733        // scheduler should output the transaction as soon as the owned object is available.
1734        let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1735        assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1736
1737        sleep(Duration::from_secs(1)).await;
1738        assert!(rx_ready_certificates.try_recv().is_err());
1739
1740        execution_scheduler.check_empty_for_testing();
1741    }
1742
1743    #[test]
1744    fn test_barrier_dependency_builder() {
1745        let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1746            assert!(
1747                non_exclusive_writes
1748                    .iter()
1749                    .all(|id| !exclusive_writes.contains(id))
1750            );
1751            assert!(
1752                exclusive_writes
1753                    .iter()
1754                    .all(|id| !non_exclusive_writes.contains(id))
1755            );
1756
1757            let non_exclusive_writes = non_exclusive_writes
1758                .into_iter()
1759                .map(|id| ObjectID::from_single_byte(id as u8));
1760            let exclusive_writes = exclusive_writes
1761                .into_iter()
1762                .map(|id| ObjectID::from_single_byte(id as u8));
1763            let mut builder = ProgrammableTransactionBuilder::new();
1764            for non_exclusive_write in non_exclusive_writes {
1765                builder
1766                    .obj(ObjectArg::SharedObject {
1767                        id: non_exclusive_write,
1768                        initial_shared_version: SequenceNumber::new(),
1769                        mutability: SharedObjectMutability::NonExclusiveWrite,
1770                    })
1771                    .unwrap();
1772            }
1773
1774            for exclusive_write in exclusive_writes {
1775                builder
1776                    .obj(ObjectArg::SharedObject {
1777                        id: exclusive_write,
1778                        initial_shared_version: SequenceNumber::new(),
1779                        mutability: SharedObjectMutability::Mutable,
1780                    })
1781                    .unwrap();
1782            }
1783
1784            let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1785            let tx_data =
1786                TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1787            Transaction::from_data_and_signer(tx_data, vec![])
1788        };
1789
1790        // One non-exclusive write, one exclusive write.
1791        {
1792            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1793            let tx1 = make_transaction(vec![1], vec![]);
1794            let tx2 = make_transaction(vec![], vec![1]);
1795
1796            let tx1_deps =
1797                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1798            let tx2_deps =
1799                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1800            assert!(tx1_deps.is_empty());
1801            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1802        }
1803
1804        // One transaction has non-exclusive writes to two different objects, and then becomes
1805        // a dependency of two barriers
1806        {
1807            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1808            let tx1 = make_transaction(vec![1, 2], vec![]);
1809            let tx2 = make_transaction(vec![], vec![1]);
1810            let tx3 = make_transaction(vec![], vec![2]);
1811
1812            let tx1_deps =
1813                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1814            let tx2_deps =
1815                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1816            let tx3_deps =
1817                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1818            assert!(tx1_deps.is_empty());
1819            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1820            assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1821        }
1822
1823        // Ensure multiple-object dependences are merged
1824        {
1825            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1826            let tx1 = make_transaction(vec![1], vec![]);
1827            let tx2 = make_transaction(vec![2], vec![]);
1828            let tx3 = make_transaction(vec![], vec![1, 2]);
1829
1830            let tx1_deps =
1831                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1832            let tx2_deps =
1833                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1834            let tx3_deps =
1835                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1836            assert!(tx1_deps.is_empty());
1837            assert!(tx2_deps.is_empty());
1838            assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1839        }
1840
1841        // Ensure dependency state is cleared
1842        {
1843            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1844            let tx1 = make_transaction(vec![1], vec![]);
1845            let tx2 = make_transaction(vec![], vec![1]);
1846            let tx3 = make_transaction(vec![], vec![1]);
1847
1848            let tx1_deps =
1849                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1850            let tx2_deps =
1851                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1852            let tx3_deps =
1853                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1854            assert!(tx1_deps.is_empty());
1855            assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1856            assert!(tx3_deps.is_empty());
1857        }
1858    }
1859}