sui_core/execution_scheduler/
execution_scheduler_impl.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    accumulators::funds_read::AccountFundsRead,
6    authority::{
7        AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8        shared_object_version_manager::Schedulable,
9    },
10    execution_cache::{ObjectCacheRead, TransactionCacheRead},
11    execution_scheduler::{
12        ExecutingGuard, PendingCertificateStats,
13        funds_withdraw_scheduler::{
14            AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
15            WithdrawReservations, scheduler::FundsWithdrawScheduler,
16        },
17    },
18};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::ZipDebugEqIteratorExt;
21use mysten_common::{assert_reachable, debug_fatal};
22use mysten_metrics::spawn_monitored_task;
23use parking_lot::Mutex;
24use std::{
25    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26    sync::Arc,
27};
28use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
29use sui_types::{
30    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31    base_types::{FullObjectID, ObjectID},
32    digests::TransactionDigest,
33    error::SuiResult,
34    executable_transaction::VerifiedExecutableTransaction,
35    storage::InputKey,
36    transaction::{
37        SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
38        TransactionDataAPI, TransactionKey,
39    },
40};
41use tokio::sync::mpsc::UnboundedSender;
42use tokio::time::Instant;
43use tracing::{debug, error, instrument};
44
45use super::{PendingCertificate, overload_tracker::OverloadTracker};
46
47/// Utility struct for collecting barrier dependencies
48pub(crate) struct BarrierDependencyBuilder {
49    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
50}
51
52impl BarrierDependencyBuilder {
53    pub fn new() -> Self {
54        Self {
55            dep_state: Default::default(),
56        }
57    }
58
59    /// process_tx must be called for each transaction in scheduling order. If the
60    /// transaction has a non-exclusive write to an object, the transaction digest is
61    /// stored to become a dependency of the eventual barrier transaction. If a
62    /// transaction has an exclusive write to an object, all pending non-exclusive write
63    /// transactions for that object are added to the barrier dependencies.
64    pub fn process_tx(
65        &mut self,
66        tx_digest: TransactionDigest,
67        tx: &TransactionData,
68    ) -> BTreeSet<TransactionDigest> {
69        let mut barrier_deps = BTreeSet::new();
70        for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
71            match mutability {
72                SharedObjectMutability::NonExclusiveWrite => {
73                    self.dep_state.entry(id).or_default().insert(tx_digest);
74                }
75                SharedObjectMutability::Mutable => {
76                    // If there were preceding non-exclusive writes to this object id, this
77                    // transaction is a barrier and must wait for them to finish.
78                    if let Some(deps) = self.dep_state.remove(&id) {
79                        barrier_deps.extend(deps);
80                    }
81                }
82                SharedObjectMutability::Immutable => (),
83            }
84        }
85        barrier_deps
86    }
87}
88
89#[derive(Clone)]
90pub struct ExecutionScheduler {
91    object_cache_read: Arc<dyn ObjectCacheRead>,
92    transaction_cache_read: Arc<dyn TransactionCacheRead>,
93    overload_tracker: Arc<OverloadTracker>,
94    tx_ready_certificates: UnboundedSender<PendingCertificate>,
95    address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
96    funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
97    metrics: Arc<AuthorityMetrics>,
98    address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
99}
100
101struct PendingGuard<'a> {
102    scheduler: &'a ExecutionScheduler,
103    cert: &'a VerifiedExecutableTransaction,
104}
105
106impl<'a> PendingGuard<'a> {
107    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
108        scheduler
109            .metrics
110            .transaction_manager_num_pending_certificates
111            .inc();
112        scheduler
113            .overload_tracker
114            .add_pending_certificate(cert.data());
115        Self { scheduler, cert }
116    }
117}
118
119impl Drop for PendingGuard<'_> {
120    fn drop(&mut self) {
121        self.scheduler
122            .metrics
123            .transaction_manager_num_pending_certificates
124            .dec();
125        self.scheduler
126            .overload_tracker
127            .remove_pending_certificate(self.cert.data());
128    }
129}
130
131impl ExecutionScheduler {
132    pub fn new(
133        object_cache_read: Arc<dyn ObjectCacheRead>,
134        account_funds_read: Arc<dyn AccountFundsRead>,
135        transaction_cache_read: Arc<dyn TransactionCacheRead>,
136        tx_ready_certificates: UnboundedSender<PendingCertificate>,
137        epoch_store: &Arc<AuthorityPerEpochStore>,
138        funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
139        metrics: Arc<AuthorityMetrics>,
140        prometheus_registry: &prometheus::Registry,
141    ) -> Self {
142        tracing::info!(
143            ?funds_withdraw_scheduler_type,
144            "Creating new ExecutionScheduler"
145        );
146        let address_funds_scheduler_metrics =
147            Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
148        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
149            epoch_store,
150            &object_cache_read,
151            account_funds_read,
152            funds_withdraw_scheduler_type,
153            &address_funds_scheduler_metrics,
154        );
155        Self {
156            object_cache_read,
157            transaction_cache_read,
158            overload_tracker: Arc::new(OverloadTracker::new()),
159            tx_ready_certificates,
160            address_funds_withdraw_scheduler: Arc::new(Mutex::new(
161                address_funds_withdraw_scheduler,
162            )),
163            funds_withdraw_scheduler_type,
164            metrics,
165            address_funds_scheduler_metrics,
166        }
167    }
168
169    fn initialize_funds_withdraw_scheduler(
170        epoch_store: &Arc<AuthorityPerEpochStore>,
171        object_cache_read: &Arc<dyn ObjectCacheRead>,
172        account_funds_read: Arc<dyn AccountFundsRead>,
173        scheduler_type: FundsWithdrawSchedulerType,
174        address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
175    ) -> Option<FundsWithdrawScheduler> {
176        let withdraw_scheduler_enabled =
177            epoch_store.is_validator() && epoch_store.accumulators_enabled();
178        if !withdraw_scheduler_enabled {
179            return None;
180        }
181        let starting_accumulator_version = object_cache_read
182            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
183            .expect("Accumulator root object must be present if funds accumulator is enabled")
184            .version();
185        let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
186            account_funds_read.clone(),
187            starting_accumulator_version,
188            scheduler_type,
189            address_funds_scheduler_metrics.clone(),
190        );
191
192        Some(address_funds_withdraw_scheduler)
193    }
194
195    #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
196    async fn schedule_transaction(
197        self,
198        cert: VerifiedExecutableTransaction,
199        execution_env: ExecutionEnv,
200        epoch_store: &Arc<AuthorityPerEpochStore>,
201    ) {
202        let enqueue_time = Instant::now();
203        let tx_digest = cert.digest();
204        let digests = [*tx_digest];
205
206        let tx_data = cert.transaction_data();
207        let input_object_kinds = tx_data
208            .input_objects()
209            .expect("input_objects() cannot fail");
210        let input_object_keys: Vec<_> = epoch_store
211            .get_input_object_keys(
212                &cert.key(),
213                &input_object_kinds,
214                &execution_env.assigned_versions,
215            )
216            .into_iter()
217            .collect();
218
219        let receiving_object_keys: HashSet<_> = tx_data
220            .receiving_objects()
221            .into_iter()
222            .map(|entry| {
223                InputKey::VersionedObject {
224                    // TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
225                    id: FullObjectID::new(entry.0, None),
226                    version: entry.1,
227                }
228            })
229            .collect();
230        let input_and_receiving_keys = [
231            input_object_keys,
232            receiving_object_keys.iter().cloned().collect(),
233        ]
234        .concat();
235
236        let epoch = epoch_store.epoch();
237        debug!(
238            ?tx_digest,
239            "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
240        );
241
242        let availability = self
243            .object_cache_read
244            .multi_input_objects_available_cache_only(&input_and_receiving_keys);
245        // Most of the times, the transaction's input objects are already available.
246        // We can check the availability of the input objects first, and only wait for the
247        // missing input objects if necessary.
248        let missing_input_keys: Vec<_> = input_and_receiving_keys
249            .into_iter()
250            .zip_debug_eq(availability)
251            .filter_map(|(key, available)| if !available { Some(key) } else { None })
252            .collect();
253
254        let has_missing_barrier_dependencies = self
255            .transaction_cache_read
256            .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
257            .into_iter()
258            .any(|r| r.is_none());
259
260        if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
261            self.metrics
262                .transaction_manager_num_enqueued_certificates
263                .with_label_values(&["ready"])
264                .inc();
265            debug!(?tx_digest, "Input objects already available");
266            self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
267            return;
268        }
269
270        let _pending_guard = PendingGuard::new(&self, &cert);
271        self.metrics
272            .transaction_manager_num_enqueued_certificates
273            .with_label_values(&["pending"])
274            .inc();
275
276        if !execution_env.barrier_dependencies.is_empty() {
277            debug!(
278                "waiting for barrier dependencies to be executed: {:?}",
279                execution_env.barrier_dependencies
280            );
281            self.transaction_cache_read
282                .notify_read_executed_effects_digests(
283                    "wait_for_barrier_dependencies",
284                    &execution_env.barrier_dependencies,
285                )
286                .await;
287        }
288
289        tokio::select! {
290            _ = self.object_cache_read
291                .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
292                => {
293                    self.metrics
294                        .transaction_manager_transaction_queue_age_s
295                        .observe(enqueue_time.elapsed().as_secs_f64());
296                    debug!(?tx_digest, "Input objects available");
297                    // TODO: Eventually we could fold execution_driver into the scheduler.
298                    self.send_transaction_for_execution(
299                        &cert,
300                        execution_env,
301                        enqueue_time,
302                    );
303                }
304            _ = self.transaction_cache_read.notify_read_executed_effects_digests(
305                "ExecutionScheduler::notify_read_executed_effects_digests",
306                &digests,
307            ) => {
308                debug!(?tx_digest, "Transaction already executed");
309            }
310        };
311    }
312
313    pub fn send_transaction_for_execution(
314        &self,
315        cert: &VerifiedExecutableTransaction,
316        execution_env: ExecutionEnv,
317        enqueue_time: Instant,
318    ) {
319        let pending_cert = PendingCertificate {
320            certificate: cert.clone(),
321            execution_env,
322            stats: PendingCertificateStats {
323                enqueue_time,
324                ready_time: Some(Instant::now()),
325            },
326            executing_guard: Some(ExecutingGuard::new(
327                self.metrics
328                    .transaction_manager_num_executing_certificates
329                    .clone(),
330            )),
331        };
332        let _ = self.tx_ready_certificates.send(pending_cert);
333    }
334
335    fn schedule_funds_withdraws(
336        &self,
337        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
338        epoch_store: &Arc<AuthorityPerEpochStore>,
339    ) {
340        if certs.is_empty() {
341            return;
342        }
343        let mut withdraws = BTreeMap::new();
344        let mut prev_version = None;
345        for (cert, env) in &certs {
346            let tx_withdraws = cert
347                .transaction_data()
348                .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
349            assert!(!tx_withdraws.is_empty());
350            let accumulator_version = env
351                .assigned_versions
352                .accumulator_version
353                .expect("accumulator_version must be set when there are withdraws");
354            if let Some(prev_version) = prev_version {
355                // Transactions must be in order.
356                assert!(prev_version <= accumulator_version);
357            }
358            prev_version = Some(accumulator_version);
359            let tx_digest = *cert.digest();
360            withdraws
361                .entry(accumulator_version)
362                .or_insert(Vec::new())
363                .push(TxFundsWithdraw {
364                    tx_digest,
365                    reservations: tx_withdraws,
366                });
367        }
368        let mut receivers = FuturesUnordered::new();
369        {
370            let guard = self.address_funds_withdraw_scheduler.lock();
371            let withdraw_scheduler = guard
372                .as_ref()
373                .expect("Funds withdraw scheduler must be enabled if there are withdraws");
374            for (version, tx_withdraws) in withdraws {
375                receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
376                    accumulator_version: version,
377                    withdraws: tx_withdraws,
378                }));
379            }
380            // guard will be dropped here
381        }
382        let scheduler = self.clone();
383        let epoch_store = epoch_store.clone();
384        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
385            let mut cert_map = HashMap::new();
386            for (cert, env) in certs {
387                cert_map.insert(*cert.digest(), (cert, env));
388            }
389            while let Some(result) = receivers.next().await {
390                match result {
391                    Ok((tx_digest, status)) => match status {
392                        ScheduleStatus::InsufficientFunds => {
393                            assert_reachable!("tx cancelled, insufficient funds");
394                            debug!(
395                                ?tx_digest,
396                                "Funds withdraw scheduling result: Insufficient funds"
397                            );
398                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
399                            let env = env.with_insufficient_funds();
400                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
401                        }
402                        ScheduleStatus::SufficientFunds => {
403                            assert_reachable!("tx scheduled, sufficient funds");
404                            debug!(?tx_digest, "Funds withdraw scheduling result: Success");
405                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
406                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
407                        }
408                        ScheduleStatus::SkipSchedule => {
409                            assert_reachable!("tx withdrawal scheduling skipped");
410                            debug!(?tx_digest, "Skip scheduling funds withdraw");
411                        }
412                    },
413                    Err(e) => {
414                        error!("Withdraw scheduler stopped: {:?}", e);
415                    }
416                }
417            }
418        }));
419    }
420
421    fn schedule_tx_keys(
422        &self,
423        tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
424        epoch_store: &Arc<AuthorityPerEpochStore>,
425    ) {
426        if tx_with_keys.is_empty() {
427            return;
428        }
429
430        let scheduler = self.clone();
431        let epoch_store = epoch_store.clone();
432        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
433            let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
434            let digests = epoch_store
435                .notify_read_tx_key_to_digest(&tx_keys)
436                .await
437                .expect("db error");
438            let transactions = scheduler
439                .transaction_cache_read
440                .multi_get_transaction_blocks(&digests)
441                .into_iter()
442                .map(|tx| {
443                    let tx = tx.expect("tx must exist").as_ref().clone();
444                    VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
445                })
446                .zip_debug_eq(tx_with_keys.into_iter().map(|(_, env)| env))
447                .collect::<Vec<_>>();
448            scheduler.enqueue_transactions(transactions, &epoch_store);
449        }));
450    }
451
452    /// When we schedule a certificate, it should be impossible for it to have been executed in a
453    /// previous epoch.
454    #[cfg(debug_assertions)]
455    fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
456        let epoch = cert.epoch();
457        let digest = *cert.digest();
458        let digests = [digest];
459        let executed = self
460            .transaction_cache_read
461            .multi_get_executed_effects(&digests)
462            .pop()
463            .unwrap();
464        // Due to pruning, we may not always have an executed effects for the certificate
465        // even if it was executed. So this is a best-effort check.
466        if let Some(executed) = executed {
467            use sui_types::effects::TransactionEffectsAPI;
468
469            assert_eq!(
470                executed.executed_epoch(),
471                epoch,
472                "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
473                digest,
474                executed.executed_epoch(),
475                epoch
476            );
477        }
478    }
479}
480
481impl ExecutionScheduler {
482    pub fn enqueue(
483        &self,
484        certs: Vec<(Schedulable, ExecutionEnv)>,
485        epoch_store: &Arc<AuthorityPerEpochStore>,
486    ) {
487        // schedule all transactions immediately
488        let mut ordinary_txns = Vec::with_capacity(certs.len());
489        let mut tx_with_keys = Vec::new();
490        let mut tx_with_withdraws = Vec::new();
491
492        for (schedulable, env) in certs {
493            match schedulable {
494                Schedulable::Transaction(tx) => {
495                    if tx.transaction_data().has_funds_withdrawals() {
496                        tx_with_withdraws.push((tx, env));
497                    } else {
498                        ordinary_txns.push((tx, env));
499                    }
500                }
501                s @ Schedulable::RandomnessStateUpdate(..) => {
502                    tx_with_keys.push((s.key(), env));
503                }
504                Schedulable::AccumulatorSettlement(_, _) => {
505                    unreachable!("handled by SettlementScheduler");
506                }
507                Schedulable::ConsensusCommitPrologue(_, _, _) => {
508                    // we only use Schedulable::ConsensusCommitPrologue as a temporary placeholder
509                    // during version assignment, by the time we schedule transactions it should be
510                    // converted to Schedulable::Transaction
511                    unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
512                }
513            }
514        }
515
516        self.enqueue_transactions(ordinary_txns, epoch_store);
517        self.schedule_tx_keys(tx_with_keys, epoch_store);
518        self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
519    }
520
521    pub fn enqueue_transactions(
522        &self,
523        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
524        epoch_store: &Arc<AuthorityPerEpochStore>,
525    ) {
526        // Filter out certificates from wrong epoch.
527        let certs: Vec<_> = certs
528            .into_iter()
529            .filter_map(|cert| {
530                if cert.0.epoch() == epoch_store.epoch() {
531                    #[cfg(debug_assertions)]
532                    self.assert_cert_not_executed_previous_epochs(&cert.0);
533
534                    Some(cert)
535                } else {
536                    debug_fatal!(
537                        "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
538                        epoch_store.epoch(),
539                        cert.0.epoch()
540                    );
541                    None
542                }
543            })
544            .collect();
545        let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
546        let executed = self
547            .transaction_cache_read
548            .multi_get_executed_effects_digests(&digests);
549        let mut already_executed_certs_num = 0;
550        let pending_certs = certs.into_iter().zip_debug_eq(executed).filter_map(
551            |((cert, execution_env), executed)| {
552                if executed.is_none() {
553                    Some((cert, execution_env))
554                } else {
555                    already_executed_certs_num += 1;
556                    None
557                }
558            },
559        );
560
561        for (cert, execution_env) in pending_certs {
562            let scheduler = self.clone();
563            let epoch_store = epoch_store.clone();
564            spawn_monitored_task!(
565                epoch_store.within_alive_epoch(scheduler.schedule_transaction(
566                    cert,
567                    execution_env,
568                    &epoch_store,
569                ))
570            );
571        }
572
573        self.metrics
574            .transaction_manager_num_enqueued_certificates
575            .with_label_values(&["already_executed"])
576            .inc_by(already_executed_certs_num);
577    }
578
579    pub fn settle_address_funds(&self, settlement: FundsSettlement) {
580        self.address_funds_withdraw_scheduler
581            .lock()
582            .as_ref()
583            .expect("Funds withdraw scheduler must be enabled if there are settlements")
584            .settle_funds(settlement);
585    }
586
587    /// Reconfigure internal state at epoch start. This resets the funds withdraw scheduler
588    /// to the current accumulator root object version.
589    pub fn reconfigure(
590        &self,
591        new_epoch_store: &Arc<AuthorityPerEpochStore>,
592        account_funds_read: &Arc<dyn AccountFundsRead>,
593    ) {
594        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
595            new_epoch_store,
596            &self.object_cache_read,
597            account_funds_read.clone(),
598            self.funds_withdraw_scheduler_type,
599            &self.address_funds_scheduler_metrics,
600        );
601        let mut guard = self.address_funds_withdraw_scheduler.lock();
602        if let Some(old_scheduler) = guard.as_ref() {
603            old_scheduler.close_epoch();
604        }
605        *guard = address_funds_withdraw_scheduler;
606        drop(guard);
607    }
608
609    pub fn check_execution_overload(
610        &self,
611        overload_config: &AuthorityOverloadConfig,
612        tx_data: &SenderSignedData,
613    ) -> SuiResult {
614        let inflight_queue_len = self.num_pending_certificates();
615        self.overload_tracker
616            .check_execution_overload(overload_config, tx_data, inflight_queue_len)
617    }
618
619    pub fn num_pending_certificates(&self) -> usize {
620        (self
621            .metrics
622            .transaction_manager_num_pending_certificates
623            .get()
624            + self
625                .metrics
626                .transaction_manager_num_executing_certificates
627                .get()) as usize
628    }
629
630    #[cfg(test)]
631    pub async fn check_empty_for_testing(&self) {
632        for _ in 0..10 {
633            if self.num_pending_certificates() == 0 {
634                return;
635            }
636            tokio::task::yield_now().await;
637        }
638        assert_eq!(self.num_pending_certificates(), 0);
639    }
640}
641
642#[cfg(test)]
643mod test {
644    use super::{
645        BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
646        PendingCertificate,
647    };
648    use crate::authority::ExecutionEnv;
649    use crate::authority::shared_object_version_manager::AssignedVersions;
650    use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
651    use std::collections::BTreeSet;
652    use std::{time::Duration, vec};
653    use sui_test_transaction_builder::TestTransactionBuilder;
654    use sui_types::base_types::{SuiAddress, random_object_ref};
655    use sui_types::executable_transaction::VerifiedExecutableTransaction;
656    use sui_types::object::Owner;
657    use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
658    use sui_types::transaction::{
659        SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
660    };
661    use sui_types::{
662        SUI_FRAMEWORK_PACKAGE_ID,
663        base_types::{ObjectID, SequenceNumber},
664        crypto::deterministic_random_account_key,
665        object::Object,
666        transaction::{CallArg, ObjectArg},
667    };
668    use tokio::time::Instant;
669    use tokio::{
670        sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
671        time::sleep,
672    };
673
674    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
675    fn make_execution_scheduler(
676        state: &AuthorityState,
677    ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
678        // Create a new execution scheduler instead of reusing the authority's, to examine
679        // execution_scheduler output from rx_ready_certificates.
680        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
681        let registry = prometheus::Registry::new();
682        let execution_scheduler = ExecutionScheduler::new(
683            state.get_object_cache_reader().clone(),
684            state.get_account_funds_read().clone(),
685            state.get_transaction_cache_reader().clone(),
686            tx_ready_certificates,
687            &state.epoch_store_for_testing(),
688            FundsWithdrawSchedulerType::default(),
689            state.metrics.clone(),
690            &registry,
691        );
692
693        (execution_scheduler, rx_ready_certificates)
694    }
695
696    fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
697        // Use fake module, function, package and gas prices since they are irrelevant for testing
698        // execution scheduler.
699        let rgp = 100;
700        let (sender, keypair) = deterministic_random_account_key();
701        let transaction =
702            TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
703                .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
704                .build_and_sign(&keypair);
705        VerifiedExecutableTransaction::new_system(
706            VerifiedTransaction::new_unchecked(transaction),
707            0,
708        )
709    }
710
711    #[tokio::test(flavor = "current_thread", start_paused = true)]
712    async fn execution_scheduler_basics() {
713        // Initialize an authority state.
714        let (owner, _keypair) = deterministic_random_account_key();
715        let gas_objects: Vec<Object> = (0..10)
716            .map(|_| {
717                let gas_object_id = ObjectID::random();
718                Object::with_id_owner_for_testing(gas_object_id, owner)
719            })
720            .collect();
721        let state = init_state_with_objects(gas_objects.clone()).await;
722
723        // Create a new execution scheduler instead of reusing the authority's, to examine
724        // execution_scheduler output from rx_ready_certificates.
725        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
726        // scheduler should output no transaction.
727        assert!(
728            rx_ready_certificates
729                .try_recv()
730                .is_err_and(|err| err == TryRecvError::Empty)
731        );
732        // scheduler should be empty at the beginning.
733        assert_eq!(execution_scheduler.num_pending_certificates(), 0);
734
735        // Enqueue empty vec should not crash.
736        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
737        // scheduler should output no transaction.
738        assert!(
739            rx_ready_certificates
740                .try_recv()
741                .is_err_and(|err| err == TryRecvError::Empty)
742        );
743
744        // Enqueue a transaction with existing gas object, empty input.
745        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
746        let tx_start_time = Instant::now();
747        execution_scheduler.enqueue_transactions(
748            vec![(transaction.clone(), ExecutionEnv::new())],
749            &state.epoch_store_for_testing(),
750        );
751        // scheduler should output the transaction eventually.
752        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
753
754        // Tests that pending certificate stats are recorded properly.
755        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
756        assert!(
757            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
758        );
759
760        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
761
762        // Predent we have just executed the transaction.
763        drop(pending_certificate);
764
765        // scheduler should be empty.
766        execution_scheduler.check_empty_for_testing().await;
767
768        // Enqueue a transaction with a new gas object, empty input.
769        let gas_object_new = Object::with_id_owner_version_for_testing(
770            ObjectID::random(),
771            0.into(),
772            Owner::AddressOwner(owner),
773        );
774        let transaction = make_transaction(gas_object_new.clone(), vec![]);
775        let tx_start_time = Instant::now();
776        execution_scheduler.enqueue_transactions(
777            vec![(transaction.clone(), ExecutionEnv::new())],
778            &state.epoch_store_for_testing(),
779        );
780        // scheduler should output no transaction yet.
781        sleep(Duration::from_secs(1)).await;
782        assert!(
783            rx_ready_certificates
784                .try_recv()
785                .is_err_and(|err| err == TryRecvError::Empty)
786        );
787
788        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
789
790        // Duplicated enqueue is allowed.
791        execution_scheduler.enqueue_transactions(
792            vec![(transaction.clone(), ExecutionEnv::new())],
793            &state.epoch_store_for_testing(),
794        );
795        sleep(Duration::from_secs(1)).await;
796        assert!(
797            rx_ready_certificates
798                .try_recv()
799                .is_err_and(|err| err == TryRecvError::Empty)
800        );
801
802        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
803
804        // Notify scheduler about availability of the gas object.
805        state
806            .get_cache_writer()
807            .write_object_entry_for_test(gas_object_new);
808        // scheduler should output the transaction eventually.
809        // We will see both the original and the duplicated transaction.
810        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
811        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
812        assert_eq!(
813            pending_certificate.certificate.digest(),
814            pending_certificate2.certificate.digest()
815        );
816
817        // Tests that pending certificate stats are recorded properly. The ready time should be
818        // 2 seconds apart from the enqueue time.
819        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
820        assert!(
821            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
822                >= Duration::from_secs(2)
823        );
824
825        // Predent we have just executed the transaction.
826        drop(pending_certificate);
827        drop(pending_certificate2);
828
829        // scheduler should be empty at the end.
830        execution_scheduler.check_empty_for_testing().await;
831    }
832
833    // Tests when objects become available, correct set of transactions can be sent to execute.
834    // Specifically, we have following setup,
835    //         shared_object     shared_object_2
836    //       /    |    \     \    /
837    //    tx_0  tx_1  tx_2    tx_3
838    //     r      r     w      r
839    // And when shared_object is available, tx_0, tx_1, and tx_2 can be executed. And when
840    // shared_object_2 becomes available, tx_3 can be executed.
841    #[tokio::test(flavor = "current_thread", start_paused = true)]
842    async fn execution_scheduler_object_dependency() {
843        telemetry_subscribers::init_for_testing();
844        // Initialize an authority state, with gas objects and a shared object.
845        let (owner, _keypair) = deterministic_random_account_key();
846        let gas_objects: Vec<Object> = (0..10)
847            .map(|_| {
848                let gas_object_id = ObjectID::random();
849                Object::with_id_owner_for_testing(gas_object_id, owner)
850            })
851            .collect();
852        let shared_object = Object::shared_for_testing();
853        let initial_shared_version = shared_object.owner().start_version().unwrap();
854        let shared_object_2 = Object::shared_for_testing();
855        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
856
857        let state = init_state_with_objects(
858            [
859                gas_objects.clone(),
860                vec![shared_object.clone(), shared_object_2.clone()],
861            ]
862            .concat(),
863        )
864        .await;
865
866        // Create a new execution scheduler instead of reusing the authority's, to examine
867        // execution_scheduler output from rx_ready_certificates.
868        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
869        // scheduler should output no transaction.
870        assert!(rx_ready_certificates.try_recv().is_err());
871
872        // Enqueue two transactions with the same shared object input in read-only mode.
873        let shared_version = 1000.into();
874        let shared_object_arg_read = ObjectArg::SharedObject {
875            id: shared_object.id(),
876            initial_shared_version,
877            mutability: SharedObjectMutability::Immutable,
878        };
879        let transaction_read_0 = make_transaction(
880            gas_objects[0].clone(),
881            vec![CallArg::Object(shared_object_arg_read)],
882        );
883        let transaction_read_1 = make_transaction(
884            gas_objects[1].clone(),
885            vec![CallArg::Object(shared_object_arg_read)],
886        );
887        let tx_read_0_assigned_versions = vec![(
888            (
889                shared_object.id(),
890                shared_object.owner().start_version().unwrap(),
891            ),
892            shared_version,
893        )];
894        let tx_read_1_assigned_versions = vec![(
895            (
896                shared_object.id(),
897                shared_object.owner().start_version().unwrap(),
898            ),
899            shared_version,
900        )];
901
902        // Enqueue one transaction with the same shared object in mutable mode.
903        let shared_object_arg_default = ObjectArg::SharedObject {
904            id: shared_object.id(),
905            initial_shared_version,
906            mutability: SharedObjectMutability::Mutable,
907        };
908        let transaction_default = make_transaction(
909            gas_objects[2].clone(),
910            vec![CallArg::Object(shared_object_arg_default)],
911        );
912        let tx_default_assigned_versions = vec![(
913            (
914                shared_object.id(),
915                shared_object.owner().start_version().unwrap(),
916            ),
917            shared_version,
918        )];
919
920        // Enqueue one transaction with two readonly shared object inputs, `shared_object` and `shared_object_2`.
921        let shared_version_2 = 1000.into();
922        let shared_object_arg_read_2 = ObjectArg::SharedObject {
923            id: shared_object_2.id(),
924            initial_shared_version: initial_shared_version_2,
925            mutability: SharedObjectMutability::Immutable,
926        };
927        let transaction_read_2 = make_transaction(
928            gas_objects[3].clone(),
929            vec![
930                CallArg::Object(shared_object_arg_default),
931                CallArg::Object(shared_object_arg_read_2),
932            ],
933        );
934        let tx_read_2_assigned_versions = vec![
935            (
936                (
937                    shared_object.id(),
938                    shared_object.owner().start_version().unwrap(),
939                ),
940                shared_version,
941            ),
942            (
943                (
944                    shared_object_2.id(),
945                    shared_object_2.owner().start_version().unwrap(),
946                ),
947                shared_version_2,
948            ),
949        ];
950
951        execution_scheduler.enqueue_transactions(
952            vec![
953                (
954                    transaction_read_0.clone(),
955                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
956                        tx_read_0_assigned_versions,
957                        None,
958                    )),
959                ),
960                (
961                    transaction_read_1.clone(),
962                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
963                        tx_read_1_assigned_versions,
964                        None,
965                    )),
966                ),
967                (
968                    transaction_default.clone(),
969                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
970                        tx_default_assigned_versions,
971                        None,
972                    )),
973                ),
974                (
975                    transaction_read_2.clone(),
976                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
977                        tx_read_2_assigned_versions,
978                        None,
979                    )),
980                ),
981            ],
982            &state.epoch_store_for_testing(),
983        );
984
985        // scheduler should output no transaction yet.
986        sleep(Duration::from_secs(1)).await;
987        assert!(rx_ready_certificates.try_recv().is_err());
988
989        assert_eq!(execution_scheduler.num_pending_certificates(), 4);
990
991        // Notify scheduler about availability of the first shared object.
992        let mut new_shared_object = shared_object.clone();
993        new_shared_object
994            .data
995            .try_as_move_mut()
996            .unwrap()
997            .increment_version_to(shared_version_2);
998        state
999            .get_cache_writer()
1000            .write_object_entry_for_test(new_shared_object);
1001
1002        // scheduler should output the 3 transactions that are only waiting for this object.
1003        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1004        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1005        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1006        {
1007            let mut want_digests = vec![
1008                transaction_read_0.digest(),
1009                transaction_read_1.digest(),
1010                transaction_default.digest(),
1011            ];
1012            want_digests.sort();
1013            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1014            got_digests.sort();
1015            assert_eq!(want_digests, got_digests);
1016        }
1017
1018        sleep(Duration::from_secs(1)).await;
1019        assert!(rx_ready_certificates.try_recv().is_err());
1020
1021        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1022
1023        // Make shared_object_2 available.
1024        let mut new_shared_object_2 = shared_object_2.clone();
1025        new_shared_object_2
1026            .data
1027            .try_as_move_mut()
1028            .unwrap()
1029            .increment_version_to(shared_version_2);
1030        state
1031            .get_cache_writer()
1032            .write_object_entry_for_test(new_shared_object_2);
1033
1034        // Now, the transaction waiting for both shared objects can be executed.
1035        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1036        assert_eq!(transaction_read_2.digest(), tx_3.digest());
1037
1038        sleep(Duration::from_secs(1)).await;
1039        assert!(rx_ready_certificates.try_recv().is_err());
1040
1041        execution_scheduler.check_empty_for_testing().await;
1042    }
1043
1044    #[tokio::test(flavor = "current_thread", start_paused = true)]
1045    async fn execution_scheduler_receiving_notify_commit() {
1046        telemetry_subscribers::init_for_testing();
1047        // Initialize an authority state.
1048        let (owner, _keypair) = deterministic_random_account_key();
1049        let gas_objects: Vec<Object> = (0..10)
1050            .map(|_| {
1051                let gas_object_id = ObjectID::random();
1052                Object::with_id_owner_for_testing(gas_object_id, owner)
1053            })
1054            .collect();
1055        let state = init_state_with_objects(gas_objects.clone()).await;
1056
1057        // Create a new execution scheduler instead of reusing the authority's, to examine
1058        // execution_scheduler output from rx_ready_certificates.
1059        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1060        // scheduler should output no transaction.
1061        assert!(rx_ready_certificates.try_recv().is_err());
1062        // scheduler should be empty at the beginning.
1063        execution_scheduler.check_empty_for_testing().await;
1064
1065        let obj_id = ObjectID::random();
1066        let object_arguments: Vec<_> = (0..10)
1067            .map(|i| {
1068                let object = Object::with_id_owner_version_for_testing(
1069                    obj_id,
1070                    i.into(),
1071                    Owner::AddressOwner(owner),
1072                );
1073                // Every other transaction receives the object, and we create a run of multiple receives in
1074                // a row at the beginning to test that the scheduler doesn't get stuck in either configuration of:
1075                // ImmOrOwnedObject => Receiving,
1076                // Receiving => Receiving
1077                // Receiving => ImmOrOwnedObject
1078                // ImmOrOwnedObject => ImmOrOwnedObject is already tested as the default case on mainnet.
1079                let object_arg = if i % 2 == 0 || i == 3 {
1080                    ObjectArg::Receiving(object.compute_object_reference())
1081                } else {
1082                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1083                };
1084                let txn =
1085                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1086                (object, txn)
1087            })
1088            .collect();
1089
1090        for (i, (_, txn)) in object_arguments.iter().enumerate() {
1091            // scheduler should output no transaction yet since waiting on receiving object or
1092            // ImmOrOwnedObject input.
1093            execution_scheduler.enqueue_transactions(
1094                vec![(txn.clone(), ExecutionEnv::new())],
1095                &state.epoch_store_for_testing(),
1096            );
1097            sleep(Duration::from_secs(1)).await;
1098            assert!(rx_ready_certificates.try_recv().is_err());
1099            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1100        }
1101
1102        // Now start to unravel the transactions by notifying that each subsequent
1103        // transaction has been processed.
1104        let len = object_arguments.len();
1105        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1106            // Mark the object as available.
1107            // We should now eventually see the transaction as ready.
1108            state
1109                .get_cache_writer()
1110                .write_object_entry_for_test(object.clone());
1111
1112            // scheduler should output the transaction eventually now that the receiving object has become
1113            // available.
1114            rx_ready_certificates.recv().await.unwrap();
1115
1116            // Only one transaction at a time should become available though. So if we try to get
1117            // another one it should fail.
1118            sleep(Duration::from_secs(1)).await;
1119            assert!(rx_ready_certificates.try_recv().is_err());
1120
1121            // Notify the scheduler that the transaction has been processed.
1122            drop(txn);
1123
1124            // scheduler should now output another transaction to run since it the next version of that object
1125            // has become available.
1126            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1127        }
1128
1129        // After everything scheduler should be empty.
1130        execution_scheduler.check_empty_for_testing().await;
1131    }
1132
1133    #[tokio::test(flavor = "current_thread", start_paused = true)]
1134    async fn execution_scheduler_receiving_object_ready_notifications() {
1135        telemetry_subscribers::init_for_testing();
1136        // Initialize an authority state.
1137        let (owner, _keypair) = deterministic_random_account_key();
1138        let gas_objects: Vec<Object> = (0..10)
1139            .map(|_| {
1140                let gas_object_id = ObjectID::random();
1141                Object::with_id_owner_for_testing(gas_object_id, owner)
1142            })
1143            .collect();
1144        let state = init_state_with_objects(gas_objects.clone()).await;
1145
1146        // Create a new execution scheduler instead of reusing the authority's, to examine
1147        // execution_scheduler output from rx_ready_certificates.
1148        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1149        // scheduler should output no transaction.
1150        assert!(rx_ready_certificates.try_recv().is_err());
1151        // scheduler should be empty at the beginning.
1152        execution_scheduler.check_empty_for_testing().await;
1153
1154        let obj_id = ObjectID::random();
1155        let receiving_object_new0 =
1156            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1157        let receiving_object_new1 =
1158            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1159        let receiving_object_arg0 =
1160            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1161        let receive_object_transaction0 = make_transaction(
1162            gas_objects[0].clone(),
1163            vec![CallArg::Object(receiving_object_arg0)],
1164        );
1165
1166        let receiving_object_arg1 =
1167            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1168        let receive_object_transaction1 = make_transaction(
1169            gas_objects[0].clone(),
1170            vec![CallArg::Object(receiving_object_arg1)],
1171        );
1172
1173        // scheduler should output no transaction yet since waiting on receiving object.
1174        execution_scheduler.enqueue_transactions(
1175            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1176            &state.epoch_store_for_testing(),
1177        );
1178        sleep(Duration::from_secs(1)).await;
1179        assert!(rx_ready_certificates.try_recv().is_err());
1180        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1181
1182        // scheduler should output no transaction yet since waiting on receiving object.
1183        execution_scheduler.enqueue_transactions(
1184            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1185            &state.epoch_store_for_testing(),
1186        );
1187        sleep(Duration::from_secs(1)).await;
1188        assert!(rx_ready_certificates.try_recv().is_err());
1189        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1190
1191        // Duplicate enqueue of receiving object is allowed.
1192        execution_scheduler.enqueue_transactions(
1193            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1194            &state.epoch_store_for_testing(),
1195        );
1196        sleep(Duration::from_secs(1)).await;
1197        assert!(rx_ready_certificates.try_recv().is_err());
1198        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1199
1200        // Notify scheduler that the receiving object 0 is available.
1201        state
1202            .get_cache_writer()
1203            .write_object_entry_for_test(receiving_object_new0.clone());
1204
1205        // scheduler should output the transaction eventually now that the receiving object has become
1206        // available.
1207        rx_ready_certificates.recv().await.unwrap();
1208
1209        // Notify scheduler that the receiving object 0 is available.
1210        state
1211            .get_cache_writer()
1212            .write_object_entry_for_test(receiving_object_new1.clone());
1213
1214        // scheduler should output the transaction eventually now that the receiving object has become
1215        // available.
1216        rx_ready_certificates.recv().await.unwrap();
1217    }
1218
1219    #[tokio::test(flavor = "current_thread", start_paused = true)]
1220    async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1221        telemetry_subscribers::init_for_testing();
1222        // Initialize an authority state.
1223        let (owner, _keypair) = deterministic_random_account_key();
1224        let gas_objects: Vec<Object> = (0..10)
1225            .map(|_| {
1226                let gas_object_id = ObjectID::random();
1227                Object::with_id_owner_for_testing(gas_object_id, owner)
1228            })
1229            .collect();
1230        let state = init_state_with_objects(gas_objects.clone()).await;
1231
1232        // Create a new execution scheduler instead of reusing the authority's, to examine
1233        // execution_scheduler output from rx_ready_certificates.
1234        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1235        // scheduler should output no transaction.
1236        assert!(rx_ready_certificates.try_recv().is_err());
1237        // scheduler should be empty at the beginning.
1238        execution_scheduler.check_empty_for_testing().await;
1239
1240        let obj_id = ObjectID::random();
1241        let receiving_object_new0 =
1242            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1243        let receiving_object_new1 =
1244            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1245        let receiving_object_arg0 =
1246            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1247        let receive_object_transaction0 = make_transaction(
1248            gas_objects[0].clone(),
1249            vec![CallArg::Object(receiving_object_arg0)],
1250        );
1251
1252        let receive_object_transaction01 = make_transaction(
1253            gas_objects[1].clone(),
1254            vec![CallArg::Object(receiving_object_arg0)],
1255        );
1256
1257        let receiving_object_arg1 =
1258            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1259        let receive_object_transaction1 = make_transaction(
1260            gas_objects[0].clone(),
1261            vec![CallArg::Object(receiving_object_arg1)],
1262        );
1263
1264        // Enqueuing a transaction with a receiving object that is available at the time it is enqueued
1265        // should become immediately available.
1266        let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1267        let tx1 = make_transaction(
1268            gas_objects[0].clone(),
1269            vec![CallArg::Object(gas_receiving_arg)],
1270        );
1271
1272        // scheduler should output no transaction yet since waiting on receiving object.
1273        execution_scheduler.enqueue_transactions(
1274            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1275            &state.epoch_store_for_testing(),
1276        );
1277        sleep(Duration::from_secs(1)).await;
1278        assert!(rx_ready_certificates.try_recv().is_err());
1279        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1280
1281        // scheduler should output no transaction yet since waiting on receiving object.
1282        execution_scheduler.enqueue_transactions(
1283            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1284            &state.epoch_store_for_testing(),
1285        );
1286        sleep(Duration::from_secs(1)).await;
1287        assert!(rx_ready_certificates.try_recv().is_err());
1288        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1289
1290        // Different transaction with a duplicate receiving object reference is allowed.
1291        // Both transaction's will be outputted once the receiving object is available.
1292        execution_scheduler.enqueue_transactions(
1293            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1294            &state.epoch_store_for_testing(),
1295        );
1296        sleep(Duration::from_secs(1)).await;
1297        assert!(rx_ready_certificates.try_recv().is_err());
1298        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1299
1300        // Notify scheduler that the receiving object 0 is available.
1301        state
1302            .get_cache_writer()
1303            .write_object_entry_for_test(receiving_object_new0.clone());
1304
1305        // scheduler should output both transactions depending on the receiving object now that the
1306        // transaction's receiving object has become available.
1307        rx_ready_certificates.recv().await.unwrap();
1308
1309        rx_ready_certificates.recv().await.unwrap();
1310
1311        // Only two transactions that were dependent on the receiving object should be output.
1312        assert!(rx_ready_certificates.try_recv().is_err());
1313
1314        // Enqueue a transaction with a receiving object that is available at the time it is enqueued.
1315        // This should be immediately available.
1316        execution_scheduler.enqueue_transactions(
1317            vec![(tx1.clone(), ExecutionEnv::new())],
1318            &state.epoch_store_for_testing(),
1319        );
1320        sleep(Duration::from_secs(1)).await;
1321        rx_ready_certificates.recv().await.unwrap();
1322
1323        // Notify scheduler that the receiving object 0 is available.
1324        state
1325            .get_cache_writer()
1326            .write_object_entry_for_test(receiving_object_new1.clone());
1327
1328        // scheduler should output the transaction eventually now that the receiving object has become
1329        // available.
1330        rx_ready_certificates.recv().await.unwrap();
1331    }
1332
1333    #[tokio::test(flavor = "current_thread", start_paused = true)]
1334    async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1335        telemetry_subscribers::init_for_testing();
1336        // Initialize an authority state.
1337        let (owner, _keypair) = deterministic_random_account_key();
1338        let mut gas_objects: Vec<Object> = (0..10)
1339            .map(|_| {
1340                let gas_object_id = ObjectID::random();
1341                Object::with_id_owner_for_testing(gas_object_id, owner)
1342            })
1343            .collect();
1344        let receiving_object = Object::with_id_owner_version_for_testing(
1345            ObjectID::random(),
1346            10.into(),
1347            Owner::AddressOwner(owner),
1348        );
1349        gas_objects.push(receiving_object.clone());
1350        let state = init_state_with_objects(gas_objects.clone()).await;
1351
1352        // Create a new execution scheduler instead of reusing the authority's, to examine
1353        // execution_scheduler output from rx_ready_certificates.
1354        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1355        // scheduler should output no transaction.
1356        assert!(rx_ready_certificates.try_recv().is_err());
1357        // scheduler should be empty at the beginning.
1358        execution_scheduler.check_empty_for_testing().await;
1359
1360        let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1361            receiving_object.id(),
1362            0.into(),
1363            Owner::AddressOwner(owner),
1364        );
1365        let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1366            receiving_object.id(),
1367            1.into(),
1368            Owner::AddressOwner(owner),
1369        );
1370        let receiving_object_arg0 =
1371            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1372        let receive_object_transaction0 = make_transaction(
1373            gas_objects[0].clone(),
1374            vec![CallArg::Object(receiving_object_arg0)],
1375        );
1376
1377        let receive_object_transaction01 = make_transaction(
1378            gas_objects[1].clone(),
1379            vec![CallArg::Object(receiving_object_arg0)],
1380        );
1381
1382        let receiving_object_arg1 =
1383            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1384        let receive_object_transaction1 = make_transaction(
1385            gas_objects[0].clone(),
1386            vec![CallArg::Object(receiving_object_arg1)],
1387        );
1388
1389        // scheduler should output no transaction yet since waiting on receiving object.
1390        execution_scheduler.enqueue_transactions(
1391            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1392            &state.epoch_store_for_testing(),
1393        );
1394        execution_scheduler.enqueue_transactions(
1395            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1396            &state.epoch_store_for_testing(),
1397        );
1398        execution_scheduler.enqueue_transactions(
1399            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1400            &state.epoch_store_for_testing(),
1401        );
1402        sleep(Duration::from_secs(1)).await;
1403        rx_ready_certificates.recv().await.unwrap();
1404        rx_ready_certificates.recv().await.unwrap();
1405        rx_ready_certificates.recv().await.unwrap();
1406        assert!(rx_ready_certificates.try_recv().is_err());
1407    }
1408
1409    // Tests transaction cancellation logic in execution scheduler. Mainly tests that for cancelled transaction,
1410    // execution scheduler only waits for all non-shared objects to be available before outputting the transaction.
1411    #[tokio::test(flavor = "current_thread", start_paused = true)]
1412    async fn execution_scheduler_with_cancelled_transactions() {
1413        // Initialize an authority state, with gas objects and 3 shared objects.
1414        let (owner, _keypair) = deterministic_random_account_key();
1415        let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1416        let shared_object_1 = Object::shared_for_testing();
1417        let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1418        let shared_object_2 = Object::shared_for_testing();
1419        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1420        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1421
1422        let state = init_state_with_objects(vec![
1423            gas_object.clone(),
1424            shared_object_1.clone(),
1425            shared_object_2.clone(),
1426            owned_object.clone(),
1427        ])
1428        .await;
1429
1430        // Create a new execution scheduler instead of reusing the authority's, to examine
1431        // execution_scheduler output from rx_ready_certificates.
1432        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1433        // scheduler should output no transaction.
1434        assert!(rx_ready_certificates.try_recv().is_err());
1435
1436        // Enqueue one transaction with 2 shared object inputs and 1 owned input.
1437        let shared_object_arg_1 = ObjectArg::SharedObject {
1438            id: shared_object_1.id(),
1439            initial_shared_version: initial_shared_version_1,
1440            mutability: SharedObjectMutability::Mutable,
1441        };
1442        let shared_object_arg_2 = ObjectArg::SharedObject {
1443            id: shared_object_2.id(),
1444            initial_shared_version: initial_shared_version_2,
1445            mutability: SharedObjectMutability::Mutable,
1446        };
1447
1448        // Changes the desired owned object version to a higher version. We will make it available later.
1449        let owned_version = 2000.into();
1450        let mut owned_ref = owned_object.compute_object_reference();
1451        owned_ref.1 = owned_version;
1452        let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1453
1454        let cancelled_transaction = make_transaction(
1455            gas_object.clone(),
1456            vec![
1457                CallArg::Object(shared_object_arg_1),
1458                CallArg::Object(shared_object_arg_2),
1459                CallArg::Object(owned_object_arg),
1460            ],
1461        );
1462        let assigned_versions = vec![
1463            (
1464                (
1465                    shared_object_1.id(),
1466                    shared_object_1.owner().start_version().unwrap(),
1467                ),
1468                SequenceNumber::CANCELLED_READ,
1469            ),
1470            (
1471                (
1472                    shared_object_2.id(),
1473                    shared_object_2.owner().start_version().unwrap(),
1474                ),
1475                SequenceNumber::CONGESTED,
1476            ),
1477        ];
1478
1479        execution_scheduler.enqueue_transactions(
1480            vec![(
1481                cancelled_transaction.clone(),
1482                ExecutionEnv::new()
1483                    .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1484            )],
1485            &state.epoch_store_for_testing(),
1486        );
1487
1488        // scheduler should output no transaction yet.
1489        sleep(Duration::from_secs(1)).await;
1490        assert!(rx_ready_certificates.try_recv().is_err());
1491
1492        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1493
1494        // Notify scheduler about availability of the owned object.
1495        let mut new_owned_object = owned_object.clone();
1496        new_owned_object
1497            .data
1498            .try_as_move_mut()
1499            .unwrap()
1500            .increment_version_to(owned_version);
1501        state
1502            .get_cache_writer()
1503            .write_object_entry_for_test(new_owned_object);
1504
1505        // scheduler should output the transaction as soon as the owned object is available.
1506        let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1507        assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1508
1509        sleep(Duration::from_secs(1)).await;
1510        assert!(rx_ready_certificates.try_recv().is_err());
1511
1512        execution_scheduler.check_empty_for_testing().await;
1513    }
1514
1515    #[test]
1516    fn test_barrier_dependency_builder() {
1517        let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1518            assert!(
1519                non_exclusive_writes
1520                    .iter()
1521                    .all(|id| !exclusive_writes.contains(id))
1522            );
1523            assert!(
1524                exclusive_writes
1525                    .iter()
1526                    .all(|id| !non_exclusive_writes.contains(id))
1527            );
1528
1529            let non_exclusive_writes = non_exclusive_writes
1530                .into_iter()
1531                .map(|id| ObjectID::from_single_byte(id as u8));
1532            let exclusive_writes = exclusive_writes
1533                .into_iter()
1534                .map(|id| ObjectID::from_single_byte(id as u8));
1535            let mut builder = ProgrammableTransactionBuilder::new();
1536            for non_exclusive_write in non_exclusive_writes {
1537                builder
1538                    .obj(ObjectArg::SharedObject {
1539                        id: non_exclusive_write,
1540                        initial_shared_version: SequenceNumber::new(),
1541                        mutability: SharedObjectMutability::NonExclusiveWrite,
1542                    })
1543                    .unwrap();
1544            }
1545
1546            for exclusive_write in exclusive_writes {
1547                builder
1548                    .obj(ObjectArg::SharedObject {
1549                        id: exclusive_write,
1550                        initial_shared_version: SequenceNumber::new(),
1551                        mutability: SharedObjectMutability::Mutable,
1552                    })
1553                    .unwrap();
1554            }
1555
1556            let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1557            let tx_data =
1558                TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1559            Transaction::from_data_and_signer(tx_data, vec![])
1560        };
1561
1562        // One non-exclusive write, one exclusive write.
1563        {
1564            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1565            let tx1 = make_transaction(vec![1], vec![]);
1566            let tx2 = make_transaction(vec![], vec![1]);
1567
1568            let tx1_deps =
1569                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1570            let tx2_deps =
1571                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1572            assert!(tx1_deps.is_empty());
1573            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1574        }
1575
1576        // One transaction has non-exclusive writes to two different objects, and then becomes
1577        // a dependency of two barriers
1578        {
1579            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1580            let tx1 = make_transaction(vec![1, 2], vec![]);
1581            let tx2 = make_transaction(vec![], vec![1]);
1582            let tx3 = make_transaction(vec![], vec![2]);
1583
1584            let tx1_deps =
1585                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1586            let tx2_deps =
1587                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1588            let tx3_deps =
1589                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1590            assert!(tx1_deps.is_empty());
1591            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1592            assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1593        }
1594
1595        // Ensure multiple-object dependences are merged
1596        {
1597            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1598            let tx1 = make_transaction(vec![1], vec![]);
1599            let tx2 = make_transaction(vec![2], vec![]);
1600            let tx3 = make_transaction(vec![], vec![1, 2]);
1601
1602            let tx1_deps =
1603                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1604            let tx2_deps =
1605                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1606            let tx3_deps =
1607                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1608            assert!(tx1_deps.is_empty());
1609            assert!(tx2_deps.is_empty());
1610            assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1611        }
1612
1613        // Ensure dependency state is cleared
1614        {
1615            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1616            let tx1 = make_transaction(vec![1], vec![]);
1617            let tx2 = make_transaction(vec![], vec![1]);
1618            let tx3 = make_transaction(vec![], vec![1]);
1619
1620            let tx1_deps =
1621                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1622            let tx2_deps =
1623                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1624            let tx3_deps =
1625                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1626            assert!(tx1_deps.is_empty());
1627            assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1628            assert!(tx3_deps.is_empty());
1629        }
1630    }
1631}