sui_core/execution_scheduler/
execution_scheduler_impl.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    accumulators::funds_read::AccountFundsRead,
6    authority::{
7        AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
8        epoch_start_configuration::EpochStartConfigTrait,
9        shared_object_version_manager::Schedulable,
10    },
11    execution_cache::{ObjectCacheRead, TransactionCacheRead},
12    execution_scheduler::{
13        ExecutingGuard, PendingCertificateStats,
14        funds_withdraw_scheduler::{
15            AddressFundsSchedulerMetrics, FundsSettlement, ScheduleStatus, TxFundsWithdraw,
16            WithdrawReservations, scheduler::FundsWithdrawScheduler,
17        },
18    },
19};
20use futures::stream::{FuturesUnordered, StreamExt};
21use mysten_common::ZipDebugEqIteratorExt;
22use mysten_common::{assert_reachable, debug_fatal};
23use mysten_metrics::spawn_monitored_task;
24use parking_lot::Mutex;
25use std::{
26    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
27    sync::Arc,
28};
29use sui_config::node::{AuthorityOverloadConfig, FundsWithdrawSchedulerType};
30use sui_types::{
31    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
32    base_types::{FullObjectID, ObjectID},
33    digests::TransactionDigest,
34    error::SuiResult,
35    executable_transaction::VerifiedExecutableTransaction,
36    storage::InputKey,
37    transaction::{
38        SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
39        TransactionDataAPI, TransactionKey,
40    },
41};
42use tokio::sync::mpsc::UnboundedSender;
43use tokio::time::Instant;
44use tracing::{debug, error, instrument};
45
46use super::{PendingCertificate, overload_tracker::OverloadTracker};
47
48/// Utility struct for collecting barrier dependencies
49pub(crate) struct BarrierDependencyBuilder {
50    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
51}
52
53impl BarrierDependencyBuilder {
54    pub fn new() -> Self {
55        Self {
56            dep_state: Default::default(),
57        }
58    }
59
60    /// process_tx must be called for each transaction in scheduling order. If the
61    /// transaction has a non-exclusive write to an object, the transaction digest is
62    /// stored to become a dependency of the eventual barrier transaction. If a
63    /// transaction has an exclusive write to an object, all pending non-exclusive write
64    /// transactions for that object are added to the barrier dependencies.
65    pub fn process_tx(
66        &mut self,
67        tx_digest: TransactionDigest,
68        tx: &TransactionData,
69    ) -> BTreeSet<TransactionDigest> {
70        let mut barrier_deps = BTreeSet::new();
71        for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
72            match mutability {
73                SharedObjectMutability::NonExclusiveWrite => {
74                    self.dep_state.entry(id).or_default().insert(tx_digest);
75                }
76                SharedObjectMutability::Mutable => {
77                    // If there were preceding non-exclusive writes to this object id, this
78                    // transaction is a barrier and must wait for them to finish.
79                    if let Some(deps) = self.dep_state.remove(&id) {
80                        barrier_deps.extend(deps);
81                    }
82                }
83                SharedObjectMutability::Immutable => (),
84            }
85        }
86        barrier_deps
87    }
88}
89
90#[derive(Clone)]
91pub struct ExecutionScheduler {
92    object_cache_read: Arc<dyn ObjectCacheRead>,
93    transaction_cache_read: Arc<dyn TransactionCacheRead>,
94    overload_tracker: Arc<OverloadTracker>,
95    tx_ready_certificates: UnboundedSender<PendingCertificate>,
96    address_funds_withdraw_scheduler: Arc<Mutex<Option<FundsWithdrawScheduler>>>,
97    funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
98    metrics: Arc<AuthorityMetrics>,
99    address_funds_scheduler_metrics: Arc<AddressFundsSchedulerMetrics>,
100}
101
102struct PendingGuard<'a> {
103    scheduler: &'a ExecutionScheduler,
104    cert: &'a VerifiedExecutableTransaction,
105}
106
107impl<'a> PendingGuard<'a> {
108    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
109        scheduler
110            .metrics
111            .transaction_manager_num_pending_certificates
112            .inc();
113        scheduler
114            .overload_tracker
115            .add_pending_certificate(cert.data());
116        Self { scheduler, cert }
117    }
118}
119
120impl Drop for PendingGuard<'_> {
121    fn drop(&mut self) {
122        self.scheduler
123            .metrics
124            .transaction_manager_num_pending_certificates
125            .dec();
126        self.scheduler
127            .overload_tracker
128            .remove_pending_certificate(self.cert.data());
129    }
130}
131
132impl ExecutionScheduler {
133    pub fn new(
134        object_cache_read: Arc<dyn ObjectCacheRead>,
135        account_funds_read: Arc<dyn AccountFundsRead>,
136        transaction_cache_read: Arc<dyn TransactionCacheRead>,
137        tx_ready_certificates: UnboundedSender<PendingCertificate>,
138        epoch_store: &Arc<AuthorityPerEpochStore>,
139        funds_withdraw_scheduler_type: FundsWithdrawSchedulerType,
140        metrics: Arc<AuthorityMetrics>,
141        prometheus_registry: &prometheus::Registry,
142    ) -> Self {
143        tracing::info!(
144            ?funds_withdraw_scheduler_type,
145            "Creating new ExecutionScheduler"
146        );
147        let address_funds_scheduler_metrics =
148            Arc::new(AddressFundsSchedulerMetrics::new(prometheus_registry));
149        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
150            epoch_store,
151            &object_cache_read,
152            account_funds_read,
153            funds_withdraw_scheduler_type,
154            &address_funds_scheduler_metrics,
155        );
156        Self {
157            object_cache_read,
158            transaction_cache_read,
159            overload_tracker: Arc::new(OverloadTracker::new()),
160            tx_ready_certificates,
161            address_funds_withdraw_scheduler: Arc::new(Mutex::new(
162                address_funds_withdraw_scheduler,
163            )),
164            funds_withdraw_scheduler_type,
165            metrics,
166            address_funds_scheduler_metrics,
167        }
168    }
169
170    fn initialize_funds_withdraw_scheduler(
171        epoch_store: &Arc<AuthorityPerEpochStore>,
172        object_cache_read: &Arc<dyn ObjectCacheRead>,
173        account_funds_read: Arc<dyn AccountFundsRead>,
174        scheduler_type: FundsWithdrawSchedulerType,
175        address_funds_scheduler_metrics: &Arc<AddressFundsSchedulerMetrics>,
176    ) -> Option<FundsWithdrawScheduler> {
177        let withdraw_scheduler_enabled =
178            epoch_store.is_validator() && epoch_store.accumulators_enabled();
179        if !withdraw_scheduler_enabled {
180            return None;
181        }
182        let starting_accumulator_version = object_cache_read
183            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
184            .expect("Accumulator root object must be present if funds accumulator is enabled")
185            .version();
186        let address_funds_withdraw_scheduler = FundsWithdrawScheduler::new(
187            account_funds_read.clone(),
188            starting_accumulator_version,
189            scheduler_type,
190            address_funds_scheduler_metrics.clone(),
191        );
192
193        Some(address_funds_withdraw_scheduler)
194    }
195
196    #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
197    async fn schedule_transaction(
198        self,
199        cert: VerifiedExecutableTransaction,
200        execution_env: ExecutionEnv,
201        epoch_store: &Arc<AuthorityPerEpochStore>,
202    ) {
203        let enqueue_time = Instant::now();
204        let tx_digest = cert.digest();
205        let digests = [*tx_digest];
206
207        let tx_data = cert.transaction_data();
208        let input_object_kinds = tx_data
209            .input_objects()
210            .expect("input_objects() cannot fail");
211        let mut input_object_keys: Vec<_> = epoch_store
212            .get_input_object_keys(
213                &cert.key(),
214                &input_object_kinds,
215                &execution_env.assigned_versions,
216            )
217            .into_iter()
218            .collect();
219
220        // Coin reservation transactions need to wait for the accumulator root object
221        // to reach the assigned accumulator version. This ensures settlement transactions
222        // (which create/update accumulator account objects) execute before coin reservation
223        // transactions that depend on those objects.
224        if tx_data.kind().has_coin_reservations()
225            && let Some(accumulator_version) = execution_env.assigned_versions.accumulator_version
226            && let Some(initial_shared_version) =
227                (**epoch_store.epoch_start_config()).accumulator_root_obj_initial_shared_version()
228        {
229            input_object_keys.push(InputKey::VersionedObject {
230                id: FullObjectID::new(SUI_ACCUMULATOR_ROOT_OBJECT_ID, Some(initial_shared_version)),
231                version: accumulator_version,
232            });
233        }
234
235        let receiving_object_keys: HashSet<_> = tx_data
236            .receiving_objects()
237            .into_iter()
238            .map(|entry| {
239                InputKey::VersionedObject {
240                    // TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
241                    id: FullObjectID::new(entry.0, None),
242                    version: entry.1,
243                }
244            })
245            .collect();
246        let input_and_receiving_keys = [
247            input_object_keys,
248            receiving_object_keys.iter().cloned().collect(),
249        ]
250        .concat();
251
252        let epoch = epoch_store.epoch();
253        debug!(
254            ?tx_digest,
255            "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
256        );
257
258        let availability = self
259            .object_cache_read
260            .multi_input_objects_available_cache_only(&input_and_receiving_keys);
261        // Most of the times, the transaction's input objects are already available.
262        // We can check the availability of the input objects first, and only wait for the
263        // missing input objects if necessary.
264        let missing_input_keys: Vec<_> = input_and_receiving_keys
265            .into_iter()
266            .zip_debug_eq(availability)
267            .filter_map(|(key, available)| if !available { Some(key) } else { None })
268            .collect();
269
270        let has_missing_barrier_dependencies = self
271            .transaction_cache_read
272            .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
273            .into_iter()
274            .any(|r| r.is_none());
275
276        if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
277            self.metrics
278                .transaction_manager_num_enqueued_certificates
279                .with_label_values(&["ready"])
280                .inc();
281            debug!(?tx_digest, "Input objects already available");
282            self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
283            return;
284        }
285
286        let _pending_guard = PendingGuard::new(&self, &cert);
287        self.metrics
288            .transaction_manager_num_enqueued_certificates
289            .with_label_values(&["pending"])
290            .inc();
291
292        if !execution_env.barrier_dependencies.is_empty() {
293            debug!(
294                "waiting for barrier dependencies to be executed: {:?}",
295                execution_env.barrier_dependencies
296            );
297            self.transaction_cache_read
298                .notify_read_executed_effects_digests(
299                    "wait_for_barrier_dependencies",
300                    &execution_env.barrier_dependencies,
301                )
302                .await;
303        }
304
305        tokio::select! {
306            _ = self.object_cache_read
307                .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
308                => {
309                    self.metrics
310                        .transaction_manager_transaction_queue_age_s
311                        .observe(enqueue_time.elapsed().as_secs_f64());
312                    debug!(?tx_digest, "Input objects available");
313                    // TODO: Eventually we could fold execution_driver into the scheduler.
314                    self.send_transaction_for_execution(
315                        &cert,
316                        execution_env,
317                        enqueue_time,
318                    );
319                }
320            _ = self.transaction_cache_read.notify_read_executed_effects_digests(
321                "ExecutionScheduler::notify_read_executed_effects_digests",
322                &digests,
323            ) => {
324                debug!(?tx_digest, "Transaction already executed");
325            }
326        };
327    }
328
329    pub fn send_transaction_for_execution(
330        &self,
331        cert: &VerifiedExecutableTransaction,
332        execution_env: ExecutionEnv,
333        enqueue_time: Instant,
334    ) {
335        let pending_cert = PendingCertificate {
336            certificate: cert.clone(),
337            execution_env,
338            stats: PendingCertificateStats {
339                enqueue_time,
340                ready_time: Some(Instant::now()),
341            },
342            executing_guard: Some(ExecutingGuard::new(
343                self.metrics
344                    .transaction_manager_num_executing_certificates
345                    .clone(),
346            )),
347        };
348        let _ = self.tx_ready_certificates.send(pending_cert);
349    }
350
351    fn schedule_funds_withdraws(
352        &self,
353        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
354        epoch_store: &Arc<AuthorityPerEpochStore>,
355    ) {
356        if certs.is_empty() {
357            return;
358        }
359        let mut withdraws = BTreeMap::new();
360        let mut prev_version = None;
361        for (cert, env) in &certs {
362            let tx_withdraws = cert
363                .transaction_data()
364                .process_funds_withdrawals_for_execution(epoch_store.get_chain_identifier());
365            assert!(!tx_withdraws.is_empty());
366            let accumulator_version = env
367                .assigned_versions
368                .accumulator_version
369                .expect("accumulator_version must be set when there are withdraws");
370            if let Some(prev_version) = prev_version {
371                // Transactions must be in order.
372                assert!(prev_version <= accumulator_version);
373            }
374            prev_version = Some(accumulator_version);
375            let tx_digest = *cert.digest();
376            withdraws
377                .entry(accumulator_version)
378                .or_insert(Vec::new())
379                .push(TxFundsWithdraw {
380                    tx_digest,
381                    reservations: tx_withdraws,
382                });
383        }
384        let mut receivers = FuturesUnordered::new();
385        {
386            let guard = self.address_funds_withdraw_scheduler.lock();
387            let withdraw_scheduler = guard
388                .as_ref()
389                .expect("Funds withdraw scheduler must be enabled if there are withdraws");
390            for (version, tx_withdraws) in withdraws {
391                receivers.extend(withdraw_scheduler.schedule_withdraws(WithdrawReservations {
392                    accumulator_version: version,
393                    withdraws: tx_withdraws,
394                }));
395            }
396            // guard will be dropped here
397        }
398        let scheduler = self.clone();
399        let epoch_store = epoch_store.clone();
400        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
401            let mut cert_map = HashMap::new();
402            for (cert, env) in certs {
403                cert_map.insert(*cert.digest(), (cert, env));
404            }
405            while let Some(result) = receivers.next().await {
406                match result {
407                    Ok((tx_digest, status)) => match status {
408                        ScheduleStatus::InsufficientFunds => {
409                            assert_reachable!("tx cancelled, insufficient funds");
410                            debug!(
411                                ?tx_digest,
412                                "Funds withdraw scheduling result: Insufficient funds"
413                            );
414                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
415                            let env = env.with_insufficient_funds();
416                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
417                        }
418                        ScheduleStatus::SufficientFunds => {
419                            assert_reachable!("tx scheduled, sufficient funds");
420                            debug!(?tx_digest, "Funds withdraw scheduling result: Success");
421                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
422                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
423                        }
424                        ScheduleStatus::SkipSchedule => {
425                            assert_reachable!("tx withdrawal scheduling skipped");
426                            debug!(?tx_digest, "Skip scheduling funds withdraw");
427                        }
428                    },
429                    Err(e) => {
430                        error!("Withdraw scheduler stopped: {:?}", e);
431                    }
432                }
433            }
434        }));
435    }
436
437    fn schedule_tx_keys(
438        &self,
439        tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
440        epoch_store: &Arc<AuthorityPerEpochStore>,
441    ) {
442        if tx_with_keys.is_empty() {
443            return;
444        }
445
446        let scheduler = self.clone();
447        let epoch_store = epoch_store.clone();
448        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
449            let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
450            let digests = epoch_store
451                .notify_read_tx_key_to_digest(&tx_keys)
452                .await
453                .expect("db error");
454            let transactions = scheduler
455                .transaction_cache_read
456                .multi_get_transaction_blocks(&digests)
457                .into_iter()
458                .map(|tx| {
459                    let tx = tx.expect("tx must exist").as_ref().clone();
460                    VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
461                })
462                .zip_debug_eq(tx_with_keys.into_iter().map(|(_, env)| env))
463                .collect::<Vec<_>>();
464            scheduler.enqueue_transactions(transactions, &epoch_store);
465        }));
466    }
467
468    /// When we schedule a certificate, it should be impossible for it to have been executed in a
469    /// previous epoch.
470    #[cfg(debug_assertions)]
471    fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
472        let epoch = cert.epoch();
473        let digest = *cert.digest();
474        let digests = [digest];
475        let executed = self
476            .transaction_cache_read
477            .multi_get_executed_effects(&digests)
478            .pop()
479            .unwrap();
480        // Due to pruning, we may not always have an executed effects for the certificate
481        // even if it was executed. So this is a best-effort check.
482        if let Some(executed) = executed {
483            use sui_types::effects::TransactionEffectsAPI;
484
485            assert_eq!(
486                executed.executed_epoch(),
487                epoch,
488                "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
489                digest,
490                executed.executed_epoch(),
491                epoch
492            );
493        }
494    }
495}
496
497impl ExecutionScheduler {
498    pub fn enqueue(
499        &self,
500        certs: Vec<(Schedulable, ExecutionEnv)>,
501        epoch_store: &Arc<AuthorityPerEpochStore>,
502    ) {
503        // schedule all transactions immediately
504        let mut ordinary_txns = Vec::with_capacity(certs.len());
505        let mut tx_with_keys = Vec::new();
506        let mut tx_with_withdraws = Vec::new();
507
508        for (schedulable, env) in certs {
509            match schedulable {
510                Schedulable::Transaction(tx) => {
511                    if tx.transaction_data().has_funds_withdrawals() {
512                        tx_with_withdraws.push((tx, env));
513                    } else {
514                        ordinary_txns.push((tx, env));
515                    }
516                }
517                s @ Schedulable::RandomnessStateUpdate(..) => {
518                    tx_with_keys.push((s.key(), env));
519                }
520                Schedulable::AccumulatorSettlement(_, _) => {
521                    unreachable!("handled by SettlementScheduler");
522                }
523                Schedulable::ConsensusCommitPrologue(_, _, _) => {
524                    // we only use Schedulable::ConsensusCommitPrologue as a temporary placeholder
525                    // during version assignment, by the time we schedule transactions it should be
526                    // converted to Schedulable::Transaction
527                    unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
528                }
529            }
530        }
531
532        self.enqueue_transactions(ordinary_txns, epoch_store);
533        self.schedule_tx_keys(tx_with_keys, epoch_store);
534        self.schedule_funds_withdraws(tx_with_withdraws, epoch_store);
535    }
536
537    pub fn enqueue_transactions(
538        &self,
539        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
540        epoch_store: &Arc<AuthorityPerEpochStore>,
541    ) {
542        // Filter out certificates from wrong epoch.
543        let certs: Vec<_> = certs
544            .into_iter()
545            .filter_map(|cert| {
546                if cert.0.epoch() == epoch_store.epoch() {
547                    #[cfg(debug_assertions)]
548                    self.assert_cert_not_executed_previous_epochs(&cert.0);
549
550                    Some(cert)
551                } else {
552                    debug_fatal!(
553                        "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
554                        epoch_store.epoch(),
555                        cert.0.epoch()
556                    );
557                    None
558                }
559            })
560            .collect();
561        let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
562        let executed = self
563            .transaction_cache_read
564            .multi_get_executed_effects_digests(&digests);
565        let mut already_executed_certs_num = 0;
566        let pending_certs = certs.into_iter().zip_debug_eq(executed).filter_map(
567            |((cert, execution_env), executed)| {
568                if executed.is_none() {
569                    Some((cert, execution_env))
570                } else {
571                    already_executed_certs_num += 1;
572                    None
573                }
574            },
575        );
576
577        for (cert, execution_env) in pending_certs {
578            let scheduler = self.clone();
579            let epoch_store = epoch_store.clone();
580            spawn_monitored_task!(
581                epoch_store.within_alive_epoch(scheduler.schedule_transaction(
582                    cert,
583                    execution_env,
584                    &epoch_store,
585                ))
586            );
587        }
588
589        self.metrics
590            .transaction_manager_num_enqueued_certificates
591            .with_label_values(&["already_executed"])
592            .inc_by(already_executed_certs_num);
593    }
594
595    pub fn settle_address_funds(&self, settlement: FundsSettlement) {
596        self.address_funds_withdraw_scheduler
597            .lock()
598            .as_ref()
599            .expect("Funds withdraw scheduler must be enabled if there are settlements")
600            .settle_funds(settlement);
601    }
602
603    /// Reconfigure internal state at epoch start. This resets the funds withdraw scheduler
604    /// to the current accumulator root object version.
605    pub fn reconfigure(
606        &self,
607        new_epoch_store: &Arc<AuthorityPerEpochStore>,
608        account_funds_read: &Arc<dyn AccountFundsRead>,
609    ) {
610        let address_funds_withdraw_scheduler = Self::initialize_funds_withdraw_scheduler(
611            new_epoch_store,
612            &self.object_cache_read,
613            account_funds_read.clone(),
614            self.funds_withdraw_scheduler_type,
615            &self.address_funds_scheduler_metrics,
616        );
617        let mut guard = self.address_funds_withdraw_scheduler.lock();
618        if let Some(old_scheduler) = guard.as_ref() {
619            old_scheduler.close_epoch();
620        }
621        *guard = address_funds_withdraw_scheduler;
622        drop(guard);
623    }
624
625    pub fn check_execution_overload(
626        &self,
627        overload_config: &AuthorityOverloadConfig,
628        tx_data: &SenderSignedData,
629    ) -> SuiResult {
630        let inflight_queue_len = self.num_pending_certificates();
631        self.overload_tracker
632            .check_execution_overload(overload_config, tx_data, inflight_queue_len)
633    }
634
635    pub fn num_pending_certificates(&self) -> usize {
636        (self
637            .metrics
638            .transaction_manager_num_pending_certificates
639            .get()
640            + self
641                .metrics
642                .transaction_manager_num_executing_certificates
643                .get()) as usize
644    }
645
646    #[cfg(test)]
647    pub async fn check_empty_for_testing(&self) {
648        for _ in 0..10 {
649            if self.num_pending_certificates() == 0 {
650                return;
651            }
652            tokio::task::yield_now().await;
653        }
654        assert_eq!(self.num_pending_certificates(), 0);
655    }
656}
657
658#[cfg(test)]
659mod test {
660    use super::{
661        BarrierDependencyBuilder, ExecutionScheduler, FundsWithdrawSchedulerType,
662        PendingCertificate,
663    };
664    use crate::authority::ExecutionEnv;
665    use crate::authority::shared_object_version_manager::AssignedVersions;
666    use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
667    use std::collections::BTreeSet;
668    use std::{time::Duration, vec};
669    use sui_test_transaction_builder::TestTransactionBuilder;
670    use sui_types::base_types::{SuiAddress, random_object_ref};
671    use sui_types::executable_transaction::VerifiedExecutableTransaction;
672    use sui_types::object::Owner;
673    use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
674    use sui_types::transaction::{
675        SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
676    };
677    use sui_types::{
678        SUI_FRAMEWORK_PACKAGE_ID,
679        base_types::{ObjectID, SequenceNumber},
680        crypto::deterministic_random_account_key,
681        object::Object,
682        transaction::{CallArg, ObjectArg},
683    };
684    use tokio::time::Instant;
685    use tokio::{
686        sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
687        time::sleep,
688    };
689
690    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
691    fn make_execution_scheduler(
692        state: &AuthorityState,
693    ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
694        // Create a new execution scheduler instead of reusing the authority's, to examine
695        // execution_scheduler output from rx_ready_certificates.
696        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
697        let registry = prometheus::Registry::new();
698        let execution_scheduler = ExecutionScheduler::new(
699            state.get_object_cache_reader().clone(),
700            state.get_account_funds_read().clone(),
701            state.get_transaction_cache_reader().clone(),
702            tx_ready_certificates,
703            &state.epoch_store_for_testing(),
704            FundsWithdrawSchedulerType::default(),
705            state.metrics.clone(),
706            &registry,
707        );
708
709        (execution_scheduler, rx_ready_certificates)
710    }
711
712    fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
713        // Use fake module, function, package and gas prices since they are irrelevant for testing
714        // execution scheduler.
715        let rgp = 100;
716        let (sender, keypair) = deterministic_random_account_key();
717        let transaction =
718            TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
719                .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
720                .build_and_sign(&keypair);
721        VerifiedExecutableTransaction::new_system(
722            VerifiedTransaction::new_unchecked(transaction),
723            0,
724        )
725    }
726
727    #[tokio::test(flavor = "current_thread", start_paused = true)]
728    async fn execution_scheduler_basics() {
729        // Initialize an authority state.
730        let (owner, _keypair) = deterministic_random_account_key();
731        let gas_objects: Vec<Object> = (0..10)
732            .map(|_| {
733                let gas_object_id = ObjectID::random();
734                Object::with_id_owner_for_testing(gas_object_id, owner)
735            })
736            .collect();
737        let state = init_state_with_objects(gas_objects.clone()).await;
738
739        // Create a new execution scheduler instead of reusing the authority's, to examine
740        // execution_scheduler output from rx_ready_certificates.
741        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
742        // scheduler should output no transaction.
743        assert!(
744            rx_ready_certificates
745                .try_recv()
746                .is_err_and(|err| err == TryRecvError::Empty)
747        );
748        // scheduler should be empty at the beginning.
749        assert_eq!(execution_scheduler.num_pending_certificates(), 0);
750
751        // Enqueue empty vec should not crash.
752        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
753        // scheduler should output no transaction.
754        assert!(
755            rx_ready_certificates
756                .try_recv()
757                .is_err_and(|err| err == TryRecvError::Empty)
758        );
759
760        // Enqueue a transaction with existing gas object, empty input.
761        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
762        let tx_start_time = Instant::now();
763        execution_scheduler.enqueue_transactions(
764            vec![(transaction.clone(), ExecutionEnv::new())],
765            &state.epoch_store_for_testing(),
766        );
767        // scheduler should output the transaction eventually.
768        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
769
770        // Tests that pending certificate stats are recorded properly.
771        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
772        assert!(
773            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
774        );
775
776        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
777
778        // Predent we have just executed the transaction.
779        drop(pending_certificate);
780
781        // scheduler should be empty.
782        execution_scheduler.check_empty_for_testing().await;
783
784        // Enqueue a transaction with a new gas object, empty input.
785        let gas_object_new = Object::with_id_owner_version_for_testing(
786            ObjectID::random(),
787            0.into(),
788            Owner::AddressOwner(owner),
789        );
790        let transaction = make_transaction(gas_object_new.clone(), vec![]);
791        let tx_start_time = Instant::now();
792        execution_scheduler.enqueue_transactions(
793            vec![(transaction.clone(), ExecutionEnv::new())],
794            &state.epoch_store_for_testing(),
795        );
796        // scheduler should output no transaction yet.
797        sleep(Duration::from_secs(1)).await;
798        assert!(
799            rx_ready_certificates
800                .try_recv()
801                .is_err_and(|err| err == TryRecvError::Empty)
802        );
803
804        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
805
806        // Duplicated enqueue is allowed.
807        execution_scheduler.enqueue_transactions(
808            vec![(transaction.clone(), ExecutionEnv::new())],
809            &state.epoch_store_for_testing(),
810        );
811        sleep(Duration::from_secs(1)).await;
812        assert!(
813            rx_ready_certificates
814                .try_recv()
815                .is_err_and(|err| err == TryRecvError::Empty)
816        );
817
818        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
819
820        // Notify scheduler about availability of the gas object.
821        state
822            .get_cache_writer()
823            .write_object_entry_for_test(gas_object_new);
824        // scheduler should output the transaction eventually.
825        // We will see both the original and the duplicated transaction.
826        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
827        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
828        assert_eq!(
829            pending_certificate.certificate.digest(),
830            pending_certificate2.certificate.digest()
831        );
832
833        // Tests that pending certificate stats are recorded properly. The ready time should be
834        // 2 seconds apart from the enqueue time.
835        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
836        assert!(
837            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
838                >= Duration::from_secs(2)
839        );
840
841        // Predent we have just executed the transaction.
842        drop(pending_certificate);
843        drop(pending_certificate2);
844
845        // scheduler should be empty at the end.
846        execution_scheduler.check_empty_for_testing().await;
847    }
848
849    // Tests when objects become available, correct set of transactions can be sent to execute.
850    // Specifically, we have following setup,
851    //         shared_object     shared_object_2
852    //       /    |    \     \    /
853    //    tx_0  tx_1  tx_2    tx_3
854    //     r      r     w      r
855    // And when shared_object is available, tx_0, tx_1, and tx_2 can be executed. And when
856    // shared_object_2 becomes available, tx_3 can be executed.
857    #[tokio::test(flavor = "current_thread", start_paused = true)]
858    async fn execution_scheduler_object_dependency() {
859        telemetry_subscribers::init_for_testing();
860        // Initialize an authority state, with gas objects and a shared object.
861        let (owner, _keypair) = deterministic_random_account_key();
862        let gas_objects: Vec<Object> = (0..10)
863            .map(|_| {
864                let gas_object_id = ObjectID::random();
865                Object::with_id_owner_for_testing(gas_object_id, owner)
866            })
867            .collect();
868        let shared_object = Object::shared_for_testing();
869        let initial_shared_version = shared_object.owner().start_version().unwrap();
870        let shared_object_2 = Object::shared_for_testing();
871        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
872
873        let state = init_state_with_objects(
874            [
875                gas_objects.clone(),
876                vec![shared_object.clone(), shared_object_2.clone()],
877            ]
878            .concat(),
879        )
880        .await;
881
882        // Create a new execution scheduler instead of reusing the authority's, to examine
883        // execution_scheduler output from rx_ready_certificates.
884        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
885        // scheduler should output no transaction.
886        assert!(rx_ready_certificates.try_recv().is_err());
887
888        // Enqueue two transactions with the same shared object input in read-only mode.
889        let shared_version = 1000.into();
890        let shared_object_arg_read = ObjectArg::SharedObject {
891            id: shared_object.id(),
892            initial_shared_version,
893            mutability: SharedObjectMutability::Immutable,
894        };
895        let transaction_read_0 = make_transaction(
896            gas_objects[0].clone(),
897            vec![CallArg::Object(shared_object_arg_read)],
898        );
899        let transaction_read_1 = make_transaction(
900            gas_objects[1].clone(),
901            vec![CallArg::Object(shared_object_arg_read)],
902        );
903        let tx_read_0_assigned_versions = vec![(
904            (
905                shared_object.id(),
906                shared_object.owner().start_version().unwrap(),
907            ),
908            shared_version,
909        )];
910        let tx_read_1_assigned_versions = vec![(
911            (
912                shared_object.id(),
913                shared_object.owner().start_version().unwrap(),
914            ),
915            shared_version,
916        )];
917
918        // Enqueue one transaction with the same shared object in mutable mode.
919        let shared_object_arg_default = ObjectArg::SharedObject {
920            id: shared_object.id(),
921            initial_shared_version,
922            mutability: SharedObjectMutability::Mutable,
923        };
924        let transaction_default = make_transaction(
925            gas_objects[2].clone(),
926            vec![CallArg::Object(shared_object_arg_default)],
927        );
928        let tx_default_assigned_versions = vec![(
929            (
930                shared_object.id(),
931                shared_object.owner().start_version().unwrap(),
932            ),
933            shared_version,
934        )];
935
936        // Enqueue one transaction with two readonly shared object inputs, `shared_object` and `shared_object_2`.
937        let shared_version_2 = 1000.into();
938        let shared_object_arg_read_2 = ObjectArg::SharedObject {
939            id: shared_object_2.id(),
940            initial_shared_version: initial_shared_version_2,
941            mutability: SharedObjectMutability::Immutable,
942        };
943        let transaction_read_2 = make_transaction(
944            gas_objects[3].clone(),
945            vec![
946                CallArg::Object(shared_object_arg_default),
947                CallArg::Object(shared_object_arg_read_2),
948            ],
949        );
950        let tx_read_2_assigned_versions = vec![
951            (
952                (
953                    shared_object.id(),
954                    shared_object.owner().start_version().unwrap(),
955                ),
956                shared_version,
957            ),
958            (
959                (
960                    shared_object_2.id(),
961                    shared_object_2.owner().start_version().unwrap(),
962                ),
963                shared_version_2,
964            ),
965        ];
966
967        execution_scheduler.enqueue_transactions(
968            vec![
969                (
970                    transaction_read_0.clone(),
971                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
972                        tx_read_0_assigned_versions,
973                        None,
974                    )),
975                ),
976                (
977                    transaction_read_1.clone(),
978                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
979                        tx_read_1_assigned_versions,
980                        None,
981                    )),
982                ),
983                (
984                    transaction_default.clone(),
985                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
986                        tx_default_assigned_versions,
987                        None,
988                    )),
989                ),
990                (
991                    transaction_read_2.clone(),
992                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
993                        tx_read_2_assigned_versions,
994                        None,
995                    )),
996                ),
997            ],
998            &state.epoch_store_for_testing(),
999        );
1000
1001        // scheduler should output no transaction yet.
1002        sleep(Duration::from_secs(1)).await;
1003        assert!(rx_ready_certificates.try_recv().is_err());
1004
1005        assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1006
1007        // Notify scheduler about availability of the first shared object.
1008        let mut new_shared_object = shared_object.clone();
1009        new_shared_object
1010            .data
1011            .try_as_move_mut()
1012            .unwrap()
1013            .increment_version_to(shared_version_2);
1014        state
1015            .get_cache_writer()
1016            .write_object_entry_for_test(new_shared_object);
1017
1018        // scheduler should output the 3 transactions that are only waiting for this object.
1019        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1020        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1021        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1022        {
1023            let mut want_digests = vec![
1024                transaction_read_0.digest(),
1025                transaction_read_1.digest(),
1026                transaction_default.digest(),
1027            ];
1028            want_digests.sort();
1029            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1030            got_digests.sort();
1031            assert_eq!(want_digests, got_digests);
1032        }
1033
1034        sleep(Duration::from_secs(1)).await;
1035        assert!(rx_ready_certificates.try_recv().is_err());
1036
1037        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1038
1039        // Make shared_object_2 available.
1040        let mut new_shared_object_2 = shared_object_2.clone();
1041        new_shared_object_2
1042            .data
1043            .try_as_move_mut()
1044            .unwrap()
1045            .increment_version_to(shared_version_2);
1046        state
1047            .get_cache_writer()
1048            .write_object_entry_for_test(new_shared_object_2);
1049
1050        // Now, the transaction waiting for both shared objects can be executed.
1051        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1052        assert_eq!(transaction_read_2.digest(), tx_3.digest());
1053
1054        sleep(Duration::from_secs(1)).await;
1055        assert!(rx_ready_certificates.try_recv().is_err());
1056
1057        execution_scheduler.check_empty_for_testing().await;
1058    }
1059
1060    #[tokio::test(flavor = "current_thread", start_paused = true)]
1061    async fn execution_scheduler_receiving_notify_commit() {
1062        telemetry_subscribers::init_for_testing();
1063        // Initialize an authority state.
1064        let (owner, _keypair) = deterministic_random_account_key();
1065        let gas_objects: Vec<Object> = (0..10)
1066            .map(|_| {
1067                let gas_object_id = ObjectID::random();
1068                Object::with_id_owner_for_testing(gas_object_id, owner)
1069            })
1070            .collect();
1071        let state = init_state_with_objects(gas_objects.clone()).await;
1072
1073        // Create a new execution scheduler instead of reusing the authority's, to examine
1074        // execution_scheduler output from rx_ready_certificates.
1075        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1076        // scheduler should output no transaction.
1077        assert!(rx_ready_certificates.try_recv().is_err());
1078        // scheduler should be empty at the beginning.
1079        execution_scheduler.check_empty_for_testing().await;
1080
1081        let obj_id = ObjectID::random();
1082        let object_arguments: Vec<_> = (0..10)
1083            .map(|i| {
1084                let object = Object::with_id_owner_version_for_testing(
1085                    obj_id,
1086                    i.into(),
1087                    Owner::AddressOwner(owner),
1088                );
1089                // Every other transaction receives the object, and we create a run of multiple receives in
1090                // a row at the beginning to test that the scheduler doesn't get stuck in either configuration of:
1091                // ImmOrOwnedObject => Receiving,
1092                // Receiving => Receiving
1093                // Receiving => ImmOrOwnedObject
1094                // ImmOrOwnedObject => ImmOrOwnedObject is already tested as the default case on mainnet.
1095                let object_arg = if i % 2 == 0 || i == 3 {
1096                    ObjectArg::Receiving(object.compute_object_reference())
1097                } else {
1098                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1099                };
1100                let txn =
1101                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1102                (object, txn)
1103            })
1104            .collect();
1105
1106        for (i, (_, txn)) in object_arguments.iter().enumerate() {
1107            // scheduler should output no transaction yet since waiting on receiving object or
1108            // ImmOrOwnedObject input.
1109            execution_scheduler.enqueue_transactions(
1110                vec![(txn.clone(), ExecutionEnv::new())],
1111                &state.epoch_store_for_testing(),
1112            );
1113            sleep(Duration::from_secs(1)).await;
1114            assert!(rx_ready_certificates.try_recv().is_err());
1115            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1116        }
1117
1118        // Now start to unravel the transactions by notifying that each subsequent
1119        // transaction has been processed.
1120        let len = object_arguments.len();
1121        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1122            // Mark the object as available.
1123            // We should now eventually see the transaction as ready.
1124            state
1125                .get_cache_writer()
1126                .write_object_entry_for_test(object.clone());
1127
1128            // scheduler should output the transaction eventually now that the receiving object has become
1129            // available.
1130            rx_ready_certificates.recv().await.unwrap();
1131
1132            // Only one transaction at a time should become available though. So if we try to get
1133            // another one it should fail.
1134            sleep(Duration::from_secs(1)).await;
1135            assert!(rx_ready_certificates.try_recv().is_err());
1136
1137            // Notify the scheduler that the transaction has been processed.
1138            drop(txn);
1139
1140            // scheduler should now output another transaction to run since it the next version of that object
1141            // has become available.
1142            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1143        }
1144
1145        // After everything scheduler should be empty.
1146        execution_scheduler.check_empty_for_testing().await;
1147    }
1148
1149    #[tokio::test(flavor = "current_thread", start_paused = true)]
1150    async fn execution_scheduler_receiving_object_ready_notifications() {
1151        telemetry_subscribers::init_for_testing();
1152        // Initialize an authority state.
1153        let (owner, _keypair) = deterministic_random_account_key();
1154        let gas_objects: Vec<Object> = (0..10)
1155            .map(|_| {
1156                let gas_object_id = ObjectID::random();
1157                Object::with_id_owner_for_testing(gas_object_id, owner)
1158            })
1159            .collect();
1160        let state = init_state_with_objects(gas_objects.clone()).await;
1161
1162        // Create a new execution scheduler instead of reusing the authority's, to examine
1163        // execution_scheduler output from rx_ready_certificates.
1164        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1165        // scheduler should output no transaction.
1166        assert!(rx_ready_certificates.try_recv().is_err());
1167        // scheduler should be empty at the beginning.
1168        execution_scheduler.check_empty_for_testing().await;
1169
1170        let obj_id = ObjectID::random();
1171        let receiving_object_new0 =
1172            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1173        let receiving_object_new1 =
1174            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1175        let receiving_object_arg0 =
1176            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1177        let receive_object_transaction0 = make_transaction(
1178            gas_objects[0].clone(),
1179            vec![CallArg::Object(receiving_object_arg0)],
1180        );
1181
1182        let receiving_object_arg1 =
1183            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1184        let receive_object_transaction1 = make_transaction(
1185            gas_objects[0].clone(),
1186            vec![CallArg::Object(receiving_object_arg1)],
1187        );
1188
1189        // scheduler should output no transaction yet since waiting on receiving object.
1190        execution_scheduler.enqueue_transactions(
1191            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1192            &state.epoch_store_for_testing(),
1193        );
1194        sleep(Duration::from_secs(1)).await;
1195        assert!(rx_ready_certificates.try_recv().is_err());
1196        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1197
1198        // scheduler should output no transaction yet since waiting on receiving object.
1199        execution_scheduler.enqueue_transactions(
1200            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1201            &state.epoch_store_for_testing(),
1202        );
1203        sleep(Duration::from_secs(1)).await;
1204        assert!(rx_ready_certificates.try_recv().is_err());
1205        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1206
1207        // Duplicate enqueue of receiving object is allowed.
1208        execution_scheduler.enqueue_transactions(
1209            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1210            &state.epoch_store_for_testing(),
1211        );
1212        sleep(Duration::from_secs(1)).await;
1213        assert!(rx_ready_certificates.try_recv().is_err());
1214        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1215
1216        // Notify scheduler that the receiving object 0 is available.
1217        state
1218            .get_cache_writer()
1219            .write_object_entry_for_test(receiving_object_new0.clone());
1220
1221        // scheduler should output the transaction eventually now that the receiving object has become
1222        // available.
1223        rx_ready_certificates.recv().await.unwrap();
1224
1225        // Notify scheduler that the receiving object 0 is available.
1226        state
1227            .get_cache_writer()
1228            .write_object_entry_for_test(receiving_object_new1.clone());
1229
1230        // scheduler should output the transaction eventually now that the receiving object has become
1231        // available.
1232        rx_ready_certificates.recv().await.unwrap();
1233    }
1234
1235    #[tokio::test(flavor = "current_thread", start_paused = true)]
1236    async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1237        telemetry_subscribers::init_for_testing();
1238        // Initialize an authority state.
1239        let (owner, _keypair) = deterministic_random_account_key();
1240        let gas_objects: Vec<Object> = (0..10)
1241            .map(|_| {
1242                let gas_object_id = ObjectID::random();
1243                Object::with_id_owner_for_testing(gas_object_id, owner)
1244            })
1245            .collect();
1246        let state = init_state_with_objects(gas_objects.clone()).await;
1247
1248        // Create a new execution scheduler instead of reusing the authority's, to examine
1249        // execution_scheduler output from rx_ready_certificates.
1250        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1251        // scheduler should output no transaction.
1252        assert!(rx_ready_certificates.try_recv().is_err());
1253        // scheduler should be empty at the beginning.
1254        execution_scheduler.check_empty_for_testing().await;
1255
1256        let obj_id = ObjectID::random();
1257        let receiving_object_new0 =
1258            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1259        let receiving_object_new1 =
1260            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1261        let receiving_object_arg0 =
1262            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1263        let receive_object_transaction0 = make_transaction(
1264            gas_objects[0].clone(),
1265            vec![CallArg::Object(receiving_object_arg0)],
1266        );
1267
1268        let receive_object_transaction01 = make_transaction(
1269            gas_objects[1].clone(),
1270            vec![CallArg::Object(receiving_object_arg0)],
1271        );
1272
1273        let receiving_object_arg1 =
1274            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1275        let receive_object_transaction1 = make_transaction(
1276            gas_objects[0].clone(),
1277            vec![CallArg::Object(receiving_object_arg1)],
1278        );
1279
1280        // Enqueuing a transaction with a receiving object that is available at the time it is enqueued
1281        // should become immediately available.
1282        let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1283        let tx1 = make_transaction(
1284            gas_objects[0].clone(),
1285            vec![CallArg::Object(gas_receiving_arg)],
1286        );
1287
1288        // scheduler should output no transaction yet since waiting on receiving object.
1289        execution_scheduler.enqueue_transactions(
1290            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1291            &state.epoch_store_for_testing(),
1292        );
1293        sleep(Duration::from_secs(1)).await;
1294        assert!(rx_ready_certificates.try_recv().is_err());
1295        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1296
1297        // scheduler should output no transaction yet since waiting on receiving object.
1298        execution_scheduler.enqueue_transactions(
1299            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1300            &state.epoch_store_for_testing(),
1301        );
1302        sleep(Duration::from_secs(1)).await;
1303        assert!(rx_ready_certificates.try_recv().is_err());
1304        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1305
1306        // Different transaction with a duplicate receiving object reference is allowed.
1307        // Both transaction's will be outputted once the receiving object is available.
1308        execution_scheduler.enqueue_transactions(
1309            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1310            &state.epoch_store_for_testing(),
1311        );
1312        sleep(Duration::from_secs(1)).await;
1313        assert!(rx_ready_certificates.try_recv().is_err());
1314        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1315
1316        // Notify scheduler that the receiving object 0 is available.
1317        state
1318            .get_cache_writer()
1319            .write_object_entry_for_test(receiving_object_new0.clone());
1320
1321        // scheduler should output both transactions depending on the receiving object now that the
1322        // transaction's receiving object has become available.
1323        rx_ready_certificates.recv().await.unwrap();
1324
1325        rx_ready_certificates.recv().await.unwrap();
1326
1327        // Only two transactions that were dependent on the receiving object should be output.
1328        assert!(rx_ready_certificates.try_recv().is_err());
1329
1330        // Enqueue a transaction with a receiving object that is available at the time it is enqueued.
1331        // This should be immediately available.
1332        execution_scheduler.enqueue_transactions(
1333            vec![(tx1.clone(), ExecutionEnv::new())],
1334            &state.epoch_store_for_testing(),
1335        );
1336        sleep(Duration::from_secs(1)).await;
1337        rx_ready_certificates.recv().await.unwrap();
1338
1339        // Notify scheduler that the receiving object 0 is available.
1340        state
1341            .get_cache_writer()
1342            .write_object_entry_for_test(receiving_object_new1.clone());
1343
1344        // scheduler should output the transaction eventually now that the receiving object has become
1345        // available.
1346        rx_ready_certificates.recv().await.unwrap();
1347    }
1348
1349    #[tokio::test(flavor = "current_thread", start_paused = true)]
1350    async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1351        telemetry_subscribers::init_for_testing();
1352        // Initialize an authority state.
1353        let (owner, _keypair) = deterministic_random_account_key();
1354        let mut gas_objects: Vec<Object> = (0..10)
1355            .map(|_| {
1356                let gas_object_id = ObjectID::random();
1357                Object::with_id_owner_for_testing(gas_object_id, owner)
1358            })
1359            .collect();
1360        let receiving_object = Object::with_id_owner_version_for_testing(
1361            ObjectID::random(),
1362            10.into(),
1363            Owner::AddressOwner(owner),
1364        );
1365        gas_objects.push(receiving_object.clone());
1366        let state = init_state_with_objects(gas_objects.clone()).await;
1367
1368        // Create a new execution scheduler instead of reusing the authority's, to examine
1369        // execution_scheduler output from rx_ready_certificates.
1370        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1371        // scheduler should output no transaction.
1372        assert!(rx_ready_certificates.try_recv().is_err());
1373        // scheduler should be empty at the beginning.
1374        execution_scheduler.check_empty_for_testing().await;
1375
1376        let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1377            receiving_object.id(),
1378            0.into(),
1379            Owner::AddressOwner(owner),
1380        );
1381        let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1382            receiving_object.id(),
1383            1.into(),
1384            Owner::AddressOwner(owner),
1385        );
1386        let receiving_object_arg0 =
1387            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1388        let receive_object_transaction0 = make_transaction(
1389            gas_objects[0].clone(),
1390            vec![CallArg::Object(receiving_object_arg0)],
1391        );
1392
1393        let receive_object_transaction01 = make_transaction(
1394            gas_objects[1].clone(),
1395            vec![CallArg::Object(receiving_object_arg0)],
1396        );
1397
1398        let receiving_object_arg1 =
1399            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1400        let receive_object_transaction1 = make_transaction(
1401            gas_objects[0].clone(),
1402            vec![CallArg::Object(receiving_object_arg1)],
1403        );
1404
1405        // scheduler should output no transaction yet since waiting on receiving object.
1406        execution_scheduler.enqueue_transactions(
1407            vec![(receive_object_transaction0.clone(), ExecutionEnv::new())],
1408            &state.epoch_store_for_testing(),
1409        );
1410        execution_scheduler.enqueue_transactions(
1411            vec![(receive_object_transaction01.clone(), ExecutionEnv::new())],
1412            &state.epoch_store_for_testing(),
1413        );
1414        execution_scheduler.enqueue_transactions(
1415            vec![(receive_object_transaction1.clone(), ExecutionEnv::new())],
1416            &state.epoch_store_for_testing(),
1417        );
1418        sleep(Duration::from_secs(1)).await;
1419        rx_ready_certificates.recv().await.unwrap();
1420        rx_ready_certificates.recv().await.unwrap();
1421        rx_ready_certificates.recv().await.unwrap();
1422        assert!(rx_ready_certificates.try_recv().is_err());
1423    }
1424
1425    // Tests transaction cancellation logic in execution scheduler. Mainly tests that for cancelled transaction,
1426    // execution scheduler only waits for all non-shared objects to be available before outputting the transaction.
1427    #[tokio::test(flavor = "current_thread", start_paused = true)]
1428    async fn execution_scheduler_with_cancelled_transactions() {
1429        // Initialize an authority state, with gas objects and 3 shared objects.
1430        let (owner, _keypair) = deterministic_random_account_key();
1431        let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1432        let shared_object_1 = Object::shared_for_testing();
1433        let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1434        let shared_object_2 = Object::shared_for_testing();
1435        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1436        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1437
1438        let state = init_state_with_objects(vec![
1439            gas_object.clone(),
1440            shared_object_1.clone(),
1441            shared_object_2.clone(),
1442            owned_object.clone(),
1443        ])
1444        .await;
1445
1446        // Create a new execution scheduler instead of reusing the authority's, to examine
1447        // execution_scheduler output from rx_ready_certificates.
1448        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1449        // scheduler should output no transaction.
1450        assert!(rx_ready_certificates.try_recv().is_err());
1451
1452        // Enqueue one transaction with 2 shared object inputs and 1 owned input.
1453        let shared_object_arg_1 = ObjectArg::SharedObject {
1454            id: shared_object_1.id(),
1455            initial_shared_version: initial_shared_version_1,
1456            mutability: SharedObjectMutability::Mutable,
1457        };
1458        let shared_object_arg_2 = ObjectArg::SharedObject {
1459            id: shared_object_2.id(),
1460            initial_shared_version: initial_shared_version_2,
1461            mutability: SharedObjectMutability::Mutable,
1462        };
1463
1464        // Changes the desired owned object version to a higher version. We will make it available later.
1465        let owned_version = 2000.into();
1466        let mut owned_ref = owned_object.compute_object_reference();
1467        owned_ref.1 = owned_version;
1468        let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1469
1470        let cancelled_transaction = make_transaction(
1471            gas_object.clone(),
1472            vec![
1473                CallArg::Object(shared_object_arg_1),
1474                CallArg::Object(shared_object_arg_2),
1475                CallArg::Object(owned_object_arg),
1476            ],
1477        );
1478        let assigned_versions = vec![
1479            (
1480                (
1481                    shared_object_1.id(),
1482                    shared_object_1.owner().start_version().unwrap(),
1483                ),
1484                SequenceNumber::CANCELLED_READ,
1485            ),
1486            (
1487                (
1488                    shared_object_2.id(),
1489                    shared_object_2.owner().start_version().unwrap(),
1490                ),
1491                SequenceNumber::CONGESTED,
1492            ),
1493        ];
1494
1495        execution_scheduler.enqueue_transactions(
1496            vec![(
1497                cancelled_transaction.clone(),
1498                ExecutionEnv::new()
1499                    .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1500            )],
1501            &state.epoch_store_for_testing(),
1502        );
1503
1504        // scheduler should output no transaction yet.
1505        sleep(Duration::from_secs(1)).await;
1506        assert!(rx_ready_certificates.try_recv().is_err());
1507
1508        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1509
1510        // Notify scheduler about availability of the owned object.
1511        let mut new_owned_object = owned_object.clone();
1512        new_owned_object
1513            .data
1514            .try_as_move_mut()
1515            .unwrap()
1516            .increment_version_to(owned_version);
1517        state
1518            .get_cache_writer()
1519            .write_object_entry_for_test(new_owned_object);
1520
1521        // scheduler should output the transaction as soon as the owned object is available.
1522        let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1523        assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1524
1525        sleep(Duration::from_secs(1)).await;
1526        assert!(rx_ready_certificates.try_recv().is_err());
1527
1528        execution_scheduler.check_empty_for_testing().await;
1529    }
1530
1531    #[test]
1532    fn test_barrier_dependency_builder() {
1533        let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1534            assert!(
1535                non_exclusive_writes
1536                    .iter()
1537                    .all(|id| !exclusive_writes.contains(id))
1538            );
1539            assert!(
1540                exclusive_writes
1541                    .iter()
1542                    .all(|id| !non_exclusive_writes.contains(id))
1543            );
1544
1545            let non_exclusive_writes = non_exclusive_writes
1546                .into_iter()
1547                .map(|id| ObjectID::from_single_byte(id as u8));
1548            let exclusive_writes = exclusive_writes
1549                .into_iter()
1550                .map(|id| ObjectID::from_single_byte(id as u8));
1551            let mut builder = ProgrammableTransactionBuilder::new();
1552            for non_exclusive_write in non_exclusive_writes {
1553                builder
1554                    .obj(ObjectArg::SharedObject {
1555                        id: non_exclusive_write,
1556                        initial_shared_version: SequenceNumber::new(),
1557                        mutability: SharedObjectMutability::NonExclusiveWrite,
1558                    })
1559                    .unwrap();
1560            }
1561
1562            for exclusive_write in exclusive_writes {
1563                builder
1564                    .obj(ObjectArg::SharedObject {
1565                        id: exclusive_write,
1566                        initial_shared_version: SequenceNumber::new(),
1567                        mutability: SharedObjectMutability::Mutable,
1568                    })
1569                    .unwrap();
1570            }
1571
1572            let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1573            let tx_data =
1574                TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1575            Transaction::from_data_and_signer(tx_data, vec![])
1576        };
1577
1578        // One non-exclusive write, one exclusive write.
1579        {
1580            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1581            let tx1 = make_transaction(vec![1], vec![]);
1582            let tx2 = make_transaction(vec![], vec![1]);
1583
1584            let tx1_deps =
1585                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1586            let tx2_deps =
1587                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1588            assert!(tx1_deps.is_empty());
1589            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1590        }
1591
1592        // One transaction has non-exclusive writes to two different objects, and then becomes
1593        // a dependency of two barriers
1594        {
1595            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1596            let tx1 = make_transaction(vec![1, 2], vec![]);
1597            let tx2 = make_transaction(vec![], vec![1]);
1598            let tx3 = make_transaction(vec![], vec![2]);
1599
1600            let tx1_deps =
1601                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1602            let tx2_deps =
1603                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1604            let tx3_deps =
1605                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1606            assert!(tx1_deps.is_empty());
1607            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1608            assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1609        }
1610
1611        // Ensure multiple-object dependences are merged
1612        {
1613            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1614            let tx1 = make_transaction(vec![1], vec![]);
1615            let tx2 = make_transaction(vec![2], vec![]);
1616            let tx3 = make_transaction(vec![], vec![1, 2]);
1617
1618            let tx1_deps =
1619                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1620            let tx2_deps =
1621                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1622            let tx3_deps =
1623                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1624            assert!(tx1_deps.is_empty());
1625            assert!(tx2_deps.is_empty());
1626            assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1627        }
1628
1629        // Ensure dependency state is cleared
1630        {
1631            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1632            let tx1 = make_transaction(vec![1], vec![]);
1633            let tx2 = make_transaction(vec![], vec![1]);
1634            let tx3 = make_transaction(vec![], vec![1]);
1635
1636            let tx1_deps =
1637                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1638            let tx2_deps =
1639                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1640            let tx3_deps =
1641                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1642            assert!(tx1_deps.is_empty());
1643            assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1644            assert!(tx3_deps.is_empty());
1645        }
1646    }
1647}