sui_core/execution_scheduler/
execution_scheduler_impl.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    accumulators::funds_read::AccountFundsRead,
6    authority::{
7        AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8        shared_object_version_manager::Schedulable,
9    },
10    execution_cache::{ObjectCacheRead, TransactionCacheRead},
11    execution_scheduler::{
12        ExecutingGuard, PendingCertificateStats,
13        funds_withdraw_scheduler::{
14            AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
15            WithdrawReservations, scheduler::FundsWithdrawScheduler,
16        },
17    },
18};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::{assert_reachable, debug_fatal};
21use mysten_metrics::spawn_monitored_task;
22use parking_lot::Mutex;
23use std::{
24    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
25    sync::Arc,
26};
27use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
28use sui_types::{
29    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
30    base_types::{FullObjectID, ObjectID},
31    digests::TransactionDigest,
32    error::SuiResult,
33    executable_transaction::VerifiedExecutableTransaction,
34    storage::InputKey,
35    transaction::{
36        SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
37        TransactionDataAPI, TransactionKey,
38    },
39};
40use tokio::sync::mpsc::UnboundedSender;
41use tokio::time::Instant;
42use tracing::{debug, error, instrument};
43
44use super::{PendingCertificate, overload_tracker::OverloadTracker};
45
46/// Utility struct for collecting barrier dependencies
47pub(crate) struct BarrierDependencyBuilder {
48    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
49}
50
51impl BarrierDependencyBuilder {
52    pub fn new() -> Self {
53        Self {
54            dep_state: Default::default(),
55        }
56    }
57
58    /// process_tx must be called for each transaction in scheduling order. If the
59    /// transaction has a non-exclusive write to an object, the transaction digest is
60    /// stored to become a dependency of the eventual barrier transaction. If a
61    /// transaction has an exclusive write to an object, all pending non-exclusive write
62    /// transactions for that object are added to the barrier dependencies.
63    pub fn process_tx(
64        &mut self,
65        tx_digest: TransactionDigest,
66        tx: &TransactionData,
67    ) -> BTreeSet<TransactionDigest> {
68        let mut barrier_deps = BTreeSet::new();
69        for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
70            match mutability {
71                SharedObjectMutability::NonExclusiveWrite => {
72                    self.dep_state.entry(id).or_default().insert(tx_digest);
73                }
74                SharedObjectMutability::Mutable => {
75                    // If there were preceding non-exclusive writes to this object id, this
76                    // transaction is a barrier and must wait for them to finish.
77                    if let Some(deps) = self.dep_state.remove(&id) {
78                        barrier_deps.extend(deps);
79                    }
80                }
81                SharedObjectMutability::Immutable => (),
82            }
83        }
84        barrier_deps
85    }
86}
87
/// Decides when a transaction is ready to run and forwards it to the execution channel.
///
/// The scheduler is cheap to clone: every field is either an `Arc`, a channel sender,
/// or a small configuration value, and clones are moved into the tasks it spawns.
#[derive(Clone)]
pub struct ExecutionScheduler {
    /// Used to check and await availability of input objects, and to read the
    /// accumulator root object when the withdraw scheduler is (re)built.
    object_cache_read: Arc<dyn ObjectCacheRead>,
    /// Used to look up executed effects digests and stored transactions.
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    /// Receives add/remove notifications for pending certificates and answers
    /// overload checks.
    overload_tracker: Arc<OverloadTracker>,
    /// Channel on which ready `PendingCertificate`s are sent for execution.
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    /// Funds withdraw scheduler for the current epoch. `None` unless the node is a
    /// validator and accumulators are enabled; replaced by `reconfigure`.
    address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
    /// Scheduler variant passed along whenever the withdraw scheduler is built.
    funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
    /// Authority metrics; the pending/executing gauges also back
    /// `num_pending_certificates`.
    metrics: Arc<AuthorityMetrics>,
    /// Metrics handed to the withdraw scheduler each time it is built.
    address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
}
99
100struct PendingGuard<'a> {
101    scheduler: &'a ExecutionScheduler,
102    cert: &'a VerifiedExecutableTransaction,
103}
104
105impl<'a> PendingGuard<'a> {
106    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
107        scheduler
108            .metrics
109            .transaction_manager_num_pending_certificates
110            .inc();
111        scheduler
112            .overload_tracker
113            .add_pending_certificate(cert.data());
114        Self { scheduler, cert }
115    }
116}
117
118impl Drop for PendingGuard<'_> {
119    fn drop(&mut self) {
120        self.scheduler
121            .metrics
122            .transaction_manager_num_pending_certificates
123            .dec();
124        self.scheduler
125            .overload_tracker
126            .remove_pending_certificate(self.cert.data());
127    }
128}
129
130impl ExecutionScheduler {
131    pub fn new(
132        object_cache_read: Arc<dyn ObjectCacheRead>,
133        account_funds_read: Arc<dyn AccountFundsRead>,
134        transaction_cache_read: Arc<dyn TransactionCacheRead>,
135        tx_ready_certificates: UnboundedSender<PendingCertificate>,
136        epoch_store: &Arc<AuthorityPerEpochStore>,
137        funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
138        metrics: Arc<AuthorityMetrics>,
139        prometheus_registry: &prometheus::Registry,
140    ) -> Self {
141        tracing::info!(
142            ?funds_withdraw_scheduler_type,
143            "Creating new ExecutionScheduler"
144        );
145        let address_funds_scheduler_metrics =
146            Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
147        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
148            epoch_store,
149            &object_cache_read,
150            account_funds_read,
151            funds_withdraw_scheduler_type,
152            &address_funds_scheduler_metrics,
153        );
154        Self {
155            object_cache_read,
156            transaction_cache_read,
157            overload_tracker: Arc::new(OverloadTracker::new()),
158            tx_ready_certificates,
159            address_funds_withdraw_scheduler: Arc::new(Mutex::new(
160                address_funds_withdraw_scheduler,
161            )),
162            funds_withdraw_scheduler_type,
163            metrics,
164            address_funds_scheduler_metrics,
165        }
166    }
167
168    fn initialize_funds_withdraw_scheduler(
169        epoch_store: &Arc<AuthorityPerEpochStore>,
170        object_cache_read: &Arc<dyn ObjectCacheRead>,
171        account_funds_read: Arc<dyn AccountFundsRead>,
172        scheduler_type: FundsWithdrawSchedulerType,
173        address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
174    ) -> Option<FundsWithdrawScheduler> {
175        let withdraw_scheduler_enabled =
176            epoch_store.is_validator() && epoch_store.accumulators_enabled();
177        if !withdraw_scheduler_enabled {
178            return None;
179        }
180        let starting_accumulator_version = object_cache_read
181            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
182            .expect("Accumulator root object must be present if funds accumulator is enabled")
183            .version();
184        let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
185            account_funds_read.clone(),
186            starting_accumulator_version,
187            scheduler_type,
188            address_funds_scheduler_metrics.clone(),
189        );
190
191        Some(address_funds_withdraw_scheduler)
192    }
193
194    #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
195    async fn schedule_transaction(
196        self,
197        cert: VerifiedExecutableTransaction,
198        execution_env: ExecutionEnv,
199        epoch_store: &Arc<AuthorityPerEpochStore>,
200    ) {
201        let enqueue_time = Instant::now();
202        let tx_digest = cert.digest();
203        let digests = [*tx_digest];
204
205        let tx_data = cert.transaction_data();
206        let input_object_kinds = tx_data
207            .input_objects()
208            .expect("input_objects() cannot fail");
209        let input_object_keys: Vec<_> = epoch_store
210            .get_input_object_keys(
211                &cert.key(),
212                &input_object_kinds,
213                &execution_env.assigned_versions,
214            )
215            .into_iter()
216            .collect();
217
218        let receiving_object_keys: HashSet<_> = tx_data
219            .receiving_objects()
220            .into_iter()
221            .map(|entry| {
222                InputKey::VersionedObject {
223                    // TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
224                    id: FullObjectID::new(entry.0, None),
225                    version: entry.1,
226                }
227            })
228            .collect();
229        let input_and_receiving_keys = [
230            input_object_keys,
231            receiving_object_keys.iter().cloned().collect(),
232        ]
233        .concat();
234
235        let epoch = epoch_store.epoch();
236        debug!(
237            ?tx_digest,
238            "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
239        );
240
241        let availability = self
242            .object_cache_read
243            .multi_input_objects_available_cache_only(&input_and_receiving_keys);
244        // Most of the times, the transaction's input objects are already available.
245        // We can check the availability of the input objects first, and only wait for the
246        // missing input objects if necessary.
247        let missing_input_keys: Vec<_> = input_and_receiving_keys
248            .into_iter()
249            .zip(availability)
250            .filter_map(|(key, available)| if !available { Some(key) } else { None })
251            .collect();
252
253        let has_missing_barrier_dependencies = self
254            .transaction_cache_read
255            .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
256            .into_iter()
257            .any(|r| r.is_none());
258
259        if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
260            self.metrics
261                .transaction_manager_num_enqueued_certificates
262                .with_label_values(&["ready"])
263                .inc();
264            debug!(?tx_digest, "Input objects already available");
265            self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
266            return;
267        }
268
269        let _pending_guard = PendingGuard::new(&self, &cert);
270        self.metrics
271            .transaction_manager_num_enqueued_certificates
272            .with_label_values(&["pending"])
273            .inc();
274
275        if !execution_env.barrier_dependencies.is_empty() {
276            debug!(
277                "waiting for barrier dependencies to be executed: {:?}",
278                execution_env.barrier_dependencies
279            );
280            self.transaction_cache_read
281                .notify_read_executed_effects_digests(
282                    "wait_for_barrier_dependencies",
283                    &execution_env.barrier_dependencies,
284                )
285                .await;
286        }
287
288        tokio::select! {
289            _ = self.object_cache_read
290                .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
291                => {
292                    self.metrics
293                        .transaction_manager_transaction_queue_age_s
294                        .observe(enqueue_time.elapsed().as_secs_f64());
295                    debug!(?tx_digest, "Input objects available");
296                    // TODO: Eventually we could fold execution_driver into the scheduler.
297                    self.send_transaction_for_execution(
298                        &cert,
299                        execution_env,
300                        enqueue_time,
301                    );
302                }
303            _ = self.transaction_cache_read.notify_read_executed_effects_digests(
304                "ExecutionScheduler::notify_read_executed_effects_digests",
305                &digests,
306            ) => {
307                debug!(?tx_digest, "Transaction already executed");
308            }
309        };
310    }
311
312    pub fn send_transaction_for_execution(
313        &self,
314        cert: &VerifiedExecutableTransaction,
315        execution_env: ExecutionEnv,
316        enqueue_time: Instant,
317    ) {
318        let pending_cert = PendingCertificate {
319            certificate: cert.clone(),
320            execution_env,
321            stats: PendingCertificateStats {
322                enqueue_time,
323                ready_time: Some(Instant::now()),
324            },
325            executing_guard: Some(ExecutingGuard::new(
326                self.metrics
327                    .transaction_manager_num_executing_certificates
328                    .clone(),
329            )),
330        };
331        let _ = self.tx_ready_certificates.send(pending_cert);
332    }
333
334    fn schedule_funds_withdraws(
335        &self,
336        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
337        epoch_store: &Arc<AuthorityPerEpochStore>,
338    ) {
339        if certs.is_empty() {
340            return;
341        }
342        let mut withdraws = BTreeMap::new();
343        let mut prev_version = None;
344        for (cert, env) in &certs {
345            let tx_withdraws = cert
346                .transaction_data()
347                .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
348            assert!(!tx_withdraws.is_empty());
349            let accumulator_version = env
350                .assigned_versions
351                .accumulator_version
352                .expect("accumulator_version must be set when there are withdraws");
353            if let Some(prev_version) = prev_version {
354                // Transactions must be in order.
355                assert!(prev_version <= accumulator_version);
356            }
357            prev_version = Some(accumulator_version);
358            let tx_digest = *cert.digest();
359            withdraws
360                .entry(accumulator_version)
361                .or_insert(Vec::new())
362                .push(TxFundsWithdraw {
363                    tx_digest,
364                    reservations: tx_withdraws,
365                });
366        }
367        let mut receivers = FuturesUnordered::new();
368        {
369            let guard = self.address_funds_withdraw_scheduler.lock();
370            let withdraw_scheduler = guard
371                .as_ref()
372                .expect("Funds withdraw scheduler must be enabled if there are withdraws");
373            for (version, tx_withdraws) in withdraws {
374                receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
375                    accumulator_version: version,
376                    withdraws: tx_withdraws,
377                }));
378            }
379            // guard will be dropped here
380        }
381        let scheduler = self.clone();
382        let epoch_store = epoch_store.clone();
383        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
384            let mut cert_map = HashMap::new();
385            for (cert, env) in certs {
386                cert_map.insert(*cert.digest(), (cert, env));
387            }
388            while let Some(result) = receivers.next().await {
389                match result {
390                    Ok((tx_digest, status)) => match status {
391                        ScheduleStatus::InsufficientFunds => {
392                            assert_reachable!("tx cancelled, insufficient funds");
393                            debug!(
394                                ?tx_digest,
395                                "Funds withdraw scheduling result: Insufficient funds"
396                            );
397                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
398                            let env = env.with_insufficient_funds();
399                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
400                        }
401                        ScheduleStatus::SufficientFunds => {
402                            assert_reachable!("tx scheduled, sufficient funds");
403                            debug!(?tx_digest, "Funds withdraw scheduling result: Success");
404                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
405                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
406                        }
407                        ScheduleStatus::SkipSchedule => {
408                            assert_reachable!("tx withdrawal scheduling skipped");
409                            debug!(?tx_digest, "Skip scheduling funds withdraw");
410                        }
411                    },
412                    Err(e) => {
413                        error!("Withdraw scheduler stopped: {:?}", e);
414                    }
415                }
416            }
417        }));
418    }
419
420    fn schedule_tx_keys(
421        &self,
422        tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
423        epoch_store: &Arc<AuthorityPerEpochStore>,
424    ) {
425        if tx_with_keys.is_empty() {
426            return;
427        }
428
429        let scheduler = self.clone();
430        let epoch_store = epoch_store.clone();
431        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
432            let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
433            let digests = epoch_store
434                .notify_read_tx_key_to_digest(&tx_keys)
435                .await
436                .expect("db error");
437            let transactions = scheduler
438                .transaction_cache_read
439                .multi_get_transaction_blocks(&digests)
440                .into_iter()
441                .map(|tx| {
442                    let tx = tx.expect("tx must exist").as_ref().clone();
443                    VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
444                })
445                .zip(tx_with_keys.into_iter().map(|(_, env)| env))
446                .collect::<Vec<_>>();
447            scheduler.enqueue_transactions(transactions, &epoch_store);
448        }));
449    }
450
451    /// When we schedule a certificate, it should be impossible for it to have been executed in a
452    /// previous epoch.
453    #[cfg(debug_assertions)]
454    fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
455        let epoch = cert.epoch();
456        let digest = *cert.digest();
457        let digests = [digest];
458        let executed = self
459            .transaction_cache_read
460            .multi_get_executed_effects(&digests)
461            .pop()
462            .unwrap();
463        // Due to pruning, we may not always have an executed effects for the certificate
464        // even if it was executed. So this is a best-effort check.
465        if let Some(executed) = executed {
466            use sui_types::effects::TransactionEffectsAPI;
467
468            assert_eq!(
469                executed.executed_epoch(),
470                epoch,
471                "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
472                digest,
473                executed.executed_epoch(),
474                epoch
475            );
476        }
477    }
478}
479
480impl ExecutionScheduler {
481    pub fn enqueue(
482        &self,
483        certs: Vec<(Schedulable, ExecutionEnv)>,
484        epoch_store: &Arc<AuthorityPerEpochStore>,
485    ) {
486        // schedule all transactions immediately
487        let mut ordinary_txns = Vec::with_capacity(certs.len());
488        let mut tx_with_keys = Vec::new();
489        let mut tx_with_withdraws = Vec::new();
490
491        for (schedulable, env) in certs {
492            match schedulable {
493                Schedulable::Transaction(tx) => {
494                    if tx.transaction_data().has_funds_withdrawals() {
495                        tx_with_withdraws.push((tx, env));
496                    } else {
497                        ordinary_txns.push((tx, env));
498                    }
499                }
500                s @ Schedulable::RandomnessStateUpdate(..) => {
501                    tx_with_keys.push((s.key(), env));
502                }
503                Schedulable::AccumulatorSettlement(_, _) => {
504                    unreachable!("handled by SettlementScheduler");
505                }
506                Schedulable::ConsensusCommitPrologue(_, _, _) => {
507                    // we only use Schedulable::ConsensusCommitPrologue as a temporary placeholder
508                    // during version assignment, by the time we schedule transactions it should be
509                    // converted to Schedulable::Transaction
510                    unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
511                }
512            }
513        }
514
515        self.enqueue_transactions(ordinary_txns, epoch_store);
516        self.schedule_tx_keys(tx_with_keys, epoch_store);
517        self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
518    }
519
520    pub fn enqueue_transactions(
521        &self,
522        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
523        epoch_store: &Arc<AuthorityPerEpochStore>,
524    ) {
525        // Filter out certificates from wrong epoch.
526        let certs: Vec<_> = certs
527            .into_iter()
528            .filter_map(|cert| {
529                if cert.0.epoch() == epoch_store.epoch() {
530                    #[cfg(debug_assertions)]
531                    self.assert_cert_not_executed_previous_epochs(&cert.0);
532
533                    Some(cert)
534                } else {
535                    debug_fatal!(
536                        "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
537                        epoch_store.epoch(),
538                        cert.0.epoch()
539                    );
540                    None
541                }
542            })
543            .collect();
544        let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
545        let executed = self
546            .transaction_cache_read
547            .multi_get_executed_effects_digests(&digests);
548        let mut already_executed_certs_num = 0;
549        let pending_certs =
550            certs
551                .into_iter()
552                .zip(executed)
553                .filter_map(|((cert, execution_env), executed)| {
554                    if executed.is_none() {
555                        Some((cert, execution_env))
556                    } else {
557                        already_executed_certs_num += 1;
558                        None
559                    }
560                });
561
562        for (cert, execution_env) in pending_certs {
563            let scheduler = self.clone();
564            let epoch_store = epoch_store.clone();
565            spawn_monitored_task!(
566                epoch_store.within_alive_epoch(scheduler.schedule_transaction(
567                    cert,
568                    execution_env,
569                    &epoch_store,
570                ))
571            );
572        }
573
574        self.metrics
575            .transaction_manager_num_enqueued_certificates
576            .with_label_values(&["already_executed"])
577            .inc_by(already_executed_certs_num);
578    }
579
580    pub fn settle_address_funds(&self, settlement: FundsSettlement) {
581        self.address_funds_withdraw_scheduler
582            .lock()
583            .as_ref()
584            .expect("Funds withdraw scheduler must be enabled if there are settlements")
585            .settle_funds(settlement);
586    }
587
588    /// Reconfigure internal state at epoch start. This resets the funds withdraw scheduler
589    /// to the current accumulator root object version.
590    pub fn reconfigure(
591        &self,
592        new_epoch_store: &Arc<AuthorityPerEpochStore>,
593        account_funds_read: &Arc<dyn AccountFundsRead>,
594    ) {
595        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
596            new_epoch_store,
597            &self.object_cache_read,
598            account_funds_read.clone(),
599            self.funds_withdraw_scheduler_type,
600            &self.address_funds_scheduler_metrics,
601        );
602        let mut guard = self.address_funds_withdraw_scheduler.lock();
603        if let Some(old_scheduler) = guard.as_ref() {
604            old_scheduler.close_epoch();
605        }
606        *guard = address_funds_withdraw_scheduler;
607        drop(guard);
608    }
609
610    pub fn check_execution_overload(
611        &self,
612        overload_config: &AuthorityOverloadConfig,
613        tx_data: &SenderSignedData,
614    ) -> SuiResult {
615        let inflight_queue_len = self.num_pending_certificates();
616        self.overload_tracker
617            .check_execution_overload(overload_config, tx_data, inflight_queue_len)
618    }
619
620    pub fn num_pending_certificates(&self) -> usize {
621        (self
622            .metrics
623            .transaction_manager_num_pending_certificates
624            .get()
625            + self
626                .metrics
627                .transaction_manager_num_executing_certificates
628                .get()) as usize
629    }
630
631    #[cfg(test)]
632    pub async fn check_empty_for_testing(&self) {
633        for _ in 0..10 {
634            if self.num_pending_certificates() == 0 {
635                return;
636            }
637            tokio::task::yield_now().await;
638        }
639        assert_eq!(self.num_pending_certificates(), 0);
640    }
641}
642
643#[cfg(test)]
644mod test {
645    use super::{
646        BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
647        PendingCertificate,
648    };
649    use crate::authority::ExecutionEnv;
650    use crate::authority::shared_object_version_manager::AssignedVersions;
651    use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
652    use std::collections::BTreeSet;
653    use std::{time::Duration, vec};
654    use sui_test_transaction_builder::TestTransactionBuilder;
655    use sui_types::base_types::{SuiAddress, random_object_ref};
656    use sui_types::executable_transaction::VerifiedExecutableTransaction;
657    use sui_types::object::Owner;
658    use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
659    use sui_types::transaction::{
660        SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
661    };
662    use sui_types::{
663        SUI_FRAMEWORK_PACKAGE_ID,
664        base_types::{ObjectID, SequenceNumber},
665        crypto::deterministic_random_account_key,
666        object::Object,
667        transaction::{CallArg, ObjectArg},
668    };
669    use tokio::time::Instant;
670    use tokio::{
671        sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
672        time::sleep,
673    };
674
675    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
676    fn make_execution_scheduler(
677        state: &AuthorityState,
678    ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
679        // Create a new execution scheduler instead of reusing the authority's, to examine
680        // execution_scheduler output from rx_ready_certificates.
681        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
682        let registry = prometheus::Registry::new();
683        let execution_scheduler = ExecutionScheduler::new(
684            state.get_object_cache_reader().clone(),
685            state.get_account_funds_read().clone(),
686            state.get_transaction_cache_reader().clone(),
687            tx_ready_certificates,
688            &state.epoch_store_for_testing(),
689            FundsWithdrawSchedulerType::default(),
690            state.metrics.clone(),
691            &registry,
692        );
693
694        (execution_scheduler, rx_ready_certificates)
695    }
696
697    fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
698        // Use fake module, function, package and gas prices since they are irrelevant for testing
699        // execution scheduler.
700        let rgp = 100;
701        let (sender, keypair) = deterministic_random_account_key();
702        let transaction =
703            TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
704                .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
705                .build_and_sign(&keypair);
706        VerifiedExecutableTransaction::new_system(
707            VerifiedTransaction::new_unchecked(transaction),
708            0,
709        )
710    }
711
712    #[tokio::test(flavor = "current_thread", start_paused = true)]
713    async fn execution_scheduler_basics() {
714        // Initialize an authority state.
715        let (owner, _keypair) = deterministic_random_account_key();
716        let gas_objects: Vec<Object> = (0..10)
717            .map(|_| {
718                let gas_object_id = ObjectID::random();
719                Object::with_id_owner_for_testing(gas_object_id, owner)
720            })
721            .collect();
722        let state = init_state_with_objects(gas_objects.clone()).await;
723
724        // Create a new execution scheduler instead of reusing the authority's, to examine
725        // execution_scheduler output from rx_ready_certificates.
726        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
727        // scheduler should output no transaction.
728        assert!(
729            rx_ready_certificates
730                .try_recv()
731                .is_err_and(|err| err == TryRecvError::Empty)
732        );
733        // scheduler should be empty at the beginning.
734        assert_eq!(execution_scheduler.num_pending_certificates(), 0);
735
736        // Enqueue empty vec should not crash.
737        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
738        // scheduler should output no transaction.
739        assert!(
740            rx_ready_certificates
741                .try_recv()
742                .is_err_and(|err| err == TryRecvError::Empty)
743        );
744
745        // Enqueue a transaction with existing gas object, empty input.
746        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
747        let tx_start_time = Instant::now();
748        execution_scheduler.enqueue_transactions(
749            vec![(transaction.clone(), ExecutionEnv::new())],
750            &state.epoch_store_for_testing(),
751        );
752        // scheduler should output the transaction eventually.
753        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
754
755        // Tests that pending certificate stats are recorded properly.
756        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
757        assert!(
758            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
759        );
760
761        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
762
763        // Predent we have just executed the transaction.
764        drop(pending_certificate);
765
766        // scheduler should be empty.
767        execution_scheduler.check_empty_for_testing().await;
768
769        // Enqueue a transaction with a new gas object, empty input.
770        let gas_object_new = Object::with_id_owner_version_for_testing(
771            ObjectID::random(),
772            0.into(),
773            Owner::AddressOwner(owner),
774        );
775        let transaction = make_transaction(gas_object_new.clone(), vec![]);
776        let tx_start_time = Instant::now();
777        execution_scheduler.enqueue_transactions(
778            vec![(transaction.clone(), ExecutionEnv::new())],
779            &state.epoch_store_for_testing(),
780        );
781        // scheduler should output no transaction yet.
782        sleep(Duration::from_secs(1)).await;
783        assert!(
784            rx_ready_certificates
785                .try_recv()
786                .is_err_and(|err| err == TryRecvError::Empty)
787        );
788
789        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
790
791        // Duplicated enqueue is allowed.
792        execution_scheduler.enqueue_transactions(
793            vec![(transaction.clone(), ExecutionEnv::new())],
794            &state.epoch_store_for_testing(),
795        );
796        sleep(Duration::from_secs(1)).await;
797        assert!(
798            rx_ready_certificates
799                .try_recv()
800                .is_err_and(|err| err == TryRecvError::Empty)
801        );
802
803        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
804
805        // Notify scheduler about availability of the gas object.
806        state
807            .get_cache_writer()
808            .write_object_entry_for_test(gas_object_new);
809        // scheduler should output the transaction eventually.
810        // We will see both the original and the duplicated transaction.
811        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
812        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
813        assert_eq!(
814            pending_certificate.certificate.digest(),
815            pending_certificate2.certificate.digest()
816        );
817
818        // Tests that pending certificate stats are recorded properly. The ready time should be
819        // 2 seconds apart from the enqueue time.
820        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
821        assert!(
822            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
823                >= Duration::from_secs(2)
824        );
825
826        // Predent we have just executed the transaction.
827        drop(pending_certificate);
828        drop(pending_certificate2);
829
830        // scheduler should be empty at the end.
831        execution_scheduler.check_empty_for_testing().await;
832    }
833
834    // Tests when objects become available, correct set of transactions can be sent to execute.
835    // Specifically, we have following setup,
836    //         shared_object     shared_object_2
837    //       /    |    \     \    /
838    //    tx_0  tx_1  tx_2    tx_3
839    //     r      r     w      r
840    // And when shared_object is available, tx_0, tx_1, and tx_2 can be executed. And when
841    // shared_object_2 becomes available, tx_3 can be executed.
842    #[tokio::test(flavor = "current_thread", start_paused = true)]
843    async fn execution_scheduler_object_dependency() {
844        telemetry_subscribers::init_for_testing();
845        // Initialize an authority state, with gas objects and a shared object.
846        let (owner, _keypair) = deterministic_random_account_key();
847        let gas_objects: Vec<Object> = (0..10)
848            .map(|_| {
849                let gas_object_id = ObjectID::random();
850                Object::with_id_owner_for_testing(gas_object_id, owner)
851            })
852            .collect();
853        let shared_object = Object::shared_for_testing();
854        let initial_shared_version = shared_object.owner().start_version().unwrap();
855        let shared_object_2 = Object::shared_for_testing();
856        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
857
858        let state = init_state_with_objects(
859            [
860                gas_objects.clone(),
861                vec![shared_object.clone(), shared_object_2.clone()],
862            ]
863            .concat(),
864        )
865        .await;
866
867        // Create a new execution scheduler instead of reusing the authority's, to examine
868        // execution_scheduler output from rx_ready_certificates.
869        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
870        // scheduler should output no transaction.
871        assert!(rx_ready_certificates.try_recv().is_err());
872
873        // Enqueue two transactions with the same shared object input in read-only mode.
874        let shared_version = 1000.into();
875        let shared_object_arg_read = ObjectArg::SharedObject {
876            id: shared_object.id(),
877            initial_shared_version,
878            mutability: SharedObjectMutability::Immutable,
879        };
880        let transaction_read_0 = make_transaction(
881            gas_objects[0].clone(),
882            vec![CallArg::Object(shared_object_arg_read)],
883        );
884        let transaction_read_1 = make_transaction(
885            gas_objects[1].clone(),
886            vec![CallArg::Object(shared_object_arg_read)],
887        );
888        let tx_read_0_assigned_versions = vec![(
889            (
890                shared_object.id(),
891                shared_object.owner().start_version().unwrap(),
892            ),
893            shared_version,
894        )];
895        let tx_read_1_assigned_versions = vec![(
896            (
897                shared_object.id(),
898                shared_object.owner().start_version().unwrap(),
899            ),
900            shared_version,
901        )];
902
903        // Enqueue one transaction with the same shared object in mutable mode.
904        let shared_object_arg_default = ObjectArg::SharedObject {
905            id: shared_object.id(),
906            initial_shared_version,
907            mutability: SharedObjectMutability::Mutable,
908        };
909        let transaction_default = make_transaction(
910            gas_objects[2].clone(),
911            vec![CallArg::Object(shared_object_arg_default)],
912        );
913        let tx_default_assigned_versions = vec![(
914            (
915                shared_object.id(),
916                shared_object.owner().start_version().unwrap(),
917            ),
918            shared_version,
919        )];
920
921        // Enqueue one transaction with two readonly shared object inputs, `shared_object` and `shared_object_2`.
922        let shared_version_2 = 1000.into();
923        let shared_object_arg_read_2 = ObjectArg::SharedObject {
924            id: shared_object_2.id(),
925            initial_shared_version: initial_shared_version_2,
926            mutability: SharedObjectMutability::Immutable,
927        };
928        let transaction_read_2 = make_transaction(
929            gas_objects[3].clone(),
930            vec![
931                CallArg::Object(shared_object_arg_default),
932                CallArg::Object(shared_object_arg_read_2),
933            ],
934        );
935        let tx_read_2_assigned_versions = vec![
936            (
937                (
938                    shared_object.id(),
939                    shared_object.owner().start_version().unwrap(),
940                ),
941                shared_version,
942            ),
943            (
944                (
945                    shared_object_2.id(),
946                    shared_object_2.owner().start_version().unwrap(),
947                ),
948                shared_version_2,
949            ),
950        ];
951
952        execution_scheduler.enqueue_transactions(
953            vec![
954                (
955                    transaction_read_0.clone(),
956                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
957                        tx_read_0_assigned_versions,
958                        None,
959                    )),
960                ),
961                (
962                    transaction_read_1.clone(),
963                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
964                        tx_read_1_assigned_versions,
965                        None,
966                    )),
967                ),
968                (
969                    transaction_default.clone(),
970                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
971                        tx_default_assigned_versions,
972                        None,
973                    )),
974                ),
975                (
976                    transaction_read_2.clone(),
977                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
978                        tx_read_2_assigned_versions,
979                        None,
980                    )),
981                ),
982            ],
983            &state.epoch_store_for_testing(),
984        );
985
986        // scheduler should output no transaction yet.
987        sleep(Duration::from_secs(1)).await;
988        assert!(rx_ready_certificates.try_recv().is_err());
989
990        assert_eq!(execution_scheduler.num_pending_certificates(), 4);
991
992        // Notify scheduler about availability of the first shared object.
993        let mut new_shared_object = shared_object.clone();
994        new_shared_object
995            .data
996            .try_as_move_mut()
997            .unwrap()
998            .increment_version_to(shared_version_2);
999        state
1000            .get_cache_writer()
1001            .write_object_entry_for_test(new_shared_object);
1002
1003        // scheduler should output the 3 transactions that are only waiting for this object.
1004        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1005        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1006        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1007        {
1008            let mut want_digests = vec![
1009                transaction_read_0.digest(),
1010                transaction_read_1.digest(),
1011                transaction_default.digest(),
1012            ];
1013            want_digests.sort();
1014            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1015            got_digests.sort();
1016            assert_eq!(want_digests, got_digests);
1017        }
1018
1019        sleep(Duration::from_secs(1)).await;
1020        assert!(rx_ready_certificates.try_recv().is_err());
1021
1022        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1023
1024        // Make shared_object_2 available.
1025        let mut new_shared_object_2 = shared_object_2.clone();
1026        new_shared_object_2
1027            .data
1028            .try_as_move_mut()
1029            .unwrap()
1030            .increment_version_to(shared_version_2);
1031        state
1032            .get_cache_writer()
1033            .write_object_entry_for_test(new_shared_object_2);
1034
1035        // Now, the transaction waiting for both shared objects can be executed.
1036        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1037        assert_eq!(transaction_read_2.digest(), tx_3.digest());
1038
1039        sleep(Duration::from_secs(1)).await;
1040        assert!(rx_ready_certificates.try_recv().is_err());
1041
1042        execution_scheduler.check_empty_for_testing().await;
1043    }
1044
1045    #[tokio::test(flavor = "current_thread", start_paused = true)]
1046    async fn execution_scheduler_receiving_notify_commit() {
1047        telemetry_subscribers::init_for_testing();
1048        // Initialize an authority state.
1049        let (owner, _keypair) = deterministic_random_account_key();
1050        let gas_objects: Vec<Object> = (0..10)
1051            .map(|_| {
1052                let gas_object_id = ObjectID::random();
1053                Object::with_id_owner_for_testing(gas_object_id, owner)
1054            })
1055            .collect();
1056        let state = init_state_with_objects(gas_objects.clone()).await;
1057
1058        // Create a new execution scheduler instead of reusing the authority's, to examine
1059        // execution_scheduler output from rx_ready_certificates.
1060        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1061        // scheduler should output no transaction.
1062        assert!(rx_ready_certificates.try_recv().is_err());
1063        // scheduler should be empty at the beginning.
1064        execution_scheduler.check_empty_for_testing().await;
1065
1066        let obj_id = ObjectID::random();
1067        let object_arguments: Vec<_> = (0..10)
1068            .map(|i| {
1069                let object = Object::with_id_owner_version_for_testing(
1070                    obj_id,
1071                    i.into(),
1072                    Owner::AddressOwner(owner),
1073                );
1074                // Every other transaction receives the object, and we create a run of multiple receives in
1075                // a row at the beginning to test that the scheduler doesn't get stuck in either configuration of:
1076                // ImmOrOwnedObject => Receiving,
1077                // Receiving => Receiving
1078                // Receiving => ImmOrOwnedObject
1079                // ImmOrOwnedObject => ImmOrOwnedObject is already tested as the default case on mainnet.
1080                let object_arg = if i % 2 == 0 || i == 3 {
1081                    ObjectArg::Receiving(object.compute_object_reference())
1082                } else {
1083                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1084                };
1085                let txn =
1086                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1087                (object, txn)
1088            })
1089            .collect();
1090
1091        for (i, (_, txn)) in object_arguments.iter().enumerate() {
1092            // scheduler should output no transaction yet since waiting on receiving object or
1093            // ImmOrOwnedObject input.
1094            execution_scheduler.enqueue_transactions(
1095                vec![(txn.clone(), ExecutionEnv::new())],
1096                &state.epoch_store_for_testing(),
1097            );
1098            sleep(Duration::from_secs(1)).await;
1099            assert!(rx_ready_certificates.try_recv().is_err());
1100            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1101        }
1102
1103        // Now start to unravel the transactions by notifying that each subsequent
1104        // transaction has been processed.
1105        let len = object_arguments.len();
1106        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1107            // Mark the object as available.
1108            // We should now eventually see the transaction as ready.
1109            state
1110                .get_cache_writer()
1111                .write_object_entry_for_test(object.clone());
1112
1113            // scheduler should output the transaction eventually now that the receiving object has become
1114            // available.
1115            rx_ready_certificates.recv().await.unwrap();
1116
1117            // Only one transaction at a time should become available though. So if we try to get
1118            // another one it should fail.
1119            sleep(Duration::from_secs(1)).await;
1120            assert!(rx_ready_certificates.try_recv().is_err());
1121
1122            // Notify the scheduler that the transaction has been processed.
1123            drop(txn);
1124
1125            // scheduler should now output another transaction to run since it the next version of that object
1126            // has become available.
1127            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1128        }
1129
1130        // After everything scheduler should be empty.
1131        execution_scheduler.check_empty_for_testing().await;
1132    }
1133
1134    #[tokio::test(flavor = "current_thread", start_paused = true)]
1135    async fn execution_scheduler_receiving_object_ready_notifications() {
1136        telemetry_subscribers::init_for_testing();
1137        // Initialize an authority state.
1138        let (owner, _keypair) = deterministic_random_account_key();
1139        let gas_objects: Vec<Object> = (0..10)
1140            .map(|_| {
1141                let gas_object_id = ObjectID::random();
1142                Object::with_id_owner_for_testing(gas_object_id, owner)
1143            })
1144            .collect();
1145        let state = init_state_with_objects(gas_objects.clone()).await;
1146
1147        // Create a new execution scheduler instead of reusing the authority's, to examine
1148        // execution_scheduler output from rx_ready_certificates.
1149        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1150        // scheduler should output no transaction.
1151        assert!(rx_ready_certificates.try_recv().is_err());
1152        // scheduler should be empty at the beginning.
1153        execution_scheduler.check_empty_for_testing().await;
1154
1155        let obj_id = ObjectID::random();
1156        let receiving_object_new0 =
1157            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1158        let receiving_object_new1 =
1159            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1160        let receiving_object_arg0 =
1161            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1162        let receive_object_transaction0 = make_transaction(
1163            gas_objects[0].clone(),
1164            vec![CallArg::Object(receiving_object_arg0)],
1165        );
1166
1167        let receiving_object_arg1 =
1168            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1169        let receive_object_transaction1 = make_transaction(
1170            gas_objects[0].clone(),
1171            vec![CallArg::Object(receiving_object_arg1)],
1172        );
1173
1174        // scheduler should output no transaction yet since waiting on receiving object.
1175        execution_scheduler.enqueue_transactions(
1176            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1177            &state.epoch_store_for_testing(),
1178        );
1179        sleep(Duration::from_secs(1)).await;
1180        assert!(rx_ready_certificates.try_recv().is_err());
1181        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1182
1183        // scheduler should output no transaction yet since waiting on receiving object.
1184        execution_scheduler.enqueue_transactions(
1185            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1186            &state.epoch_store_for_testing(),
1187        );
1188        sleep(Duration::from_secs(1)).await;
1189        assert!(rx_ready_certificates.try_recv().is_err());
1190        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1191
1192        // Duplicate enqueue of receiving object is allowed.
1193        execution_scheduler.enqueue_transactions(
1194            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1195            &state.epoch_store_for_testing(),
1196        );
1197        sleep(Duration::from_secs(1)).await;
1198        assert!(rx_ready_certificates.try_recv().is_err());
1199        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1200
1201        // Notify scheduler that the receiving object 0 is available.
1202        state
1203            .get_cache_writer()
1204            .write_object_entry_for_test(receiving_object_new0.clone());
1205
1206        // scheduler should output the transaction eventually now that the receiving object has become
1207        // available.
1208        rx_ready_certificates.recv().await.unwrap();
1209
1210        // Notify scheduler that the receiving object 0 is available.
1211        state
1212            .get_cache_writer()
1213            .write_object_entry_for_test(receiving_object_new1.clone());
1214
1215        // scheduler should output the transaction eventually now that the receiving object has become
1216        // available.
1217        rx_ready_certificates.recv().await.unwrap();
1218    }
1219
1220    #[tokio::test(flavor = "current_thread", start_paused = true)]
1221    async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1222        telemetry_subscribers::init_for_testing();
1223        // Initialize an authority state.
1224        let (owner, _keypair) = deterministic_random_account_key();
1225        let gas_objects: Vec<Object> = (0..10)
1226            .map(|_| {
1227                let gas_object_id = ObjectID::random();
1228                Object::with_id_owner_for_testing(gas_object_id, owner)
1229            })
1230            .collect();
1231        let state = init_state_with_objects(gas_objects.clone()).await;
1232
1233        // Create a new execution scheduler instead of reusing the authority's, to examine
1234        // execution_scheduler output from rx_ready_certificates.
1235        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1236        // scheduler should output no transaction.
1237        assert!(rx_ready_certificates.try_recv().is_err());
1238        // scheduler should be empty at the beginning.
1239        execution_scheduler.check_empty_for_testing().await;
1240
1241        let obj_id = ObjectID::random();
1242        let receiving_object_new0 =
1243            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1244        let receiving_object_new1 =
1245            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1246        let receiving_object_arg0 =
1247            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1248        let receive_object_transaction0 = make_transaction(
1249            gas_objects[0].clone(),
1250            vec![CallArg::Object(receiving_object_arg0)],
1251        );
1252
1253        let receive_object_transaction01 = make_transaction(
1254            gas_objects[1].clone(),
1255            vec![CallArg::Object(receiving_object_arg0)],
1256        );
1257
1258        let receiving_object_arg1 =
1259            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1260        let receive_object_transaction1 = make_transaction(
1261            gas_objects[0].clone(),
1262            vec![CallArg::Object(receiving_object_arg1)],
1263        );
1264
1265        // Enqueuing a transaction with a receiving object that is available at the time it is enqueued
1266        // should become immediately available.
1267        let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1268        let tx1 = make_transaction(
1269            gas_objects[0].clone(),
1270            vec![CallArg::Object(gas_receiving_arg)],
1271        );
1272
1273        // scheduler should output no transaction yet since waiting on receiving object.
1274        execution_scheduler.enqueue_transactions(
1275            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1276            &state.epoch_store_for_testing(),
1277        );
1278        sleep(Duration::from_secs(1)).await;
1279        assert!(rx_ready_certificates.try_recv().is_err());
1280        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1281
1282        // scheduler should output no transaction yet since waiting on receiving object.
1283        execution_scheduler.enqueue_transactions(
1284            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1285            &state.epoch_store_for_testing(),
1286        );
1287        sleep(Duration::from_secs(1)).await;
1288        assert!(rx_ready_certificates.try_recv().is_err());
1289        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1290
1291        // Different transaction with a duplicate receiving object reference is allowed.
1292        // Both transaction's will be outputted once the receiving object is available.
1293        execution_scheduler.enqueue_transactions(
1294            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1295            &state.epoch_store_for_testing(),
1296        );
1297        sleep(Duration::from_secs(1)).await;
1298        assert!(rx_ready_certificates.try_recv().is_err());
1299        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1300
1301        // Notify scheduler that the receiving object 0 is available.
1302        state
1303            .get_cache_writer()
1304            .write_object_entry_for_test(receiving_object_new0.clone());
1305
1306        // scheduler should output both transactions depending on the receiving object now that the
1307        // transaction's receiving object has become available.
1308        rx_ready_certificates.recv().await.unwrap();
1309
1310        rx_ready_certificates.recv().await.unwrap();
1311
1312        // Only two transactions that were dependent on the receiving object should be output.
1313        assert!(rx_ready_certificates.try_recv().is_err());
1314
1315        // Enqueue a transaction with a receiving object that is available at the time it is enqueued.
1316        // This should be immediately available.
1317        execution_scheduler.enqueue_transactions(
1318            vec![(tx1.clone(), ExecutionEnv::new())],
1319            &state.epoch_store_for_testing(),
1320        );
1321        sleep(Duration::from_secs(1)).await;
1322        rx_ready_certificates.recv().await.unwrap();
1323
1324        // Notify scheduler that the receiving object 0 is available.
1325        state
1326            .get_cache_writer()
1327            .write_object_entry_for_test(receiving_object_new1.clone());
1328
1329        // scheduler should output the transaction eventually now that the receiving object has become
1330        // available.
1331        rx_ready_certificates.recv().await.unwrap();
1332    }
1333
1334    #[tokio::test(flavor = "current_thread", start_paused = true)]
1335    async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1336        telemetry_subscribers::init_for_testing();
1337        // Initialize an authority state.
1338        let (owner, _keypair) = deterministic_random_account_key();
1339        let mut gas_objects: Vec<Object> = (0..10)
1340            .map(|_| {
1341                let gas_object_id = ObjectID::random();
1342                Object::with_id_owner_for_testing(gas_object_id, owner)
1343            })
1344            .collect();
1345        let receiving_object = Object::with_id_owner_version_for_testing(
1346            ObjectID::random(),
1347            10.into(),
1348            Owner::AddressOwner(owner),
1349        );
1350        gas_objects.push(receiving_object.clone());
1351        let state = init_state_with_objects(gas_objects.clone()).await;
1352
1353        // Create a new execution scheduler instead of reusing the authority's, to examine
1354        // execution_scheduler output from rx_ready_certificates.
1355        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1356        // scheduler should output no transaction.
1357        assert!(rx_ready_certificates.try_recv().is_err());
1358        // scheduler should be empty at the beginning.
1359        execution_scheduler.check_empty_for_testing().await;
1360
1361        let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1362            receiving_object.id(),
1363            0.into(),
1364            Owner::AddressOwner(owner),
1365        );
1366        let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1367            receiving_object.id(),
1368            1.into(),
1369            Owner::AddressOwner(owner),
1370        );
1371        let receiving_object_arg0 =
1372            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1373        let receive_object_transaction0 = make_transaction(
1374            gas_objects[0].clone(),
1375            vec![CallArg::Object(receiving_object_arg0)],
1376        );
1377
1378        let receive_object_transaction01 = make_transaction(
1379            gas_objects[1].clone(),
1380            vec![CallArg::Object(receiving_object_arg0)],
1381        );
1382
1383        let receiving_object_arg1 =
1384            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1385        let receive_object_transaction1 = make_transaction(
1386            gas_objects[0].clone(),
1387            vec![CallArg::Object(receiving_object_arg1)],
1388        );
1389
1390        // scheduler should output no transaction yet since waiting on receiving object.
1391        execution_scheduler.enqueue_transactions(
1392            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1393            &state.epoch_store_for_testing(),
1394        );
1395        execution_scheduler.enqueue_transactions(
1396            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1397            &state.epoch_store_for_testing(),
1398        );
1399        execution_scheduler.enqueue_transactions(
1400            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1401            &state.epoch_store_for_testing(),
1402        );
1403        sleep(Duration::from_secs(1)).await;
1404        rx_ready_certificates.recv().await.unwrap();
1405        rx_ready_certificates.recv().await.unwrap();
1406        rx_ready_certificates.recv().await.unwrap();
1407        assert!(rx_ready_certificates.try_recv().is_err());
1408    }
1409
1410    // Tests transaction cancellation logic in execution scheduler. Mainly tests that for cancelled transaction,
1411    // execution scheduler only waits for all non-shared objects to be available before outputting the transaction.
1412    #[tokio::test(flavor = "current_thread", start_paused = true)]
1413    async fn execution_scheduler_with_cancelled_transactions() {
1414        // Initialize an authority state, with gas objects and 3 shared objects.
1415        let (owner, _keypair) = deterministic_random_account_key();
1416        let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1417        let shared_object_1 = Object::shared_for_testing();
1418        let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1419        let shared_object_2 = Object::shared_for_testing();
1420        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1421        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1422
1423        let state = init_state_with_objects(vec![
1424            gas_object.clone(),
1425            shared_object_1.clone(),
1426            shared_object_2.clone(),
1427            owned_object.clone(),
1428        ])
1429        .await;
1430
1431        // Create a new execution scheduler instead of reusing the authority's, to examine
1432        // execution_scheduler output from rx_ready_certificates.
1433        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1434        // scheduler should output no transaction.
1435        assert!(rx_ready_certificates.try_recv().is_err());
1436
1437        // Enqueue one transaction with 2 shared object inputs and 1 owned input.
1438        let shared_object_arg_1 = ObjectArg::SharedObject {
1439            id: shared_object_1.id(),
1440            initial_shared_version: initial_shared_version_1,
1441            mutability: SharedObjectMutability::Mutable,
1442        };
1443        let shared_object_arg_2 = ObjectArg::SharedObject {
1444            id: shared_object_2.id(),
1445            initial_shared_version: initial_shared_version_2,
1446            mutability: SharedObjectMutability::Mutable,
1447        };
1448
1449        // Changes the desired owned object version to a higher version. We will make it available later.
1450        let owned_version = 2000.into();
1451        let mut owned_ref = owned_object.compute_object_reference();
1452        owned_ref.1 = owned_version;
1453        let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1454
1455        let cancelled_transaction = make_transaction(
1456            gas_object.clone(),
1457            vec![
1458                CallArg::Object(shared_object_arg_1),
1459                CallArg::Object(shared_object_arg_2),
1460                CallArg::Object(owned_object_arg),
1461            ],
1462        );
1463        let assigned_versions = vec![
1464            (
1465                (
1466                    shared_object_1.id(),
1467                    shared_object_1.owner().start_version().unwrap(),
1468                ),
1469                SequenceNumber::CANCELLED_READ,
1470            ),
1471            (
1472                (
1473                    shared_object_2.id(),
1474                    shared_object_2.owner().start_version().unwrap(),
1475                ),
1476                SequenceNumber::CONGESTED,
1477            ),
1478        ];
1479
1480        execution_scheduler.enqueue_transactions(
1481            vec![(
1482                cancelled_transaction.clone(),
1483                ExecutionEnv::new()
1484                    .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1485            )],
1486            &state.epoch_store_for_testing(),
1487        );
1488
1489        // scheduler should output no transaction yet.
1490        sleep(Duration::from_secs(1)).await;
1491        assert!(rx_ready_certificates.try_recv().is_err());
1492
1493        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1494
1495        // Notify scheduler about availability of the owned object.
1496        let mut new_owned_object = owned_object.clone();
1497        new_owned_object
1498            .data
1499            .try_as_move_mut()
1500            .unwrap()
1501            .increment_version_to(owned_version);
1502        state
1503            .get_cache_writer()
1504            .write_object_entry_for_test(new_owned_object);
1505
1506        // scheduler should output the transaction as soon as the owned object is available.
1507        let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1508        assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1509
1510        sleep(Duration::from_secs(1)).await;
1511        assert!(rx_ready_certificates.try_recv().is_err());
1512
1513        execution_scheduler.check_empty_for_testing().await;
1514    }
1515
1516    #[test]
1517    fn test_barrier_dependency_builder() {
1518        let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1519            assert!(
1520                non_exclusive_writes
1521                    .iter()
1522                    .all(|id| !exclusive_writes.contains(id))
1523            );
1524            assert!(
1525                exclusive_writes
1526                    .iter()
1527                    .all(|id| !non_exclusive_writes.contains(id))
1528            );
1529
1530            let non_exclusive_writes = non_exclusive_writes
1531                .into_iter()
1532                .map(|id| ObjectID::from_single_byte(id as u8));
1533            let exclusive_writes = exclusive_writes
1534                .into_iter()
1535                .map(|id| ObjectID::from_single_byte(id as u8));
1536            let mut builder = ProgrammableTransactionBuilder::new();
1537            for non_exclusive_write in non_exclusive_writes {
1538                builder
1539                    .obj(ObjectArg::SharedObject {
1540                        id: non_exclusive_write,
1541                        initial_shared_version: SequenceNumber::new(),
1542                        mutability: SharedObjectMutability::NonExclusiveWrite,
1543                    })
1544                    .unwrap();
1545            }
1546
1547            for exclusive_write in exclusive_writes {
1548                builder
1549                    .obj(ObjectArg::SharedObject {
1550                        id: exclusive_write,
1551                        initial_shared_version: SequenceNumber::new(),
1552                        mutability: SharedObjectMutability::Mutable,
1553                    })
1554                    .unwrap();
1555            }
1556
1557            let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1558            let tx_data =
1559                TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1560            Transaction::from_data_and_signer(tx_data, vec![])
1561        };
1562
1563        // One non-exclusive write, one exclusive write.
1564        {
1565            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1566            let tx1 = make_transaction(vec![1], vec![]);
1567            let tx2 = make_transaction(vec![], vec![1]);
1568
1569            let tx1_deps =
1570                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1571            let tx2_deps =
1572                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1573            assert!(tx1_deps.is_empty());
1574            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1575        }
1576
1577        // One transaction has non-exclusive writes to two different objects, and then becomes
1578        // a dependency of two barriers
1579        {
1580            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1581            let tx1 = make_transaction(vec![1, 2], vec![]);
1582            let tx2 = make_transaction(vec![], vec![1]);
1583            let tx3 = make_transaction(vec![], vec![2]);
1584
1585            let tx1_deps =
1586                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1587            let tx2_deps =
1588                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1589            let tx3_deps =
1590                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1591            assert!(tx1_deps.is_empty());
1592            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1593            assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1594        }
1595
1596        // Ensure multiple-object dependences are merged
1597        {
1598            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1599            let tx1 = make_transaction(vec![1], vec![]);
1600            let tx2 = make_transaction(vec![2], vec![]);
1601            let tx3 = make_transaction(vec![], vec![1, 2]);
1602
1603            let tx1_deps =
1604                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1605            let tx2_deps =
1606                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1607            let tx3_deps =
1608                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1609            assert!(tx1_deps.is_empty());
1610            assert!(tx2_deps.is_empty());
1611            assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1612        }
1613
1614        // Ensure dependency state is cleared
1615        {
1616            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1617            let tx1 = make_transaction(vec![1], vec![]);
1618            let tx2 = make_transaction(vec![], vec![1]);
1619            let tx3 = make_transaction(vec![], vec![1]);
1620
1621            let tx1_deps =
1622                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1623            let tx2_deps =
1624                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1625            let tx3_deps =
1626                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1627            assert!(tx1_deps.is_empty());
1628            assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1629            assert!(tx3_deps.is_empty());
1630        }
1631    }
1632}