sui_core/execution_scheduler/
execution_scheduler_impl.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    accumulators::funds_read::AccountFundsRead,
6    authority::{
7        AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8        shared_object_version_manager::Schedulable,
9    },
10    execution_cache::{ObjectCacheRead, TransactionCacheRead},
11    execution_scheduler::{
12        ExecutingGuard, PendingCertificateStats,
13        funds_withdraw_scheduler::{
14            AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
15            WithdrawReservations, scheduler::FundsWithdrawScheduler,
16        },
17    },
18};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::ZipDebugEqIteratorExt;
21use mysten_common::{assert_reachable, debug_fatal};
22use mysten_metrics::spawn_monitored_task;
23use parking_lot::Mutex;
24use std::{
25    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26    sync::Arc,
27};
28use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
29use sui_types::{
30    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31    base_types::{FullObjectID, ObjectID},
32    digests::TransactionDigest,
33    error::SuiResult,
34    executable_transaction::VerifiedExecutableTransaction,
35    storage::InputKey,
36    transaction::{
37        SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
38        TransactionDataAPI, TransactionKey,
39    },
40};
41use tokio::sync::mpsc::UnboundedSender;
42use tokio::time::Instant;
43use tracing::{debug, error, instrument};
44
45use super::{PendingCertificate, overload_tracker::OverloadTracker};
46
/// Utility struct for collecting barrier dependencies.
///
/// Transactions are fed in scheduling order through `process_tx`; the builder remembers,
/// per shared object, which transactions performed a non-exclusive write that has not yet
/// been followed by an exclusive (`Mutable`) access to the same object.
pub(crate) struct BarrierDependencyBuilder {
    /// For each shared object id, the digests of the non-exclusive writers seen so far
    /// that have not yet been handed to a barrier transaction.
    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
}
51
52impl BarrierDependencyBuilder {
53    pub fn new() -> Self {
54        Self {
55            dep_state: Default::default(),
56        }
57    }
58
59    /// process_tx must be called for each transaction in scheduling order. If the
60    /// transaction has a non-exclusive write to an object, the transaction digest is
61    /// stored to become a dependency of the eventual barrier transaction. If a
62    /// transaction has an exclusive write to an object, all pending non-exclusive write
63    /// transactions for that object are added to the barrier dependencies.
64    pub fn process_tx(
65        &mut self,
66        tx_digest: TransactionDigest,
67        tx: &TransactionData,
68    ) -> BTreeSet<TransactionDigest> {
69        let mut barrier_deps = BTreeSet::new();
70        for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
71            match mutability {
72                SharedObjectMutability::NonExclusiveWrite => {
73                    self.dep_state.entry(id).or_default().insert(tx_digest);
74                }
75                SharedObjectMutability::Mutable => {
76                    // If there were preceding non-exclusive writes to this object id, this
77                    // transaction is a barrier and must wait for them to finish.
78                    if let Some(deps) = self.dep_state.remove(&id) {
79                        barrier_deps.extend(deps);
80                    }
81                }
82                SharedObjectMutability::Immutable => (),
83            }
84        }
85        barrier_deps
86    }
87}
88
/// Schedules transactions for execution: it waits for a transaction's input objects and
/// barrier dependencies, then hands the transaction to the execution pipeline through
/// `tx_ready_certificates`.
///
/// The struct is `Clone`; a clone is moved into each spawned scheduling task.
#[derive(Clone)]
pub struct ExecutionScheduler {
    /// Used to check and await availability of input objects, and to read the
    /// accumulator root object when the funds withdraw scheduler is (re)initialized.
    object_cache_read: Arc<dyn ObjectCacheRead>,
    /// Used to look up executed effects digests and stored transaction blocks.
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    /// Tracks pending certificates; consulted by `check_execution_overload`.
    overload_tracker: Arc<OverloadTracker>,
    /// Channel on which ready-to-execute certificates are sent.
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    /// Funds withdraw scheduler; `None` when the node does not run consensus or
    /// accumulators are disabled. Replaced by `reconfigure`.
    address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
    /// Scheduler flavor, retained so `reconfigure` can rebuild the withdraw scheduler.
    funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
    /// Authority metrics (enqueued / pending / executing certificate counters, queue age).
    metrics: Arc<AuthorityMetrics>,
    /// Metrics handed to every funds withdraw scheduler instance created by this struct.
    address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
}
100
/// Scope guard marking a certificate as "pending" for as long as the guard is alive.
///
/// Creating the guard bumps the pending-certificates gauge and registers the certificate
/// with the overload tracker; dropping it reverses both.
struct PendingGuard<'a> {
    /// Scheduler whose metrics and overload tracker are updated.
    scheduler: &'a ExecutionScheduler,
    /// Certificate being tracked as pending.
    cert: &'a VerifiedExecutableTransaction,
}
105
106impl<'a> PendingGuard<'a> {
107    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
108        scheduler
109            .metrics
110            .transaction_manager_num_pending_certificates
111            .inc();
112        scheduler
113            .overload_tracker
114            .add_pending_certificate(cert.data());
115        Self { scheduler, cert }
116    }
117}
118
119impl Drop for PendingGuard<'_> {
120    fn drop(&mut self) {
121        self.scheduler
122            .metrics
123            .transaction_manager_num_pending_certificates
124            .dec();
125        self.scheduler
126            .overload_tracker
127            .remove_pending_certificate(self.cert.data());
128    }
129}
130
131impl ExecutionScheduler {
132    pub fn new(
133        object_cache_read: Arc<dyn ObjectCacheRead>,
134        account_funds_read: Arc<dyn AccountFundsRead>,
135        transaction_cache_read: Arc<dyn TransactionCacheRead>,
136        tx_ready_certificates: UnboundedSender<PendingCertificate>,
137        epoch_store: &Arc<AuthorityPerEpochStore>,
138        funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
139        metrics: Arc<AuthorityMetrics>,
140        prometheus_registry: &prometheus::Registry,
141    ) -> Self {
142        tracing::info!(
143            ?funds_withdraw_scheduler_type,
144            "Creating new ExecutionScheduler"
145        );
146        let address_funds_scheduler_metrics =
147            Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
148        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
149            epoch_store,
150            &object_cache_read,
151            account_funds_read,
152            funds_withdraw_scheduler_type,
153            &address_funds_scheduler_metrics,
154        );
155        Self {
156            object_cache_read,
157            transaction_cache_read,
158            overload_tracker: Arc::new(OverloadTracker::new()),
159            tx_ready_certificates,
160            address_funds_withdraw_scheduler: Arc::new(Mutex::new(
161                address_funds_withdraw_scheduler,
162            )),
163            funds_withdraw_scheduler_type,
164            metrics,
165            address_funds_scheduler_metrics,
166        }
167    }
168
169    fn initialize_funds_withdraw_scheduler(
170        epoch_store: &Arc<AuthorityPerEpochStore>,
171        object_cache_read: &Arc<dyn ObjectCacheRead>,
172        account_funds_read: Arc<dyn AccountFundsRead>,
173        scheduler_type: FundsWithdrawSchedulerType,
174        address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
175    ) -> Option<FundsWithdrawScheduler> {
176        let withdraw_scheduler_enabled =
177            epoch_store.node_role().runs_consensus() && epoch_store.accumulators_enabled();
178        if !withdraw_scheduler_enabled {
179            return None;
180        }
181        let starting_accumulator_version = object_cache_read
182            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
183            .expect("Accumulator root object must be present if funds accumulator is enabled")
184            .version();
185        let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
186            account_funds_read.clone(),
187            starting_accumulator_version,
188            scheduler_type,
189            address_funds_scheduler_metrics.clone(),
190        );
191
192        Some(address_funds_withdraw_scheduler)
193    }
194
195    #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
196    async fn schedule_transaction(
197        self,
198        cert: VerifiedExecutableTransaction,
199        execution_env: ExecutionEnv,
200        epoch_store: &Arc<AuthorityPerEpochStore>,
201    ) {
202        let enqueue_time = Instant::now();
203        let tx_digest = cert.digest();
204        let digests = [*tx_digest];
205
206        let tx_data = cert.transaction_data();
207        let input_object_kinds = tx_data
208            .input_objects()
209            .expect("input_objects() cannot fail");
210        let mut input_object_keys: Vec<_> = epoch_store
211            .get_input_object_keys(
212                &cert.key(),
213                &input_object_kinds,
214                &execution_env.assigned_versions,
215            )
216            .into_iter()
217            .collect();
218
219        // Coin reservation transactions need to wait for the accumulator root object
220        // to reach the assigned accumulator version. This ensures settlement transactions
221        // (which create/update accumulator account objects) execute before coin reservation
222        // transactions that depend on those objects.
223        if tx_data.kind().has_coin_reservations()
224            && let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version
225            && let Some(initial_shared_version) =
226                (**epoch_store.epoch_start_config()).accumulator_root_obj_initial_shared_version()
227        {
228            input_object_keys.push(InputKey::VersionedObject {
229                id: FullObjectID::new(SUI_ACCUMULATOR_ROOT_OBJECT_ID, Some(initial_shared_version)),
230                version: accumulator_version,
231            });
232        }
233
234        let receiving_object_keys: HashSet<_> = tx_data
235            .receiving_objects()
236            .into_iter()
237            .map(|entry| {
238                InputKey::VersionedObject {
239                    // TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
240                    id: FullObjectID::new(entry.0, None),
241                    version: entry.1,
242                }
243            })
244            .collect();
245        let input_and_receiving_keys = [
246            input_object_keys,
247            receiving_object_keys.iter().cloned().collect(),
248        ]
249        .concat();
250
251        let epoch = epoch_store.epoch();
252        debug!(
253            ?tx_digest,
254            "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
255        );
256
257        let availability = self
258            .object_cache_read
259            .multi_input_objects_available_cache_only(&input_and_receiving_keys);
260        // Most of the times, the transaction's input objects are already available.
261        // We can check the availability of the input objects first, and only wait for the
262        // missing input objects if necessary.
263        let missing_input_keys: Vec<_> = input_and_receiving_keys
264            .into_iter()
265            .zip_debug_eq(availability)
266            .filter_map(|(key, available)| if !available { Some(key) } else { None })
267            .collect();
268
269        let has_missing_barrier_dependencies = self
270            .transaction_cache_read
271            .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
272            .into_iter()
273            .any(|r| r.is_none());
274
275        if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
276            self.metrics
277                .transaction_manager_num_enqueued_certificates
278                .with_label_values(&["ready"])
279                .inc();
280            debug!(?tx_digest, "Input objects already available");
281            self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
282            return;
283        }
284
285        let _pending_guard = PendingGuard::new(&self, &cert);
286        self.metrics
287            .transaction_manager_num_enqueued_certificates
288            .with_label_values(&["pending"])
289            .inc();
290
291        if !execution_env.barrier_dependencies.is_empty() {
292            debug!(
293                "waiting for barrier dependencies to be executed: {:?}",
294                execution_env.barrier_dependencies
295            );
296            self.transaction_cache_read
297                .notify_read_executed_effects_digests(
298                    "wait_for_barrier_dependencies",
299                    &execution_env.barrier_dependencies,
300                )
301                .await;
302        }
303
304        tokio::select! {
305            _ = self.object_cache_read
306                .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
307                => {
308                    self.metrics
309                        .transaction_manager_transaction_queue_age_s
310                        .observe(enqueue_time.elapsed().as_secs_f64());
311                    debug!(?tx_digest, "Input objects available");
312                    // TODO: Eventually we could fold execution_driver into the scheduler.
313                    self.send_transaction_for_execution(
314                        &cert,
315                        execution_env,
316                        enqueue_time,
317                    );
318                }
319            _ = self.transaction_cache_read.notify_read_executed_effects_digests(
320                "ExecutionScheduler::notify_read_executed_effects_digests",
321                &digests,
322            ) => {
323                debug!(?tx_digest, "Transaction already executed");
324            }
325        };
326    }
327
328    pub fn send_transaction_for_execution(
329        &self,
330        cert: &VerifiedExecutableTransaction,
331        execution_env: ExecutionEnv,
332        enqueue_time: Instant,
333    ) {
334        let pending_cert = PendingCertificate {
335            certificate: cert.clone(),
336            execution_env,
337            stats: PendingCertificateStats {
338                enqueue_time,
339                ready_time: Some(Instant::now()),
340            },
341            executing_guard: Some(ExecutingGuard::new(
342                self.metrics
343                    .transaction_manager_num_executing_certificates
344                    .clone(),
345            )),
346        };
347        let _ = self.tx_ready_certificates.send(pending_cert);
348    }
349
350    fn schedule_funds_withdraws(
351        &self,
352        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
353        epoch_store: &Arc<AuthorityPerEpochStore>,
354    ) {
355        if certs.is_empty() {
356            return;
357        }
358        let mut withdraws = BTreeMap::new();
359        let mut prev_version = None;
360        for (cert, env) in &certs {
361            let tx_withdraws = cert
362                .transaction_data()
363                .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
364            assert!(!tx_withdraws.is_empty());
365            let accumulator_version = env
366                .assigned_versions
367                .accumulator_version
368                .expect("accumulator_version must be set when there are withdraws");
369            if let Some(prev_version) = prev_version {
370                // Transactions must be in order.
371                assert!(prev_version <= accumulator_version);
372            }
373            prev_version = Some(accumulator_version);
374            let tx_digest = *cert.digest();
375            withdraws
376                .entry(accumulator_version)
377                .or_insert(Vec::new())
378                .push(TxFundsWithdraw {
379                    tx_digest,
380                    reservations: tx_withdraws,
381                });
382        }
383        let mut receivers = FuturesUnordered::new();
384        {
385            let guard = self.address_funds_withdraw_scheduler.lock();
386            let withdraw_scheduler = guard
387                .as_ref()
388                .expect("Funds withdraw scheduler must be enabled if there are withdraws");
389            for (version, tx_withdraws) in withdraws {
390                receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
391                    accumulator_version: version,
392                    withdraws: tx_withdraws,
393                }));
394            }
395            // guard will be dropped here
396        }
397        let scheduler = self.clone();
398        let epoch_store = epoch_store.clone();
399        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
400            let mut cert_map = HashMap::new();
401            for (cert, env) in certs {
402                cert_map.insert(*cert.digest(), (cert, env));
403            }
404            while let Some(result) = receivers.next().await {
405                match result {
406                    Ok((tx_digest, status)) => match status {
407                        ScheduleStatus::InsufficientFunds => {
408                            assert_reachable!("tx cancelled, insufficient funds");
409                            debug!(
410                                ?tx_digest,
411                                "Funds withdraw scheduling result: Insufficient funds"
412                            );
413                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
414                            let env = env.with_insufficient_funds();
415                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
416                        }
417                        ScheduleStatus::SufficientFunds => {
418                            assert_reachable!("tx scheduled, sufficient funds");
419                            debug!(?tx_digest, "Funds withdraw scheduling result: Success");
420                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
421                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
422                        }
423                        ScheduleStatus::SkipSchedule => {
424                            assert_reachable!("tx withdrawal scheduling skipped");
425                            debug!(?tx_digest, "Skip scheduling funds withdraw");
426                        }
427                    },
428                    Err(e) => {
429                        error!("Withdraw scheduler stopped: {:?}", e);
430                    }
431                }
432            }
433        }));
434    }
435
436    fn schedule_tx_keys(
437        &self,
438        tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
439        epoch_store: &Arc<AuthorityPerEpochStore>,
440    ) {
441        if tx_with_keys.is_empty() {
442            return;
443        }
444
445        let scheduler = self.clone();
446        let epoch_store = epoch_store.clone();
447        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
448            let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
449            let digests = epoch_store
450                .notify_read_tx_key_to_digest(&tx_keys)
451                .await
452                .expect("db error");
453            let transactions = scheduler
454                .transaction_cache_read
455                .multi_get_transaction_blocks(&digests)
456                .into_iter()
457                .map(|tx| {
458                    let tx = tx.expect("tx must exist").as_ref().clone();
459                    VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
460                })
461                .zip_debug_eq(tx_with_keys.into_iter().map(|(_, env)| env))
462                .collect::<Vec<_>>();
463            scheduler.enqueue_transactions(transactions, &epoch_store);
464        }));
465    }
466
467    /// When we schedule a certificate, it should be impossible for it to have been executed in a
468    /// previous epoch.
469    #[cfg(debug_assertions)]
470    fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
471        let epoch = cert.epoch();
472        let digest = *cert.digest();
473        let digests = [digest];
474        let executed = self
475            .transaction_cache_read
476            .multi_get_executed_effects(&digests)
477            .pop()
478            .unwrap();
479        // Due to pruning, we may not always have an executed effects for the certificate
480        // even if it was executed. So this is a best-effort check.
481        if let Some(executed) = executed {
482            use sui_types::effects::TransactionEffectsAPI;
483
484            assert_eq!(
485                executed.executed_epoch(),
486                epoch,
487                "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
488                digest,
489                executed.executed_epoch(),
490                epoch
491            );
492        }
493    }
494}
495
496impl ExecutionScheduler {
497    pub fn enqueue(
498        &self,
499        certs: Vec<(Schedulable, ExecutionEnv)>,
500        epoch_store: &Arc<AuthorityPerEpochStore>,
501    ) {
502        // schedule all transactions immediately
503        let mut ordinary_txns = Vec::with_capacity(certs.len());
504        let mut tx_with_keys = Vec::new();
505        let mut tx_with_withdraws = Vec::new();
506
507        for (schedulable, env) in certs {
508            match schedulable {
509                Schedulable::Transaction(tx) => {
510                    if tx.transaction_data().has_funds_withdrawals() {
511                        tx_with_withdraws.push((tx, env));
512                    } else {
513                        ordinary_txns.push((tx, env));
514                    }
515                }
516                s @ Schedulable::RandomnessStateUpdate(..) => {
517                    tx_with_keys.push((s.key(), env));
518                }
519                Schedulable::AccumulatorSettlement(_, _) => {
520                    unreachable!("handled by SettlementScheduler");
521                }
522                Schedulable::ConsensusCommitPrologue(_, _, _) => {
523                    // we only use Schedulable::ConsensusCommitPrologue as a temporary placeholder
524                    // during version assignment, by the time we schedule transactions it should be
525                    // converted to Schedulable::Transaction
526                    unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
527                }
528            }
529        }
530
531        self.enqueue_transactions(ordinary_txns, epoch_store);
532        self.schedule_tx_keys(tx_with_keys, epoch_store);
533        self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
534    }
535
536    pub fn enqueue_transactions(
537        &self,
538        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
539        epoch_store: &Arc<AuthorityPerEpochStore>,
540    ) {
541        // Filter out certificates from wrong epoch.
542        let certs: Vec<_> = certs
543            .into_iter()
544            .filter_map(|cert| {
545                if cert.0.epoch() == epoch_store.epoch() {
546                    #[cfg(debug_assertions)]
547                    self.assert_cert_not_executed_previous_epochs(&cert.0);
548
549                    Some(cert)
550                } else {
551                    debug_fatal!(
552                        "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
553                        epoch_store.epoch(),
554                        cert.0.epoch()
555                    );
556                    None
557                }
558            })
559            .collect();
560        let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
561        let executed = self
562            .transaction_cache_read
563            .multi_get_executed_effects_digests(&digests);
564        let mut already_executed_certs_num = 0;
565        let pending_certs = certs.into_iter().zip_debug_eq(executed).filter_map(
566            |((cert, execution_env), executed)| {
567                if executed.is_none() {
568                    Some((cert, execution_env))
569                } else {
570                    already_executed_certs_num += 1;
571                    None
572                }
573            },
574        );
575
576        for (cert, execution_env) in pending_certs {
577            let scheduler = self.clone();
578            let epoch_store = epoch_store.clone();
579            spawn_monitored_task!(
580                epoch_store.within_alive_epoch(scheduler.schedule_transaction(
581                    cert,
582                    execution_env,
583                    &epoch_store,
584                ))
585            );
586        }
587
588        self.metrics
589            .transaction_manager_num_enqueued_certificates
590            .with_label_values(&["already_executed"])
591            .inc_by(already_executed_certs_num);
592    }
593
594    pub fn settle_address_funds(&self, settlement: FundsSettlement) {
595        self.address_funds_withdraw_scheduler
596            .lock()
597            .as_ref()
598            .expect("Funds withdraw scheduler must be enabled if there are settlements")
599            .settle_funds(settlement);
600    }
601
602    /// Reconfigure internal state at epoch start. This resets the funds withdraw scheduler
603    /// to the current accumulator root object version.
604    pub fn reconfigure(
605        &self,
606        new_epoch_store: &Arc<AuthorityPerEpochStore>,
607        account_funds_read: &Arc<dyn AccountFundsRead>,
608    ) {
609        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
610            new_epoch_store,
611            &self.object_cache_read,
612            account_funds_read.clone(),
613            self.funds_withdraw_scheduler_type,
614            &self.address_funds_scheduler_metrics,
615        );
616        let mut guard = self.address_funds_withdraw_scheduler.lock();
617        if let Some(old_scheduler) = guard.as_ref() {
618            old_scheduler.close_epoch();
619        }
620        *guard = address_funds_withdraw_scheduler;
621        drop(guard);
622    }
623
624    pub fn check_execution_overload(
625        &self,
626        overload_config: &AuthorityOverloadConfig,
627        tx_data: &SenderSignedData,
628    ) -> SuiResult {
629        let inflight_queue_len = self.num_pending_certificates();
630        self.overload_tracker
631            .check_execution_overload(overload_config, tx_data, inflight_queue_len)
632    }
633
634    pub fn num_pending_certificates(&self) -> usize {
635        (self
636            .metrics
637            .transaction_manager_num_pending_certificates
638            .get()
639            + self
640                .metrics
641                .transaction_manager_num_executing_certificates
642                .get()) as usize
643    }
644
645    #[cfg(test)]
646    pub async fn check_empty_for_testing(&self) {
647        for _ in 0..10 {
648            if self.num_pending_certificates() == 0 {
649                return;
650            }
651            tokio::task::yield_now().await;
652        }
653        assert_eq!(self.num_pending_certificates(), 0);
654    }
655}
656
657#[cfg(test)]
658mod test {
659    use super::{
660        BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
661        PendingCertificate,
662    };
663    use crate::authority::ExecutionEnv;
664    use crate::authority::shared_object_version_manager::AssignedVersions;
665    use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
666    use std::collections::BTreeSet;
667    use std::{time::Duration, vec};
668    use sui_test_transaction_builder::TestTransactionBuilder;
669    use sui_types::base_types::{SuiAddress, random_object_ref};
670    use sui_types::executable_transaction::VerifiedExecutableTransaction;
671    use sui_types::object::Owner;
672    use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
673    use sui_types::transaction::{
674        SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
675    };
676    use sui_types::{
677        SUI_FRAMEWORK_PACKAGE_ID,
678        base_types::{ObjectID, SequenceNumber},
679        crypto::deterministic_random_account_key,
680        object::Object,
681        transaction::{CallArg, ObjectArg},
682    };
683    use tokio::time::Instant;
684    use tokio::{
685        sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
686        time::sleep,
687    };
688
689    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
690    fn make_execution_scheduler(
691        state: &AuthorityState,
692    ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
693        // Create a new execution scheduler instead of reusing the authority's, to examine
694        // execution_scheduler output from rx_ready_certificates.
695        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
696        let registry = prometheus::Registry::new();
697        let execution_scheduler = ExecutionScheduler::new(
698            state.get_object_cache_reader().clone(),
699            state.get_account_funds_read().clone(),
700            state.get_transaction_cache_reader().clone(),
701            tx_ready_certificates,
702            &state.epoch_store_for_testing(),
703            FundsWithdrawSchedulerType::default(),
704            state.metrics.clone(),
705            &registry,
706        );
707
708        (execution_scheduler, rx_ready_certificates)
709    }
710
711    fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
712        // Use fake module, function, package and gas prices since they are irrelevant for testing
713        // execution scheduler.
714        let rgp = 100;
715        let (sender, keypair) = deterministic_random_account_key();
716        let transaction =
717            TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
718                .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
719                .build_and_sign(&keypair);
720        VerifiedExecutableTransaction::new_system(
721            VerifiedTransaction::new_unchecked(transaction),
722            0,
723        )
724    }
725
726    #[tokio::test(flavor = "current_thread", start_paused = true)]
727    async fn execution_scheduler_basics() {
728        // Initialize an authority state.
729        let (owner, _keypair) = deterministic_random_account_key();
730        let gas_objects: Vec<Object> = (0..10)
731            .map(|_| {
732                let gas_object_id = ObjectID::random();
733                Object::with_id_owner_for_testing(gas_object_id, owner)
734            })
735            .collect();
736        let state = init_state_with_objects(gas_objects.clone()).await;
737
738        // Create a new execution scheduler instead of reusing the authority's, to examine
739        // execution_scheduler output from rx_ready_certificates.
740        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
741        // scheduler should output no transaction.
742        assert!(
743            rx_ready_certificates
744                .try_recv()
745                .is_err_and(|err| err == TryRecvError::Empty)
746        );
747        // scheduler should be empty at the beginning.
748        assert_eq!(execution_scheduler.num_pending_certificates(), 0);
749
750        // Enqueue empty vec should not crash.
751        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
752        // scheduler should output no transaction.
753        assert!(
754            rx_ready_certificates
755                .try_recv()
756                .is_err_and(|err| err == TryRecvError::Empty)
757        );
758
759        // Enqueue a transaction with existing gas object, empty input.
760        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
761        let tx_start_time = Instant::now();
762        execution_scheduler.enqueue_transactions(
763            vec![(transaction.clone(), ExecutionEnv::new())],
764            &state.epoch_store_for_testing(),
765        );
766        // scheduler should output the transaction eventually.
767        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
768
769        // Tests that pending certificate stats are recorded properly.
770        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
771        assert!(
772            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
773        );
774
775        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
776
777        // Predent we have just executed the transaction.
778        drop(pending_certificate);
779
780        // scheduler should be empty.
781        execution_scheduler.check_empty_for_testing().await;
782
783        // Enqueue a transaction with a new gas object, empty input.
784        let gas_object_new = Object::with_id_owner_version_for_testing(
785            ObjectID::random(),
786            0.into(),
787            Owner::AddressOwner(owner),
788        );
789        let transaction = make_transaction(gas_object_new.clone(), vec![]);
790        let tx_start_time = Instant::now();
791        execution_scheduler.enqueue_transactions(
792            vec![(transaction.clone(), ExecutionEnv::new())],
793            &state.epoch_store_for_testing(),
794        );
795        // scheduler should output no transaction yet.
796        sleep(Duration::from_secs(1)).await;
797        assert!(
798            rx_ready_certificates
799                .try_recv()
800                .is_err_and(|err| err == TryRecvError::Empty)
801        );
802
803        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
804
805        // Duplicated enqueue is allowed.
806        execution_scheduler.enqueue_transactions(
807            vec![(transaction.clone(), ExecutionEnv::new())],
808            &state.epoch_store_for_testing(),
809        );
810        sleep(Duration::from_secs(1)).await;
811        assert!(
812            rx_ready_certificates
813                .try_recv()
814                .is_err_and(|err| err == TryRecvError::Empty)
815        );
816
817        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
818
819        // Notify scheduler about availability of the gas object.
820        state
821            .get_cache_writer()
822            .write_object_entry_for_test(gas_object_new);
823        // scheduler should output the transaction eventually.
824        // We will see both the original and the duplicated transaction.
825        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
826        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
827        assert_eq!(
828            pending_certificate.certificate.digest(),
829            pending_certificate2.certificate.digest()
830        );
831
832        // Tests that pending certificate stats are recorded properly. The ready time should be
833        // 2 seconds apart from the enqueue time.
834        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
835        assert!(
836            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
837                >= Duration::from_secs(2)
838        );
839
840        // Predent we have just executed the transaction.
841        drop(pending_certificate);
842        drop(pending_certificate2);
843
844        // scheduler should be empty at the end.
845        execution_scheduler.check_empty_for_testing().await;
846    }
847
848    // Tests when objects become available, correct set of transactions can be sent to execute.
849    // Specifically, we have following setup,
850    //         shared_object     shared_object_2
851    //       /    |    \     \    /
852    //    tx_0  tx_1  tx_2    tx_3
853    //     r      r     w      r
854    // And when shared_object is available, tx_0, tx_1, and tx_2 can be executed. And when
855    // shared_object_2 becomes available, tx_3 can be executed.
856    #[tokio::test(flavor = "current_thread", start_paused = true)]
857    async fn execution_scheduler_object_dependency() {
858        telemetry_subscribers::init_for_testing();
859        // Initialize an authority state, with gas objects and a shared object.
860        let (owner, _keypair) = deterministic_random_account_key();
861        let gas_objects: Vec<Object> = (0..10)
862            .map(|_| {
863                let gas_object_id = ObjectID::random();
864                Object::with_id_owner_for_testing(gas_object_id, owner)
865            })
866            .collect();
867        let shared_object = Object::shared_for_testing();
868        let initial_shared_version = shared_object.owner().start_version().unwrap();
869        let shared_object_2 = Object::shared_for_testing();
870        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
871
872        let state = init_state_with_objects(
873            [
874                gas_objects.clone(),
875                vec![shared_object.clone(), shared_object_2.clone()],
876            ]
877            .concat(),
878        )
879        .await;
880
881        // Create a new execution scheduler instead of reusing the authority's, to examine
882        // execution_scheduler output from rx_ready_certificates.
883        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
884        // scheduler should output no transaction.
885        assert!(rx_ready_certificates.try_recv().is_err());
886
887        // Enqueue two transactions with the same shared object input in read-only mode.
888        let shared_version = 1000.into();
889        let shared_object_arg_read = ObjectArg::SharedObject {
890            id: shared_object.id(),
891            initial_shared_version,
892            mutability: SharedObjectMutability::Immutable,
893        };
894        let transaction_read_0 = make_transaction(
895            gas_objects[0].clone(),
896            vec![CallArg::Object(shared_object_arg_read)],
897        );
898        let transaction_read_1 = make_transaction(
899            gas_objects[1].clone(),
900            vec![CallArg::Object(shared_object_arg_read)],
901        );
902        let tx_read_0_assigned_versions = vec![(
903            (
904                shared_object.id(),
905                shared_object.owner().start_version().unwrap(),
906            ),
907            shared_version,
908        )];
909        let tx_read_1_assigned_versions = vec![(
910            (
911                shared_object.id(),
912                shared_object.owner().start_version().unwrap(),
913            ),
914            shared_version,
915        )];
916
917        // Enqueue one transaction with the same shared object in mutable mode.
918        let shared_object_arg_default = ObjectArg::SharedObject {
919            id: shared_object.id(),
920            initial_shared_version,
921            mutability: SharedObjectMutability::Mutable,
922        };
923        let transaction_default = make_transaction(
924            gas_objects[2].clone(),
925            vec![CallArg::Object(shared_object_arg_default)],
926        );
927        let tx_default_assigned_versions = vec![(
928            (
929                shared_object.id(),
930                shared_object.owner().start_version().unwrap(),
931            ),
932            shared_version,
933        )];
934
935        // Enqueue one transaction with two readonly shared object inputs, `shared_object` and `shared_object_2`.
936        let shared_version_2 = 1000.into();
937        let shared_object_arg_read_2 = ObjectArg::SharedObject {
938            id: shared_object_2.id(),
939            initial_shared_version: initial_shared_version_2,
940            mutability: SharedObjectMutability::Immutable,
941        };
942        let transaction_read_2 = make_transaction(
943            gas_objects[3].clone(),
944            vec![
945                CallArg::Object(shared_object_arg_default),
946                CallArg::Object(shared_object_arg_read_2),
947            ],
948        );
949        let tx_read_2_assigned_versions = vec![
950            (
951                (
952                    shared_object.id(),
953                    shared_object.owner().start_version().unwrap(),
954                ),
955                shared_version,
956            ),
957            (
958                (
959                    shared_object_2.id(),
960                    shared_object_2.owner().start_version().unwrap(),
961                ),
962                shared_version_2,
963            ),
964        ];
965
966        execution_scheduler.enqueue_transactions(
967            vec![
968                (
969                    transaction_read_0.clone(),
970                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
971                        tx_read_0_assigned_versions,
972                        None,
973                    )),
974                ),
975                (
976                    transaction_read_1.clone(),
977                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
978                        tx_read_1_assigned_versions,
979                        None,
980                    )),
981                ),
982                (
983                    transaction_default.clone(),
984                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
985                        tx_default_assigned_versions,
986                        None,
987                    )),
988                ),
989                (
990                    transaction_read_2.clone(),
991                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
992                        tx_read_2_assigned_versions,
993                        None,
994                    )),
995                ),
996            ],
997            &state.epoch_store_for_testing(),
998        );
999
1000        // scheduler should output no transaction yet.
1001        sleep(Duration::from_secs(1)).await;
1002        assert!(rx_ready_certificates.try_recv().is_err());
1003
1004        assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1005
1006        // Notify scheduler about availability of the first shared object.
1007        let mut new_shared_object = shared_object.clone();
1008        new_shared_object
1009            .data
1010            .try_as_move_mut()
1011            .unwrap()
1012            .increment_version_to(shared_version_2);
1013        state
1014            .get_cache_writer()
1015            .write_object_entry_for_test(new_shared_object);
1016
1017        // scheduler should output the 3 transactions that are only waiting for this object.
1018        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1019        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1020        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1021        {
1022            let mut want_digests = vec![
1023                transaction_read_0.digest(),
1024                transaction_read_1.digest(),
1025                transaction_default.digest(),
1026            ];
1027            want_digests.sort();
1028            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1029            got_digests.sort();
1030            assert_eq!(want_digests, got_digests);
1031        }
1032
1033        sleep(Duration::from_secs(1)).await;
1034        assert!(rx_ready_certificates.try_recv().is_err());
1035
1036        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1037
1038        // Make shared_object_2 available.
1039        let mut new_shared_object_2 = shared_object_2.clone();
1040        new_shared_object_2
1041            .data
1042            .try_as_move_mut()
1043            .unwrap()
1044            .increment_version_to(shared_version_2);
1045        state
1046            .get_cache_writer()
1047            .write_object_entry_for_test(new_shared_object_2);
1048
1049        // Now, the transaction waiting for both shared objects can be executed.
1050        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1051        assert_eq!(transaction_read_2.digest(), tx_3.digest());
1052
1053        sleep(Duration::from_secs(1)).await;
1054        assert!(rx_ready_certificates.try_recv().is_err());
1055
1056        execution_scheduler.check_empty_for_testing().await;
1057    }
1058
1059    #[tokio::test(flavor = "current_thread", start_paused = true)]
1060    async fn execution_scheduler_receiving_notify_commit() {
1061        telemetry_subscribers::init_for_testing();
1062        // Initialize an authority state.
1063        let (owner, _keypair) = deterministic_random_account_key();
1064        let gas_objects: Vec<Object> = (0..10)
1065            .map(|_| {
1066                let gas_object_id = ObjectID::random();
1067                Object::with_id_owner_for_testing(gas_object_id, owner)
1068            })
1069            .collect();
1070        let state = init_state_with_objects(gas_objects.clone()).await;
1071
1072        // Create a new execution scheduler instead of reusing the authority's, to examine
1073        // execution_scheduler output from rx_ready_certificates.
1074        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1075        // scheduler should output no transaction.
1076        assert!(rx_ready_certificates.try_recv().is_err());
1077        // scheduler should be empty at the beginning.
1078        execution_scheduler.check_empty_for_testing().await;
1079
1080        let obj_id = ObjectID::random();
1081        let object_arguments: Vec<_> = (0..10)
1082            .map(|i| {
1083                let object = Object::with_id_owner_version_for_testing(
1084                    obj_id,
1085                    i.into(),
1086                    Owner::AddressOwner(owner),
1087                );
1088                // Every other transaction receives the object, and we create a run of multiple receives in
1089                // a row at the beginning to test that the scheduler doesn't get stuck in either configuration of:
1090                // ImmOrOwnedObject => Receiving,
1091                // Receiving => Receiving
1092                // Receiving => ImmOrOwnedObject
1093                // ImmOrOwnedObject => ImmOrOwnedObject is already tested as the default case on mainnet.
1094                let object_arg = if i % 2 == 0 || i == 3 {
1095                    ObjectArg::Receiving(object.compute_object_reference())
1096                } else {
1097                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1098                };
1099                let txn =
1100                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1101                (object, txn)
1102            })
1103            .collect();
1104
1105        for (i, (_, txn)) in object_arguments.iter().enumerate() {
1106            // scheduler should output no transaction yet since waiting on receiving object or
1107            // ImmOrOwnedObject input.
1108            execution_scheduler.enqueue_transactions(
1109                vec![(txn.clone(), ExecutionEnv::new())],
1110                &state.epoch_store_for_testing(),
1111            );
1112            sleep(Duration::from_secs(1)).await;
1113            assert!(rx_ready_certificates.try_recv().is_err());
1114            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1115        }
1116
1117        // Now start to unravel the transactions by notifying that each subsequent
1118        // transaction has been processed.
1119        let len = object_arguments.len();
1120        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1121            // Mark the object as available.
1122            // We should now eventually see the transaction as ready.
1123            state
1124                .get_cache_writer()
1125                .write_object_entry_for_test(object.clone());
1126
1127            // scheduler should output the transaction eventually now that the receiving object has become
1128            // available.
1129            rx_ready_certificates.recv().await.unwrap();
1130
1131            // Only one transaction at a time should become available though. So if we try to get
1132            // another one it should fail.
1133            sleep(Duration::from_secs(1)).await;
1134            assert!(rx_ready_certificates.try_recv().is_err());
1135
1136            // Notify the scheduler that the transaction has been processed.
1137            drop(txn);
1138
1139            // scheduler should now output another transaction to run since it the next version of that object
1140            // has become available.
1141            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1142        }
1143
1144        // After everything scheduler should be empty.
1145        execution_scheduler.check_empty_for_testing().await;
1146    }
1147
1148    #[tokio::test(flavor = "current_thread", start_paused = true)]
1149    async fn execution_scheduler_receiving_object_ready_notifications() {
1150        telemetry_subscribers::init_for_testing();
1151        // Initialize an authority state.
1152        let (owner, _keypair) = deterministic_random_account_key();
1153        let gas_objects: Vec<Object> = (0..10)
1154            .map(|_| {
1155                let gas_object_id = ObjectID::random();
1156                Object::with_id_owner_for_testing(gas_object_id, owner)
1157            })
1158            .collect();
1159        let state = init_state_with_objects(gas_objects.clone()).await;
1160
1161        // Create a new execution scheduler instead of reusing the authority's, to examine
1162        // execution_scheduler output from rx_ready_certificates.
1163        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1164        // scheduler should output no transaction.
1165        assert!(rx_ready_certificates.try_recv().is_err());
1166        // scheduler should be empty at the beginning.
1167        execution_scheduler.check_empty_for_testing().await;
1168
1169        let obj_id = ObjectID::random();
1170        let receiving_object_new0 =
1171            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1172        let receiving_object_new1 =
1173            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1174        let receiving_object_arg0 =
1175            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1176        let receive_object_transaction0 = make_transaction(
1177            gas_objects[0].clone(),
1178            vec![CallArg::Object(receiving_object_arg0)],
1179        );
1180
1181        let receiving_object_arg1 =
1182            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1183        let receive_object_transaction1 = make_transaction(
1184            gas_objects[0].clone(),
1185            vec![CallArg::Object(receiving_object_arg1)],
1186        );
1187
1188        // scheduler should output no transaction yet since waiting on receiving object.
1189        execution_scheduler.enqueue_transactions(
1190            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1191            &state.epoch_store_for_testing(),
1192        );
1193        sleep(Duration::from_secs(1)).await;
1194        assert!(rx_ready_certificates.try_recv().is_err());
1195        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1196
1197        // scheduler should output no transaction yet since waiting on receiving object.
1198        execution_scheduler.enqueue_transactions(
1199            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1200            &state.epoch_store_for_testing(),
1201        );
1202        sleep(Duration::from_secs(1)).await;
1203        assert!(rx_ready_certificates.try_recv().is_err());
1204        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1205
1206        // Duplicate enqueue of receiving object is allowed.
1207        execution_scheduler.enqueue_transactions(
1208            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1209            &state.epoch_store_for_testing(),
1210        );
1211        sleep(Duration::from_secs(1)).await;
1212        assert!(rx_ready_certificates.try_recv().is_err());
1213        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1214
1215        // Notify scheduler that the receiving object 0 is available.
1216        state
1217            .get_cache_writer()
1218            .write_object_entry_for_test(receiving_object_new0.clone());
1219
1220        // scheduler should output the transaction eventually now that the receiving object has become
1221        // available.
1222        rx_ready_certificates.recv().await.unwrap();
1223
1224        // Notify scheduler that the receiving object 0 is available.
1225        state
1226            .get_cache_writer()
1227            .write_object_entry_for_test(receiving_object_new1.clone());
1228
1229        // scheduler should output the transaction eventually now that the receiving object has become
1230        // available.
1231        rx_ready_certificates.recv().await.unwrap();
1232    }
1233
1234    #[tokio::test(flavor = "current_thread", start_paused = true)]
1235    async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1236        telemetry_subscribers::init_for_testing();
1237        // Initialize an authority state.
1238        let (owner, _keypair) = deterministic_random_account_key();
1239        let gas_objects: Vec<Object> = (0..10)
1240            .map(|_| {
1241                let gas_object_id = ObjectID::random();
1242                Object::with_id_owner_for_testing(gas_object_id, owner)
1243            })
1244            .collect();
1245        let state = init_state_with_objects(gas_objects.clone()).await;
1246
1247        // Create a new execution scheduler instead of reusing the authority's, to examine
1248        // execution_scheduler output from rx_ready_certificates.
1249        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1250        // scheduler should output no transaction.
1251        assert!(rx_ready_certificates.try_recv().is_err());
1252        // scheduler should be empty at the beginning.
1253        execution_scheduler.check_empty_for_testing().await;
1254
1255        let obj_id = ObjectID::random();
1256        let receiving_object_new0 =
1257            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1258        let receiving_object_new1 =
1259            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1260        let receiving_object_arg0 =
1261            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1262        let receive_object_transaction0 = make_transaction(
1263            gas_objects[0].clone(),
1264            vec![CallArg::Object(receiving_object_arg0)],
1265        );
1266
1267        let receive_object_transaction01 = make_transaction(
1268            gas_objects[1].clone(),
1269            vec![CallArg::Object(receiving_object_arg0)],
1270        );
1271
1272        let receiving_object_arg1 =
1273            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1274        let receive_object_transaction1 = make_transaction(
1275            gas_objects[0].clone(),
1276            vec![CallArg::Object(receiving_object_arg1)],
1277        );
1278
1279        // Enqueuing a transaction with a receiving object that is available at the time it is enqueued
1280        // should become immediately available.
1281        let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1282        let tx1 = make_transaction(
1283            gas_objects[0].clone(),
1284            vec![CallArg::Object(gas_receiving_arg)],
1285        );
1286
1287        // scheduler should output no transaction yet since waiting on receiving object.
1288        execution_scheduler.enqueue_transactions(
1289            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1290            &state.epoch_store_for_testing(),
1291        );
1292        sleep(Duration::from_secs(1)).await;
1293        assert!(rx_ready_certificates.try_recv().is_err());
1294        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1295
1296        // scheduler should output no transaction yet since waiting on receiving object.
1297        execution_scheduler.enqueue_transactions(
1298            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1299            &state.epoch_store_for_testing(),
1300        );
1301        sleep(Duration::from_secs(1)).await;
1302        assert!(rx_ready_certificates.try_recv().is_err());
1303        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1304
1305        // Different transaction with a duplicate receiving object reference is allowed.
1306        // Both transaction's will be outputted once the receiving object is available.
1307        execution_scheduler.enqueue_transactions(
1308            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1309            &state.epoch_store_for_testing(),
1310        );
1311        sleep(Duration::from_secs(1)).await;
1312        assert!(rx_ready_certificates.try_recv().is_err());
1313        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1314
1315        // Notify scheduler that the receiving object 0 is available.
1316        state
1317            .get_cache_writer()
1318            .write_object_entry_for_test(receiving_object_new0.clone());
1319
1320        // scheduler should output both transactions depending on the receiving object now that the
1321        // transaction's receiving object has become available.
1322        rx_ready_certificates.recv().await.unwrap();
1323
1324        rx_ready_certificates.recv().await.unwrap();
1325
1326        // Only two transactions that were dependent on the receiving object should be output.
1327        assert!(rx_ready_certificates.try_recv().is_err());
1328
1329        // Enqueue a transaction with a receiving object that is available at the time it is enqueued.
1330        // This should be immediately available.
1331        execution_scheduler.enqueue_transactions(
1332            vec![(tx1.clone(), ExecutionEnv::new())],
1333            &state.epoch_store_for_testing(),
1334        );
1335        sleep(Duration::from_secs(1)).await;
1336        rx_ready_certificates.recv().await.unwrap();
1337
1338        // Notify scheduler that the receiving object 0 is available.
1339        state
1340            .get_cache_writer()
1341            .write_object_entry_for_test(receiving_object_new1.clone());
1342
1343        // scheduler should output the transaction eventually now that the receiving object has become
1344        // available.
1345        rx_ready_certificates.recv().await.unwrap();
1346    }
1347
1348    #[tokio::test(flavor = "current_thread", start_paused = true)]
1349    async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1350        telemetry_subscribers::init_for_testing();
1351        // Initialize an authority state.
1352        let (owner, _keypair) = deterministic_random_account_key();
1353        let mut gas_objects: Vec<Object> = (0..10)
1354            .map(|_| {
1355                let gas_object_id = ObjectID::random();
1356                Object::with_id_owner_for_testing(gas_object_id, owner)
1357            })
1358            .collect();
1359        let receiving_object = Object::with_id_owner_version_for_testing(
1360            ObjectID::random(),
1361            10.into(),
1362            Owner::AddressOwner(owner),
1363        );
1364        gas_objects.push(receiving_object.clone());
1365        let state = init_state_with_objects(gas_objects.clone()).await;
1366
1367        // Create a new execution scheduler instead of reusing the authority's, to examine
1368        // execution_scheduler output from rx_ready_certificates.
1369        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1370        // scheduler should output no transaction.
1371        assert!(rx_ready_certificates.try_recv().is_err());
1372        // scheduler should be empty at the beginning.
1373        execution_scheduler.check_empty_for_testing().await;
1374
1375        let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1376            receiving_object.id(),
1377            0.into(),
1378            Owner::AddressOwner(owner),
1379        );
1380        let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1381            receiving_object.id(),
1382            1.into(),
1383            Owner::AddressOwner(owner),
1384        );
1385        let receiving_object_arg0 =
1386            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1387        let receive_object_transaction0 = make_transaction(
1388            gas_objects[0].clone(),
1389            vec![CallArg::Object(receiving_object_arg0)],
1390        );
1391
1392        let receive_object_transaction01 = make_transaction(
1393            gas_objects[1].clone(),
1394            vec![CallArg::Object(receiving_object_arg0)],
1395        );
1396
1397        let receiving_object_arg1 =
1398            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1399        let receive_object_transaction1 = make_transaction(
1400            gas_objects[0].clone(),
1401            vec![CallArg::Object(receiving_object_arg1)],
1402        );
1403
1404        // scheduler should output no transaction yet since waiting on receiving object.
1405        execution_scheduler.enqueue_transactions(
1406            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1407            &state.epoch_store_for_testing(),
1408        );
1409        execution_scheduler.enqueue_transactions(
1410            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1411            &state.epoch_store_for_testing(),
1412        );
1413        execution_scheduler.enqueue_transactions(
1414            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1415            &state.epoch_store_for_testing(),
1416        );
1417        sleep(Duration::from_secs(1)).await;
1418        rx_ready_certificates.recv().await.unwrap();
1419        rx_ready_certificates.recv().await.unwrap();
1420        rx_ready_certificates.recv().await.unwrap();
1421        assert!(rx_ready_certificates.try_recv().is_err());
1422    }
1423
1424    // Tests transaction cancellation logic in execution scheduler. Mainly tests that for cancelled transaction,
1425    // execution scheduler only waits for all non-shared objects to be available before outputting the transaction.
1426    #[tokio::test(flavor = "current_thread", start_paused = true)]
1427    async fn execution_scheduler_with_cancelled_transactions() {
1428        // Initialize an authority state, with gas objects and 3 shared objects.
1429        let (owner, _keypair) = deterministic_random_account_key();
1430        let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1431        let shared_object_1 = Object::shared_for_testing();
1432        let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1433        let shared_object_2 = Object::shared_for_testing();
1434        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1435        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1436
1437        let state = init_state_with_objects(vec![
1438            gas_object.clone(),
1439            shared_object_1.clone(),
1440            shared_object_2.clone(),
1441            owned_object.clone(),
1442        ])
1443        .await;
1444
1445        // Create a new execution scheduler instead of reusing the authority's, to examine
1446        // execution_scheduler output from rx_ready_certificates.
1447        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1448        // scheduler should output no transaction.
1449        assert!(rx_ready_certificates.try_recv().is_err());
1450
1451        // Enqueue one transaction with 2 shared object inputs and 1 owned input.
1452        let shared_object_arg_1 = ObjectArg::SharedObject {
1453            id: shared_object_1.id(),
1454            initial_shared_version: initial_shared_version_1,
1455            mutability: SharedObjectMutability::Mutable,
1456        };
1457        let shared_object_arg_2 = ObjectArg::SharedObject {
1458            id: shared_object_2.id(),
1459            initial_shared_version: initial_shared_version_2,
1460            mutability: SharedObjectMutability::Mutable,
1461        };
1462
1463        // Changes the desired owned object version to a higher version. We will make it available later.
1464        let owned_version = 2000.into();
1465        let mut owned_ref = owned_object.compute_object_reference();
1466        owned_ref.1 = owned_version;
1467        let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1468
1469        let cancelled_transaction = make_transaction(
1470            gas_object.clone(),
1471            vec![
1472                CallArg::Object(shared_object_arg_1),
1473                CallArg::Object(shared_object_arg_2),
1474                CallArg::Object(owned_object_arg),
1475            ],
1476        );
1477        let assigned_versions = vec![
1478            (
1479                (
1480                    shared_object_1.id(),
1481                    shared_object_1.owner().start_version().unwrap(),
1482                ),
1483                SequenceNumber::CANCELLED_READ,
1484            ),
1485            (
1486                (
1487                    shared_object_2.id(),
1488                    shared_object_2.owner().start_version().unwrap(),
1489                ),
1490                SequenceNumber::CONGESTED,
1491            ),
1492        ];
1493
1494        execution_scheduler.enqueue_transactions(
1495            vec![(
1496                cancelled_transaction.clone(),
1497                ExecutionEnv::new()
1498                    .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1499            )],
1500            &state.epoch_store_for_testing(),
1501        );
1502
1503        // scheduler should output no transaction yet.
1504        sleep(Duration::from_secs(1)).await;
1505        assert!(rx_ready_certificates.try_recv().is_err());
1506
1507        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1508
1509        // Notify scheduler about availability of the owned object.
1510        let mut new_owned_object = owned_object.clone();
1511        new_owned_object
1512            .data
1513            .try_as_move_mut()
1514            .unwrap()
1515            .increment_version_to(owned_version);
1516        state
1517            .get_cache_writer()
1518            .write_object_entry_for_test(new_owned_object);
1519
1520        // scheduler should output the transaction as soon as the owned object is available.
1521        let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1522        assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1523
1524        sleep(Duration::from_secs(1)).await;
1525        assert!(rx_ready_certificates.try_recv().is_err());
1526
1527        execution_scheduler.check_empty_for_testing().await;
1528    }
1529
1530    #[test]
1531    fn test_barrier_dependency_builder() {
1532        let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1533            assert!(
1534                non_exclusive_writes
1535                    .iter()
1536                    .all(|id| !exclusive_writes.contains(id))
1537            );
1538            assert!(
1539                exclusive_writes
1540                    .iter()
1541                    .all(|id| !non_exclusive_writes.contains(id))
1542            );
1543
1544            let non_exclusive_writes = non_exclusive_writes
1545                .into_iter()
1546                .map(|id| ObjectID::from_single_byte(id as u8));
1547            let exclusive_writes = exclusive_writes
1548                .into_iter()
1549                .map(|id| ObjectID::from_single_byte(id as u8));
1550            let mut builder = ProgrammableTransactionBuilder::new();
1551            for non_exclusive_write in non_exclusive_writes {
1552                builder
1553                    .obj(ObjectArg::SharedObject {
1554                        id: non_exclusive_write,
1555                        initial_shared_version: SequenceNumber::new(),
1556                        mutability: SharedObjectMutability::NonExclusiveWrite,
1557                    })
1558                    .unwrap();
1559            }
1560
1561            for exclusive_write in exclusive_writes {
1562                builder
1563                    .obj(ObjectArg::SharedObject {
1564                        id: exclusive_write,
1565                        initial_shared_version: SequenceNumber::new(),
1566                        mutability: SharedObjectMutability::Mutable,
1567                    })
1568                    .unwrap();
1569            }
1570
1571            let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1572            let tx_data =
1573                TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1574            Transaction::from_data_and_signer(tx_data, vec![])
1575        };
1576
1577        // One non-exclusive write, one exclusive write.
1578        {
1579            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1580            let tx1 = make_transaction(vec![1], vec![]);
1581            let tx2 = make_transaction(vec![], vec![1]);
1582
1583            let tx1_deps =
1584                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1585            let tx2_deps =
1586                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1587            assert!(tx1_deps.is_empty());
1588            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1589        }
1590
1591        // One transaction has non-exclusive writes to two different objects, and then becomes
1592        // a dependency of two barriers
1593        {
1594            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1595            let tx1 = make_transaction(vec![1, 2], vec![]);
1596            let tx2 = make_transaction(vec![], vec![1]);
1597            let tx3 = make_transaction(vec![], vec![2]);
1598
1599            let tx1_deps =
1600                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1601            let tx2_deps =
1602                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1603            let tx3_deps =
1604                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1605            assert!(tx1_deps.is_empty());
1606            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1607            assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1608        }
1609
1610        // Ensure multiple-object dependences are merged
1611        {
1612            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1613            let tx1 = make_transaction(vec![1], vec![]);
1614            let tx2 = make_transaction(vec![2], vec![]);
1615            let tx3 = make_transaction(vec![], vec![1, 2]);
1616
1617            let tx1_deps =
1618                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1619            let tx2_deps =
1620                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1621            let tx3_deps =
1622                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1623            assert!(tx1_deps.is_empty());
1624            assert!(tx2_deps.is_empty());
1625            assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1626        }
1627
1628        // Ensure dependency state is cleared
1629        {
1630            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1631            let tx1 = make_transaction(vec![1], vec![]);
1632            let tx2 = make_transaction(vec![], vec![1]);
1633            let tx3 = make_transaction(vec![], vec![1]);
1634
1635            let tx1_deps =
1636                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1637            let tx2_deps =
1638                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1639            let tx3_deps =
1640                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1641            assert!(tx1_deps.is_empty());
1642            assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1643            assert!(tx3_deps.is_empty());
1644        }
1645    }
1646}