sui_core/execution_scheduler/
execution_scheduler_impl.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    authority::{
6        AuthorityMetrics, ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
7        shared_object_version_manager::Schedulable,
8    },
9    execution_cache::{ObjectCacheRead, TransactionCacheRead},
10    execution_scheduler::{
11        ExecutingGuard, PendingCertificateStats,
12        balance_withdraw_scheduler::{
13            BalanceSettlement, ScheduleStatus, TxBalanceWithdraw,
14            scheduler::BalanceWithdrawScheduler,
15        },
16    },
17};
18use futures::stream::{FuturesUnordered, StreamExt};
19use mysten_common::debug_fatal;
20use mysten_metrics::spawn_monitored_task;
21use parking_lot::Mutex;
22use std::{
23    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
24    sync::Arc,
25};
26use sui_config::node::AuthorityOverloadConfig;
27use sui_types::{
28    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
29    base_types::{FullObjectID, ObjectID},
30    digests::TransactionDigest,
31    error::SuiResult,
32    executable_transaction::VerifiedExecutableTransaction,
33    storage::{ChildObjectResolver, InputKey},
34    transaction::{
35        SenderSignedData, SharedInputObject, SharedObjectMutability, TransactionData,
36        TransactionDataAPI, TransactionKey,
37    },
38};
39use tokio::sync::mpsc::UnboundedSender;
40use tokio::time::Instant;
41use tracing::{debug, error, instrument};
42
43use super::{PendingCertificate, overload_tracker::OverloadTracker};
44
45/// Utility struct for collecting barrier dependencies
46pub(crate) struct BarrierDependencyBuilder {
47    dep_state: BTreeMap<ObjectID, BTreeSet<TransactionDigest>>,
48}
49
50impl BarrierDependencyBuilder {
51    pub fn new() -> Self {
52        Self {
53            dep_state: Default::default(),
54        }
55    }
56
57    /// process_tx must be called for each transaction in scheduling order. If the
58    /// transaction has a non-exclusive write to an object, the transaction digest is
59    /// stored to become a dependency of the eventual barrier transaction. If a
60    /// transaction has an exclusive write to an object, all pending non-exclusive write
61    /// transactions for that object are added to the barrier dependencies.
62    pub fn process_tx(
63        &mut self,
64        tx_digest: TransactionDigest,
65        tx: &TransactionData,
66    ) -> BTreeSet<TransactionDigest> {
67        let mut barrier_deps = BTreeSet::new();
68        for SharedInputObject { id, mutability, .. } in tx.kind().shared_input_objects() {
69            match mutability {
70                SharedObjectMutability::NonExclusiveWrite => {
71                    self.dep_state.entry(id).or_default().insert(tx_digest);
72                }
73                SharedObjectMutability::Mutable => {
74                    // If there were preceding non-exclusive writes to this object id, this
75                    // transaction is a barrier and must wait for them to finish.
76                    if let Some(deps) = self.dep_state.remove(&id) {
77                        barrier_deps.extend(deps);
78                    }
79                }
80                SharedObjectMutability::Immutable => (),
81            }
82        }
83        barrier_deps
84    }
85}
86
87#[derive(Clone)]
88pub struct ExecutionScheduler {
89    object_cache_read: Arc<dyn ObjectCacheRead>,
90    transaction_cache_read: Arc<dyn TransactionCacheRead>,
91    overload_tracker: Arc<OverloadTracker>,
92    tx_ready_certificates: UnboundedSender<PendingCertificate>,
93    balance_withdraw_scheduler: Arc<Mutex<Option<BalanceWithdrawScheduler>>>,
94    metrics: Arc<AuthorityMetrics>,
95}
96
97struct PendingGuard<'a> {
98    scheduler: &'a ExecutionScheduler,
99    cert: &'a VerifiedExecutableTransaction,
100}
101
102impl<'a> PendingGuard<'a> {
103    pub fn new(scheduler: &'a ExecutionScheduler, cert: &'a VerifiedExecutableTransaction) -> Self {
104        scheduler
105            .metrics
106            .transaction_manager_num_pending_certificates
107            .inc();
108        scheduler
109            .overload_tracker
110            .add_pending_certificate(cert.data());
111        Self { scheduler, cert }
112    }
113}
114
115impl Drop for PendingGuard<'_> {
116    fn drop(&mut self) {
117        self.scheduler
118            .metrics
119            .transaction_manager_num_pending_certificates
120            .dec();
121        self.scheduler
122            .overload_tracker
123            .remove_pending_certificate(self.cert.data());
124    }
125}
126
127impl ExecutionScheduler {
128    pub fn new(
129        object_cache_read: Arc<dyn ObjectCacheRead>,
130        child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
131        transaction_cache_read: Arc<dyn TransactionCacheRead>,
132        tx_ready_certificates: UnboundedSender<PendingCertificate>,
133        epoch_store: &Arc<AuthorityPerEpochStore>,
134        metrics: Arc<AuthorityMetrics>,
135    ) -> Self {
136        tracing::info!("Creating new ExecutionScheduler");
137        let balance_withdraw_scheduler =
138            Arc::new(Mutex::new(Self::initialize_balance_withdraw_scheduler(
139                epoch_store,
140                &object_cache_read,
141                child_object_resolver,
142            )));
143        Self {
144            object_cache_read,
145            transaction_cache_read,
146            overload_tracker: Arc::new(OverloadTracker::new()),
147            tx_ready_certificates,
148            balance_withdraw_scheduler,
149            metrics,
150        }
151    }
152
153    fn initialize_balance_withdraw_scheduler(
154        epoch_store: &Arc<AuthorityPerEpochStore>,
155        object_cache_read: &Arc<dyn ObjectCacheRead>,
156        child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
157    ) -> Option<BalanceWithdrawScheduler> {
158        let withdraw_scheduler_enabled =
159            epoch_store.is_validator() && epoch_store.accumulators_enabled();
160        if !withdraw_scheduler_enabled {
161            return None;
162        }
163        let starting_accumulator_version = object_cache_read
164            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
165            .expect("Accumulator root object must be present if balance accumulator is enabled")
166            .version();
167        Some(BalanceWithdrawScheduler::new(
168            Arc::new(child_object_resolver),
169            starting_accumulator_version,
170        ))
171    }
172
173    #[instrument(level = "debug", skip_all, fields(tx_digest = ?cert.digest()))]
174    async fn schedule_transaction(
175        self,
176        cert: VerifiedExecutableTransaction,
177        execution_env: ExecutionEnv,
178        epoch_store: &Arc<AuthorityPerEpochStore>,
179    ) {
180        let enqueue_time = Instant::now();
181        let tx_digest = cert.digest();
182        let digests = [*tx_digest];
183
184        let tx_data = cert.transaction_data();
185        let input_object_kinds = tx_data
186            .input_objects()
187            .expect("input_objects() cannot fail");
188        let input_object_keys: Vec<_> = epoch_store
189            .get_input_object_keys(
190                &cert.key(),
191                &input_object_kinds,
192                &execution_env.assigned_versions,
193            )
194            .into_iter()
195            .collect();
196
197        let receiving_object_keys: HashSet<_> = tx_data
198            .receiving_objects()
199            .into_iter()
200            .map(|entry| {
201                InputKey::VersionedObject {
202                    // TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
203                    id: FullObjectID::new(entry.0, None),
204                    version: entry.1,
205                }
206            })
207            .collect();
208        let input_and_receiving_keys = [
209            input_object_keys,
210            receiving_object_keys.iter().cloned().collect(),
211        ]
212        .concat();
213
214        let epoch = epoch_store.epoch();
215        debug!(
216            ?tx_digest,
217            "Scheduled transaction, waiting for input objects: {:?}", input_and_receiving_keys,
218        );
219
220        let availability = self
221            .object_cache_read
222            .multi_input_objects_available_cache_only(&input_and_receiving_keys);
223        // Most of the times, the transaction's input objects are already available.
224        // We can check the availability of the input objects first, and only wait for the
225        // missing input objects if necessary.
226        let missing_input_keys: Vec<_> = input_and_receiving_keys
227            .into_iter()
228            .zip(availability)
229            .filter_map(|(key, available)| if !available { Some(key) } else { None })
230            .collect();
231
232        let has_missing_barrier_dependencies = self
233            .transaction_cache_read
234            .multi_get_executed_effects_digests(&execution_env.barrier_dependencies)
235            .into_iter()
236            .any(|r| r.is_none());
237
238        if missing_input_keys.is_empty() && !has_missing_barrier_dependencies {
239            self.metrics
240                .transaction_manager_num_enqueued_certificates
241                .with_label_values(&["ready"])
242                .inc();
243            debug!(?tx_digest, "Input objects already available");
244            self.send_transaction_for_execution(&cert, execution_env, enqueue_time);
245            return;
246        }
247
248        let _pending_guard = PendingGuard::new(&self, &cert);
249        self.metrics
250            .transaction_manager_num_enqueued_certificates
251            .with_label_values(&["pending"])
252            .inc();
253
254        if !execution_env.barrier_dependencies.is_empty() {
255            debug!(
256                "waiting for barrier dependencies to be executed: {:?}",
257                execution_env.barrier_dependencies
258            );
259            self.transaction_cache_read
260                .notify_read_executed_effects_digests(
261                    "wait_for_barrier_dependencies",
262                    &execution_env.barrier_dependencies,
263                )
264                .await;
265        }
266
267        tokio::select! {
268            _ = self.object_cache_read
269                .notify_read_input_objects(&missing_input_keys, &receiving_object_keys, epoch)
270                => {
271                    self.metrics
272                        .transaction_manager_transaction_queue_age_s
273                        .observe(enqueue_time.elapsed().as_secs_f64());
274                    debug!(?tx_digest, "Input objects available");
275                    // TODO: Eventually we could fold execution_driver into the scheduler.
276                    self.send_transaction_for_execution(
277                        &cert,
278                        execution_env,
279                        enqueue_time,
280                    );
281                }
282            _ = self.transaction_cache_read.notify_read_executed_effects_digests(
283                "ExecutionScheduler::notify_read_executed_effects_digests",
284                &digests,
285            ) => {
286                debug!(?tx_digest, "Transaction already executed");
287            }
288        };
289    }
290
291    fn send_transaction_for_execution(
292        &self,
293        cert: &VerifiedExecutableTransaction,
294        execution_env: ExecutionEnv,
295        enqueue_time: Instant,
296    ) {
297        let pending_cert = PendingCertificate {
298            certificate: cert.clone(),
299            execution_env,
300            stats: PendingCertificateStats {
301                enqueue_time,
302                ready_time: Some(Instant::now()),
303            },
304            executing_guard: Some(ExecutingGuard::new(
305                self.metrics
306                    .transaction_manager_num_executing_certificates
307                    .clone(),
308            )),
309        };
310        let _ = self.tx_ready_certificates.send(pending_cert);
311    }
312
313    fn schedule_balance_withdraws(
314        &self,
315        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
316        epoch_store: &Arc<AuthorityPerEpochStore>,
317    ) {
318        if certs.is_empty() {
319            return;
320        }
321        let mut withdraws = BTreeMap::new();
322        let mut prev_version = None;
323        for (cert, env) in &certs {
324            let tx_withdraws = cert
325                .transaction_data()
326                .process_funds_withdrawals_for_execution();
327            assert!(!tx_withdraws.is_empty());
328            let accumulator_version = env
329                .assigned_versions
330                .accumulator_version
331                .expect("accumulator_version must be set when there are withdraws");
332            if let Some(prev_version) = prev_version {
333                // Transactions must be in order.
334                assert!(prev_version <= accumulator_version);
335            }
336            prev_version = Some(accumulator_version);
337            let tx_digest = *cert.digest();
338            withdraws
339                .entry(accumulator_version)
340                .or_insert(Vec::new())
341                .push(TxBalanceWithdraw {
342                    tx_digest,
343                    reservations: tx_withdraws,
344                });
345        }
346        let mut receivers = FuturesUnordered::new();
347        {
348            let guard = self.balance_withdraw_scheduler.lock();
349            let withdraw_scheduler = guard
350                .as_ref()
351                .expect("Balance withdraw scheduler must be enabled if there are withdraws");
352            for (version, tx_withdraws) in withdraws {
353                receivers.extend(withdraw_scheduler.schedule_withdraws(version, tx_withdraws));
354            }
355            // guard will be dropped here
356        }
357        let scheduler = self.clone();
358        let epoch_store = epoch_store.clone();
359        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
360            let mut cert_map = HashMap::new();
361            for (cert, env) in certs {
362                cert_map.insert(*cert.digest(), (cert, env));
363            }
364            while let Some(result) = receivers.next().await {
365                match result {
366                    Ok(result) => match result.status {
367                        ScheduleStatus::InsufficientBalance => {
368                            let tx_digest = result.tx_digest;
369                            debug!(
370                                ?tx_digest,
371                                "Balance withdraw scheduling result: Insufficient balance"
372                            );
373                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
374                            let env = env.with_insufficient_balance();
375                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
376                        }
377                        ScheduleStatus::SufficientBalance => {
378                            let tx_digest = result.tx_digest;
379                            debug!(?tx_digest, "Balance withdraw scheduling result: Success");
380                            let (cert, env) = cert_map.remove(&tx_digest).expect("cert must exist");
381                            let env = env.with_sufficient_balance();
382                            scheduler.enqueue_transactions(vec![(cert, env)], &epoch_store);
383                        }
384                        ScheduleStatus::SkipSchedule => {
385                            let tx_digest = result.tx_digest;
386                            debug!(?tx_digest, "Skip scheduling balance withdraw");
387                        }
388                    },
389                    Err(e) => {
390                        error!("Withdraw scheduler stopped: {:?}", e);
391                    }
392                }
393            }
394        }));
395    }
396
397    fn schedule_settlement_transactions(
398        &self,
399        settlement_txns: Vec<(TransactionKey, ExecutionEnv)>,
400        epoch_store: &Arc<AuthorityPerEpochStore>,
401    ) {
402        if !settlement_txns.is_empty() {
403            let scheduler = self.clone();
404            let epoch_store = epoch_store.clone();
405
406            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
407                let mut futures: FuturesUnordered<_> =
408                        settlement_txns
409                            .into_iter()
410                            .map(|(key, env)| {
411                                let epoch_store = epoch_store.clone();
412                                async move {
413                                    (epoch_store.wait_for_settlement_transactions(key).await, env)
414                                }
415                            })
416                            .collect();
417
418                while let Some((txns, env)) = futures.next().await {
419                    let mut barrier_deps = BarrierDependencyBuilder::new();
420                    let txns = txns
421                        .into_iter()
422                        .map(|tx| {
423                            let deps = barrier_deps.process_tx(*tx.digest(), tx.transaction_data());
424                            let env = env.clone().with_barrier_dependencies(deps);
425                            (tx, env)
426                        })
427                        .collect::<Vec<_>>();
428
429                    scheduler.enqueue_transactions(txns, &epoch_store);
430                }
431            }));
432        }
433    }
434
435    fn schedule_tx_keys(
436        &self,
437        tx_with_keys: Vec<(TransactionKey, ExecutionEnv)>,
438        epoch_store: &Arc<AuthorityPerEpochStore>,
439    ) {
440        if tx_with_keys.is_empty() {
441            return;
442        }
443
444        let scheduler = self.clone();
445        let epoch_store = epoch_store.clone();
446        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
447            let tx_keys: Vec<_> = tx_with_keys.iter().map(|(key, _)| key).cloned().collect();
448            let digests = epoch_store
449                .notify_read_tx_key_to_digest(&tx_keys)
450                .await
451                .expect("db error");
452            let transactions = scheduler
453                .transaction_cache_read
454                .multi_get_transaction_blocks(&digests)
455                .into_iter()
456                .map(|tx| {
457                    let tx = tx.expect("tx must exist").as_ref().clone();
458                    VerifiedExecutableTransaction::new_system(tx, epoch_store.epoch())
459                })
460                .zip(tx_with_keys.into_iter().map(|(_, env)| env))
461                .collect::<Vec<_>>();
462            scheduler.enqueue_transactions(transactions, &epoch_store);
463        }));
464    }
465
466    /// When we schedule a certificate, it should be impossible for it to have been executed in a
467    /// previous epoch.
468    #[cfg(debug_assertions)]
469    fn assert_cert_not_executed_previous_epochs(&self, cert: &VerifiedExecutableTransaction) {
470        let epoch = cert.epoch();
471        let digest = *cert.digest();
472        let digests = [digest];
473        let executed = self
474            .transaction_cache_read
475            .multi_get_executed_effects(&digests)
476            .pop()
477            .unwrap();
478        // Due to pruning, we may not always have an executed effects for the certificate
479        // even if it was executed. So this is a best-effort check.
480        if let Some(executed) = executed {
481            use sui_types::effects::TransactionEffectsAPI;
482
483            assert_eq!(
484                executed.executed_epoch(),
485                epoch,
486                "Transaction {} was executed in epoch {}, but scheduled again in epoch {}",
487                digest,
488                executed.executed_epoch(),
489                epoch
490            );
491        }
492    }
493}
494
495impl ExecutionScheduler {
496    pub fn enqueue(
497        &self,
498        certs: Vec<(Schedulable, ExecutionEnv)>,
499        epoch_store: &Arc<AuthorityPerEpochStore>,
500    ) {
501        // schedule all transactions immediately
502        let mut ordinary_txns = Vec::with_capacity(certs.len());
503        let mut tx_with_keys = Vec::new();
504        let mut tx_with_withdraws = Vec::new();
505        let mut settlement_txns = Vec::new();
506
507        for (schedulable, env) in certs {
508            match schedulable {
509                Schedulable::Transaction(tx) => {
510                    if tx.transaction_data().has_funds_withdrawals() {
511                        tx_with_withdraws.push((tx, env));
512                    } else {
513                        ordinary_txns.push((tx, env));
514                    }
515                }
516                s @ Schedulable::RandomnessStateUpdate(..) => {
517                    tx_with_keys.push((s.key(), env));
518                }
519                Schedulable::AccumulatorSettlement(_, _) => {
520                    settlement_txns.push((schedulable.key(), env));
521                }
522                Schedulable::ConsensusCommitPrologue(_, _, _) => {
523                    // we only use Schedulable::ConsensusCommitPrologue as a temporary placeholder
524                    // during version assignment, by the time we schedule transactions it should be
525                    // converted to Schedulable::Transaction
526                    unreachable!("Schedulable::ConsensusCommitPrologue should not be enqueued");
527                }
528            }
529        }
530
531        self.enqueue_transactions(ordinary_txns, epoch_store);
532        self.schedule_tx_keys(tx_with_keys, epoch_store);
533        self.schedule_balance_withdraws(tx_with_withdraws, epoch_store);
534        self.schedule_settlement_transactions(settlement_txns, epoch_store);
535    }
536
537    pub fn enqueue_transactions(
538        &self,
539        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
540        epoch_store: &Arc<AuthorityPerEpochStore>,
541    ) {
542        // Filter out certificates from wrong epoch.
543        let certs: Vec<_> = certs
544            .into_iter()
545            .filter_map(|cert| {
546                if cert.0.epoch() == epoch_store.epoch() {
547                    #[cfg(debug_assertions)]
548                    self.assert_cert_not_executed_previous_epochs(&cert.0);
549
550                    Some(cert)
551                } else {
552                    debug_fatal!(
553                        "We should never enqueue certificate from wrong epoch. Expected={} Certificate={:?}",
554                        epoch_store.epoch(),
555                        cert.0.epoch()
556                    );
557                    None
558                }
559            })
560            .collect();
561        let digests: Vec<_> = certs.iter().map(|(cert, _)| *cert.digest()).collect();
562        let executed = self
563            .transaction_cache_read
564            .multi_get_executed_effects_digests(&digests);
565        let mut already_executed_certs_num = 0;
566        let pending_certs =
567            certs
568                .into_iter()
569                .zip(executed)
570                .filter_map(|((cert, execution_env), executed)| {
571                    if executed.is_none() {
572                        Some((cert, execution_env))
573                    } else {
574                        already_executed_certs_num += 1;
575                        None
576                    }
577                });
578
579        for (cert, execution_env) in pending_certs {
580            let scheduler = self.clone();
581            let epoch_store = epoch_store.clone();
582            spawn_monitored_task!(
583                epoch_store.within_alive_epoch(scheduler.schedule_transaction(
584                    cert,
585                    execution_env,
586                    &epoch_store,
587                ))
588            );
589        }
590
591        self.metrics
592            .transaction_manager_num_enqueued_certificates
593            .with_label_values(&["already_executed"])
594            .inc_by(already_executed_certs_num);
595    }
596
597    pub fn settle_balances(&self, settlement: BalanceSettlement) {
598        self.balance_withdraw_scheduler
599            .lock()
600            .as_ref()
601            .expect("Balance withdraw scheduler must be enabled if there are settlements")
602            .settle_balances(settlement);
603    }
604
605    /// Reconfigure internal state at epoch start. This resets the balance withdraw scheduler
606    /// to the current accumulator root object version.
607    pub fn reconfigure(
608        &self,
609        new_epoch_store: &Arc<AuthorityPerEpochStore>,
610        child_object_resolver: &Arc<dyn ChildObjectResolver + Send + Sync>,
611    ) {
612        let scheduler = Self::initialize_balance_withdraw_scheduler(
613            new_epoch_store,
614            &self.object_cache_read,
615            child_object_resolver.clone(),
616        );
617        let mut guard = self.balance_withdraw_scheduler.lock();
618        if let Some(old_scheduler) = guard.as_ref() {
619            old_scheduler.close_epoch();
620        }
621        *guard = scheduler;
622    }
623
624    pub fn check_execution_overload(
625        &self,
626        overload_config: &AuthorityOverloadConfig,
627        tx_data: &SenderSignedData,
628    ) -> SuiResult {
629        let inflight_queue_len = self.num_pending_certificates();
630        self.overload_tracker
631            .check_execution_overload(overload_config, tx_data, inflight_queue_len)
632    }
633
634    pub fn num_pending_certificates(&self) -> usize {
635        (self
636            .metrics
637            .transaction_manager_num_pending_certificates
638            .get()
639            + self
640                .metrics
641                .transaction_manager_num_executing_certificates
642                .get()) as usize
643    }
644
645    #[cfg(test)]
646    pub fn check_empty_for_testing(&self) {
647        assert_eq!(self.num_pending_certificates(), 0);
648    }
649}
650
651#[cfg(test)]
652mod test {
653    use super::{BarrierDependencyBuilder, ExecutionScheduler, PendingCertificate};
654    use crate::authority::ExecutionEnv;
655    use crate::authority::shared_object_version_manager::AssignedVersions;
656    use crate::authority::{AuthorityState, authority_tests::init_state_with_objects};
657    use crate::execution_scheduler::SchedulingSource;
658    use std::collections::BTreeSet;
659    use std::{time::Duration, vec};
660    use sui_test_transaction_builder::TestTransactionBuilder;
661    use sui_types::base_types::{SuiAddress, random_object_ref};
662    use sui_types::executable_transaction::VerifiedExecutableTransaction;
663    use sui_types::object::Owner;
664    use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
665    use sui_types::transaction::{
666        SharedObjectMutability, Transaction, TransactionData, TransactionKind, VerifiedTransaction,
667    };
668    use sui_types::{
669        SUI_FRAMEWORK_PACKAGE_ID,
670        base_types::{ObjectID, SequenceNumber},
671        crypto::deterministic_random_account_key,
672        object::Object,
673        transaction::{CallArg, ObjectArg},
674    };
675    use tokio::time::Instant;
676    use tokio::{
677        sync::mpsc::{UnboundedReceiver, error::TryRecvError, unbounded_channel},
678        time::sleep,
679    };
680
681    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
682    fn make_execution_scheduler(
683        state: &AuthorityState,
684    ) -> (ExecutionScheduler, UnboundedReceiver<PendingCertificate>) {
685        // Create a new execution scheduler instead of reusing the authority's, to examine
686        // execution_scheduler output from rx_ready_certificates.
687        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
688        let execution_scheduler = ExecutionScheduler::new(
689            state.get_object_cache_reader().clone(),
690            state.get_child_object_resolver().clone(),
691            state.get_transaction_cache_reader().clone(),
692            tx_ready_certificates,
693            &state.epoch_store_for_testing(),
694            state.metrics.clone(),
695        );
696
697        (execution_scheduler, rx_ready_certificates)
698    }
699
700    fn make_transaction(gas_object: Object, input: Vec<CallArg>) -> VerifiedExecutableTransaction {
701        // Use fake module, function, package and gas prices since they are irrelevant for testing
702        // execution scheduler.
703        let rgp = 100;
704        let (sender, keypair) = deterministic_random_account_key();
705        let transaction =
706            TestTransactionBuilder::new(sender, gas_object.compute_object_reference(), rgp)
707                .move_call(SUI_FRAMEWORK_PACKAGE_ID, "counter", "assert_value", input)
708                .build_and_sign(&keypair);
709        VerifiedExecutableTransaction::new_system(
710            VerifiedTransaction::new_unchecked(transaction),
711            0,
712        )
713    }
714
715    #[tokio::test(flavor = "current_thread", start_paused = true)]
716    async fn execution_scheduler_basics() {
717        // Initialize an authority state.
718        let (owner, _keypair) = deterministic_random_account_key();
719        let gas_objects: Vec<Object> = (0..10)
720            .map(|_| {
721                let gas_object_id = ObjectID::random();
722                Object::with_id_owner_for_testing(gas_object_id, owner)
723            })
724            .collect();
725        let state = init_state_with_objects(gas_objects.clone()).await;
726
727        // Create a new execution scheduler instead of reusing the authority's, to examine
728        // execution_scheduler output from rx_ready_certificates.
729        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
730        // scheduler should output no transaction.
731        assert!(
732            rx_ready_certificates
733                .try_recv()
734                .is_err_and(|err| err == TryRecvError::Empty)
735        );
736        // scheduler should be empty at the beginning.
737        assert_eq!(execution_scheduler.num_pending_certificates(), 0);
738
739        // Enqueue empty vec should not crash.
740        execution_scheduler.enqueue_transactions(vec![], &state.epoch_store_for_testing());
741        // scheduler should output no transaction.
742        assert!(
743            rx_ready_certificates
744                .try_recv()
745                .is_err_and(|err| err == TryRecvError::Empty)
746        );
747
748        // Enqueue a transaction with existing gas object, empty input.
749        let transaction = make_transaction(gas_objects[0].clone(), vec![]);
750        let tx_start_time = Instant::now();
751        execution_scheduler.enqueue_transactions(
752            vec![(
753                transaction.clone(),
754                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
755            )],
756            &state.epoch_store_for_testing(),
757        );
758        // scheduler should output the transaction eventually.
759        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
760
761        // Tests that pending certificate stats are recorded properly.
762        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
763        assert!(
764            pending_certificate.stats.ready_time.unwrap() >= pending_certificate.stats.enqueue_time
765        );
766
767        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
768
769        // Pretend we have just executed the transaction.
770        drop(pending_certificate);
771
772        // scheduler should be empty.
773        execution_scheduler.check_empty_for_testing();
774
775        // Enqueue a transaction with a new gas object, empty input.
776        let gas_object_new = Object::with_id_owner_version_for_testing(
777            ObjectID::random(),
778            0.into(),
779            Owner::AddressOwner(owner),
780        );
781        let transaction = make_transaction(gas_object_new.clone(), vec![]);
782        let tx_start_time = Instant::now();
783        execution_scheduler.enqueue_transactions(
784            vec![(
785                transaction.clone(),
786                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
787            )],
788            &state.epoch_store_for_testing(),
789        );
790        // scheduler should output no transaction yet.
791        sleep(Duration::from_secs(1)).await;
792        assert!(
793            rx_ready_certificates
794                .try_recv()
795                .is_err_and(|err| err == TryRecvError::Empty)
796        );
797
798        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
799
800        // Duplicated enqueue is allowed.
801        execution_scheduler.enqueue_transactions(
802            vec![(
803                transaction.clone(),
804                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
805            )],
806            &state.epoch_store_for_testing(),
807        );
808        sleep(Duration::from_secs(1)).await;
809        assert!(
810            rx_ready_certificates
811                .try_recv()
812                .is_err_and(|err| err == TryRecvError::Empty)
813        );
814
815        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
816
817        // Notify scheduler about availability of the gas object.
818        state
819            .get_cache_writer()
820            .write_object_entry_for_test(gas_object_new);
821        // scheduler should output the transaction eventually.
822        // We will see both the original and the duplicated transaction.
823        let pending_certificate = rx_ready_certificates.recv().await.unwrap();
824        let pending_certificate2 = rx_ready_certificates.recv().await.unwrap();
825        assert_eq!(
826            pending_certificate.certificate.digest(),
827            pending_certificate2.certificate.digest()
828        );
829
830        // Tests that pending certificate stats are recorded properly. The ready time should be
831        // 2 seconds apart from the enqueue time.
832        assert!(pending_certificate.stats.enqueue_time >= tx_start_time);
833        assert!(
834            pending_certificate.stats.ready_time.unwrap() - pending_certificate.stats.enqueue_time
835                >= Duration::from_secs(2)
836        );
837
838        // Pretend we have just executed the transaction.
839        drop(pending_certificate);
840        drop(pending_certificate2);
841
842        // scheduler should be empty at the end.
843        execution_scheduler.check_empty_for_testing();
844    }
845
846    // Tests when objects become available, correct set of transactions can be sent to execute.
847    // Specifically, we have following setup,
848    //         shared_object     shared_object_2
849    //       /    |    \     \    /
850    //    tx_0  tx_1  tx_2    tx_3
851    //     r      r     w      r
852    // And when shared_object is available, tx_0, tx_1, and tx_2 can be executed. And when
853    // shared_object_2 becomes available, tx_3 can be executed.
854    #[tokio::test(flavor = "current_thread", start_paused = true)]
855    async fn execution_scheduler_object_dependency() {
856        telemetry_subscribers::init_for_testing();
857        // Initialize an authority state, with gas objects and a shared object.
858        let (owner, _keypair) = deterministic_random_account_key();
859        let gas_objects: Vec<Object> = (0..10)
860            .map(|_| {
861                let gas_object_id = ObjectID::random();
862                Object::with_id_owner_for_testing(gas_object_id, owner)
863            })
864            .collect();
865        let shared_object = Object::shared_for_testing();
866        let initial_shared_version = shared_object.owner().start_version().unwrap();
867        let shared_object_2 = Object::shared_for_testing();
868        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
869
870        let state = init_state_with_objects(
871            [
872                gas_objects.clone(),
873                vec![shared_object.clone(), shared_object_2.clone()],
874            ]
875            .concat(),
876        )
877        .await;
878
879        // Create a new execution scheduler instead of reusing the authority's, to examine
880        // execution_scheduler output from rx_ready_certificates.
881        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
882        // scheduler should output no transaction.
883        assert!(rx_ready_certificates.try_recv().is_err());
884
885        // Enqueue two transactions with the same shared object input in read-only mode.
886        let shared_version = 1000.into();
887        let shared_object_arg_read = ObjectArg::SharedObject {
888            id: shared_object.id(),
889            initial_shared_version,
890            mutability: SharedObjectMutability::Immutable,
891        };
892        let transaction_read_0 = make_transaction(
893            gas_objects[0].clone(),
894            vec![CallArg::Object(shared_object_arg_read)],
895        );
896        let transaction_read_1 = make_transaction(
897            gas_objects[1].clone(),
898            vec![CallArg::Object(shared_object_arg_read)],
899        );
900        let tx_read_0_assigned_versions = vec![(
901            (
902                shared_object.id(),
903                shared_object.owner().start_version().unwrap(),
904            ),
905            shared_version,
906        )];
907        let tx_read_1_assigned_versions = vec![(
908            (
909                shared_object.id(),
910                shared_object.owner().start_version().unwrap(),
911            ),
912            shared_version,
913        )];
914
915        // Enqueue one transaction with the same shared object in mutable mode.
916        let shared_object_arg_default = ObjectArg::SharedObject {
917            id: shared_object.id(),
918            initial_shared_version,
919            mutability: SharedObjectMutability::Mutable,
920        };
921        let transaction_default = make_transaction(
922            gas_objects[2].clone(),
923            vec![CallArg::Object(shared_object_arg_default)],
924        );
925        let tx_default_assigned_versions = vec![(
926            (
927                shared_object.id(),
928                shared_object.owner().start_version().unwrap(),
929            ),
930            shared_version,
931        )];
932
933        // Enqueue one transaction with two readonly shared object inputs, `shared_object` and `shared_object_2`.
934        let shared_version_2 = 1000.into();
935        let shared_object_arg_read_2 = ObjectArg::SharedObject {
936            id: shared_object_2.id(),
937            initial_shared_version: initial_shared_version_2,
938            mutability: SharedObjectMutability::Immutable,
939        };
940        let transaction_read_2 = make_transaction(
941            gas_objects[3].clone(),
942            vec![
943                CallArg::Object(shared_object_arg_default),
944                CallArg::Object(shared_object_arg_read_2),
945            ],
946        );
947        let tx_read_2_assigned_versions = vec![
948            (
949                (
950                    shared_object.id(),
951                    shared_object.owner().start_version().unwrap(),
952                ),
953                shared_version,
954            ),
955            (
956                (
957                    shared_object_2.id(),
958                    shared_object_2.owner().start_version().unwrap(),
959                ),
960                shared_version_2,
961            ),
962        ];
963
964        execution_scheduler.enqueue_transactions(
965            vec![
966                (
967                    transaction_read_0.clone(),
968                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
969                        tx_read_0_assigned_versions,
970                        None,
971                    )),
972                ),
973                (
974                    transaction_read_1.clone(),
975                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
976                        tx_read_1_assigned_versions,
977                        None,
978                    )),
979                ),
980                (
981                    transaction_default.clone(),
982                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
983                        tx_default_assigned_versions,
984                        None,
985                    )),
986                ),
987                (
988                    transaction_read_2.clone(),
989                    ExecutionEnv::new().with_assigned_versions(AssignedVersions::new(
990                        tx_read_2_assigned_versions,
991                        None,
992                    )),
993                ),
994            ],
995            &state.epoch_store_for_testing(),
996        );
997
998        // scheduler should output no transaction yet.
999        sleep(Duration::from_secs(1)).await;
1000        assert!(rx_ready_certificates.try_recv().is_err());
1001
1002        assert_eq!(execution_scheduler.num_pending_certificates(), 4);
1003
1004        // Notify scheduler about availability of the first shared object.
1005        let mut new_shared_object = shared_object.clone();
1006        new_shared_object
1007            .data
1008            .try_as_move_mut()
1009            .unwrap()
1010            .increment_version_to(shared_version_2);
1011        state
1012            .get_cache_writer()
1013            .write_object_entry_for_test(new_shared_object);
1014
1015        // scheduler should output the 3 transactions that are only waiting for this object.
1016        let tx_0 = rx_ready_certificates.recv().await.unwrap().certificate;
1017        let tx_1 = rx_ready_certificates.recv().await.unwrap().certificate;
1018        let tx_2 = rx_ready_certificates.recv().await.unwrap().certificate;
1019        {
1020            let mut want_digests = vec![
1021                transaction_read_0.digest(),
1022                transaction_read_1.digest(),
1023                transaction_default.digest(),
1024            ];
1025            want_digests.sort();
1026            let mut got_digests = vec![tx_0.digest(), tx_1.digest(), tx_2.digest()];
1027            got_digests.sort();
1028            assert_eq!(want_digests, got_digests);
1029        }
1030
1031        sleep(Duration::from_secs(1)).await;
1032        assert!(rx_ready_certificates.try_recv().is_err());
1033
1034        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1035
1036        // Make shared_object_2 available.
1037        let mut new_shared_object_2 = shared_object_2.clone();
1038        new_shared_object_2
1039            .data
1040            .try_as_move_mut()
1041            .unwrap()
1042            .increment_version_to(shared_version_2);
1043        state
1044            .get_cache_writer()
1045            .write_object_entry_for_test(new_shared_object_2);
1046
1047        // Now, the transaction waiting for both shared objects can be executed.
1048        let tx_3 = rx_ready_certificates.recv().await.unwrap().certificate;
1049        assert_eq!(transaction_read_2.digest(), tx_3.digest());
1050
1051        sleep(Duration::from_secs(1)).await;
1052        assert!(rx_ready_certificates.try_recv().is_err());
1053
1054        execution_scheduler.check_empty_for_testing();
1055    }
1056
1057    #[tokio::test(flavor = "current_thread", start_paused = true)]
1058    async fn execution_scheduler_receiving_notify_commit() {
1059        telemetry_subscribers::init_for_testing();
1060        // Initialize an authority state.
1061        let (owner, _keypair) = deterministic_random_account_key();
1062        let gas_objects: Vec<Object> = (0..10)
1063            .map(|_| {
1064                let gas_object_id = ObjectID::random();
1065                Object::with_id_owner_for_testing(gas_object_id, owner)
1066            })
1067            .collect();
1068        let state = init_state_with_objects(gas_objects.clone()).await;
1069
1070        // Create a new execution scheduler instead of reusing the authority's, to examine
1071        // execution_scheduler output from rx_ready_certificates.
1072        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1073        // scheduler should output no transaction.
1074        assert!(rx_ready_certificates.try_recv().is_err());
1075        // scheduler should be empty at the beginning.
1076        execution_scheduler.check_empty_for_testing();
1077
1078        let obj_id = ObjectID::random();
1079        let object_arguments: Vec<_> = (0..10)
1080            .map(|i| {
1081                let object = Object::with_id_owner_version_for_testing(
1082                    obj_id,
1083                    i.into(),
1084                    Owner::AddressOwner(owner),
1085                );
1086                // Every other transaction receives the object, and we create a run of multiple receives in
1087                // a row at the beginning to test that the scheduler doesn't get stuck in either configuration of:
1088                // ImmOrOwnedObject => Receiving,
1089                // Receiving => Receiving
1090                // Receiving => ImmOrOwnedObject
1091                // ImmOrOwnedObject => ImmOrOwnedObject is already tested as the default case on mainnet.
1092                let object_arg = if i % 2 == 0 || i == 3 {
1093                    ObjectArg::Receiving(object.compute_object_reference())
1094                } else {
1095                    ObjectArg::ImmOrOwnedObject(object.compute_object_reference())
1096                };
1097                let txn =
1098                    make_transaction(gas_objects[0].clone(), vec![CallArg::Object(object_arg)]);
1099                (object, txn)
1100            })
1101            .collect();
1102
1103        for (i, (_, txn)) in object_arguments.iter().enumerate() {
1104            // scheduler should output no transaction yet since waiting on receiving object or
1105            // ImmOrOwnedObject input.
1106            execution_scheduler.enqueue_transactions(
1107                vec![(
1108                    txn.clone(),
1109                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1110                )],
1111                &state.epoch_store_for_testing(),
1112            );
1113            sleep(Duration::from_secs(1)).await;
1114            assert!(rx_ready_certificates.try_recv().is_err());
1115            assert_eq!(execution_scheduler.num_pending_certificates(), i + 1);
1116        }
1117
1118        // Now start to unravel the transactions by notifying that each subsequent
1119        // transaction has been processed.
1120        let len = object_arguments.len();
1121        for (i, (object, txn)) in object_arguments.into_iter().enumerate() {
1122            // Mark the object as available.
1123            // We should now eventually see the transaction as ready.
1124            state
1125                .get_cache_writer()
1126                .write_object_entry_for_test(object.clone());
1127
1128            // scheduler should output the transaction eventually now that the receiving object has become
1129            // available.
1130            rx_ready_certificates.recv().await.unwrap();
1131
1132            // Only one transaction at a time should become available though. So if we try to get
1133            // another one it should fail.
1134            sleep(Duration::from_secs(1)).await;
1135            assert!(rx_ready_certificates.try_recv().is_err());
1136
1137            // Notify the scheduler that the transaction has been processed.
1138            drop(txn);
1139
1140            // scheduler should now output another transaction to run since it the next version of that object
1141            // has become available.
1142            assert_eq!(execution_scheduler.num_pending_certificates(), len - i - 1);
1143        }
1144
1145        // After everything scheduler should be empty.
1146        execution_scheduler.check_empty_for_testing();
1147    }
1148
1149    #[tokio::test(flavor = "current_thread", start_paused = true)]
1150    async fn execution_scheduler_receiving_object_ready_notifications() {
1151        telemetry_subscribers::init_for_testing();
1152        // Initialize an authority state.
1153        let (owner, _keypair) = deterministic_random_account_key();
1154        let gas_objects: Vec<Object> = (0..10)
1155            .map(|_| {
1156                let gas_object_id = ObjectID::random();
1157                Object::with_id_owner_for_testing(gas_object_id, owner)
1158            })
1159            .collect();
1160        let state = init_state_with_objects(gas_objects.clone()).await;
1161
1162        // Create a new execution scheduler instead of reusing the authority's, to examine
1163        // execution_scheduler output from rx_ready_certificates.
1164        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1165        // scheduler should output no transaction.
1166        assert!(rx_ready_certificates.try_recv().is_err());
1167        // scheduler should be empty at the beginning.
1168        execution_scheduler.check_empty_for_testing();
1169
1170        let obj_id = ObjectID::random();
1171        let receiving_object_new0 =
1172            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1173        let receiving_object_new1 =
1174            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1175        let receiving_object_arg0 =
1176            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1177        let receive_object_transaction0 = make_transaction(
1178            gas_objects[0].clone(),
1179            vec![CallArg::Object(receiving_object_arg0)],
1180        );
1181
1182        let receiving_object_arg1 =
1183            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1184        let receive_object_transaction1 = make_transaction(
1185            gas_objects[0].clone(),
1186            vec![CallArg::Object(receiving_object_arg1)],
1187        );
1188
1189        // scheduler should output no transaction yet since waiting on receiving object.
1190        execution_scheduler.enqueue_transactions(
1191            vec![(
1192                receive_object_transaction0.clone(),
1193                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1194            )],
1195            &state.epoch_store_for_testing(),
1196        );
1197        sleep(Duration::from_secs(1)).await;
1198        assert!(rx_ready_certificates.try_recv().is_err());
1199        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1200
1201        // scheduler should output no transaction yet since waiting on receiving object.
1202        execution_scheduler.enqueue_transactions(
1203            vec![(
1204                receive_object_transaction1.clone(),
1205                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1206            )],
1207            &state.epoch_store_for_testing(),
1208        );
1209        sleep(Duration::from_secs(1)).await;
1210        assert!(rx_ready_certificates.try_recv().is_err());
1211        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1212
1213        // Duplicate enqueue of receiving object is allowed.
1214        execution_scheduler.enqueue_transactions(
1215            vec![(
1216                receive_object_transaction0.clone(),
1217                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1218            )],
1219            &state.epoch_store_for_testing(),
1220        );
1221        sleep(Duration::from_secs(1)).await;
1222        assert!(rx_ready_certificates.try_recv().is_err());
1223        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1224
1225        // Notify scheduler that the receiving object 0 is available.
1226        state
1227            .get_cache_writer()
1228            .write_object_entry_for_test(receiving_object_new0.clone());
1229
1230        // scheduler should output the transaction eventually now that the receiving object has become
1231        // available.
1232        rx_ready_certificates.recv().await.unwrap();
1233
1234        // Notify scheduler that the receiving object 0 is available.
1235        state
1236            .get_cache_writer()
1237            .write_object_entry_for_test(receiving_object_new1.clone());
1238
1239        // scheduler should output the transaction eventually now that the receiving object has become
1240        // available.
1241        rx_ready_certificates.recv().await.unwrap();
1242    }
1243
1244    #[tokio::test(flavor = "current_thread", start_paused = true)]
1245    async fn execution_scheduler_receiving_object_ready_notifications_multiple_of_same_receiving() {
1246        telemetry_subscribers::init_for_testing();
1247        // Initialize an authority state.
1248        let (owner, _keypair) = deterministic_random_account_key();
1249        let gas_objects: Vec<Object> = (0..10)
1250            .map(|_| {
1251                let gas_object_id = ObjectID::random();
1252                Object::with_id_owner_for_testing(gas_object_id, owner)
1253            })
1254            .collect();
1255        let state = init_state_with_objects(gas_objects.clone()).await;
1256
1257        // Create a new execution scheduler instead of reusing the authority's, to examine
1258        // execution_scheduler output from rx_ready_certificates.
1259        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1260        // scheduler should output no transaction.
1261        assert!(rx_ready_certificates.try_recv().is_err());
1262        // scheduler should be empty at the beginning.
1263        execution_scheduler.check_empty_for_testing();
1264
1265        let obj_id = ObjectID::random();
1266        let receiving_object_new0 =
1267            Object::with_id_owner_version_for_testing(obj_id, 0.into(), Owner::AddressOwner(owner));
1268        let receiving_object_new1 =
1269            Object::with_id_owner_version_for_testing(obj_id, 1.into(), Owner::AddressOwner(owner));
1270        let receiving_object_arg0 =
1271            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1272        let receive_object_transaction0 = make_transaction(
1273            gas_objects[0].clone(),
1274            vec![CallArg::Object(receiving_object_arg0)],
1275        );
1276
1277        let receive_object_transaction01 = make_transaction(
1278            gas_objects[1].clone(),
1279            vec![CallArg::Object(receiving_object_arg0)],
1280        );
1281
1282        let receiving_object_arg1 =
1283            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1284        let receive_object_transaction1 = make_transaction(
1285            gas_objects[0].clone(),
1286            vec![CallArg::Object(receiving_object_arg1)],
1287        );
1288
1289        // Enqueuing a transaction with a receiving object that is available at the time it is enqueued
1290        // should become immediately available.
1291        let gas_receiving_arg = ObjectArg::Receiving(gas_objects[3].compute_object_reference());
1292        let tx1 = make_transaction(
1293            gas_objects[0].clone(),
1294            vec![CallArg::Object(gas_receiving_arg)],
1295        );
1296
1297        // scheduler should output no transaction yet since waiting on receiving object.
1298        execution_scheduler.enqueue_transactions(
1299            vec![(
1300                receive_object_transaction0.clone(),
1301                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1302            )],
1303            &state.epoch_store_for_testing(),
1304        );
1305        sleep(Duration::from_secs(1)).await;
1306        assert!(rx_ready_certificates.try_recv().is_err());
1307        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1308
1309        // scheduler should output no transaction yet since waiting on receiving object.
1310        execution_scheduler.enqueue_transactions(
1311            vec![(
1312                receive_object_transaction1.clone(),
1313                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1314            )],
1315            &state.epoch_store_for_testing(),
1316        );
1317        sleep(Duration::from_secs(1)).await;
1318        assert!(rx_ready_certificates.try_recv().is_err());
1319        assert_eq!(execution_scheduler.num_pending_certificates(), 2);
1320
1321        // Different transaction with a duplicate receiving object reference is allowed.
1322        // Both transaction's will be outputted once the receiving object is available.
1323        execution_scheduler.enqueue_transactions(
1324            vec![(
1325                receive_object_transaction01.clone(),
1326                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1327            )],
1328            &state.epoch_store_for_testing(),
1329        );
1330        sleep(Duration::from_secs(1)).await;
1331        assert!(rx_ready_certificates.try_recv().is_err());
1332        assert_eq!(execution_scheduler.num_pending_certificates(), 3);
1333
1334        // Notify scheduler that the receiving object 0 is available.
1335        state
1336            .get_cache_writer()
1337            .write_object_entry_for_test(receiving_object_new0.clone());
1338
1339        // scheduler should output both transactions depending on the receiving object now that the
1340        // transaction's receiving object has become available.
1341        rx_ready_certificates.recv().await.unwrap();
1342
1343        rx_ready_certificates.recv().await.unwrap();
1344
1345        // Only two transactions that were dependent on the receiving object should be output.
1346        assert!(rx_ready_certificates.try_recv().is_err());
1347
1348        // Enqueue a transaction with a receiving object that is available at the time it is enqueued.
1349        // This should be immediately available.
1350        execution_scheduler.enqueue_transactions(
1351            vec![(
1352                tx1.clone(),
1353                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1354            )],
1355            &state.epoch_store_for_testing(),
1356        );
1357        sleep(Duration::from_secs(1)).await;
1358        rx_ready_certificates.recv().await.unwrap();
1359
1360        // Notify scheduler that the receiving object 0 is available.
1361        state
1362            .get_cache_writer()
1363            .write_object_entry_for_test(receiving_object_new1.clone());
1364
1365        // scheduler should output the transaction eventually now that the receiving object has become
1366        // available.
1367        rx_ready_certificates.recv().await.unwrap();
1368    }
1369
1370    #[tokio::test(flavor = "current_thread", start_paused = true)]
1371    async fn execution_scheduler_receiving_object_ready_if_current_version_greater() {
1372        telemetry_subscribers::init_for_testing();
1373        // Initialize an authority state.
1374        let (owner, _keypair) = deterministic_random_account_key();
1375        let mut gas_objects: Vec<Object> = (0..10)
1376            .map(|_| {
1377                let gas_object_id = ObjectID::random();
1378                Object::with_id_owner_for_testing(gas_object_id, owner)
1379            })
1380            .collect();
1381        let receiving_object = Object::with_id_owner_version_for_testing(
1382            ObjectID::random(),
1383            10.into(),
1384            Owner::AddressOwner(owner),
1385        );
1386        gas_objects.push(receiving_object.clone());
1387        let state = init_state_with_objects(gas_objects.clone()).await;
1388
1389        // Create a new execution scheduler instead of reusing the authority's, to examine
1390        // execution_scheduler output from rx_ready_certificates.
1391        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1392        // scheduler should output no transaction.
1393        assert!(rx_ready_certificates.try_recv().is_err());
1394        // scheduler should be empty at the beginning.
1395        execution_scheduler.check_empty_for_testing();
1396
1397        let receiving_object_new0 = Object::with_id_owner_version_for_testing(
1398            receiving_object.id(),
1399            0.into(),
1400            Owner::AddressOwner(owner),
1401        );
1402        let receiving_object_new1 = Object::with_id_owner_version_for_testing(
1403            receiving_object.id(),
1404            1.into(),
1405            Owner::AddressOwner(owner),
1406        );
1407        let receiving_object_arg0 =
1408            ObjectArg::Receiving(receiving_object_new0.compute_object_reference());
1409        let receive_object_transaction0 = make_transaction(
1410            gas_objects[0].clone(),
1411            vec![CallArg::Object(receiving_object_arg0)],
1412        );
1413
1414        let receive_object_transaction01 = make_transaction(
1415            gas_objects[1].clone(),
1416            vec![CallArg::Object(receiving_object_arg0)],
1417        );
1418
1419        let receiving_object_arg1 =
1420            ObjectArg::Receiving(receiving_object_new1.compute_object_reference());
1421        let receive_object_transaction1 = make_transaction(
1422            gas_objects[0].clone(),
1423            vec![CallArg::Object(receiving_object_arg1)],
1424        );
1425
1426        // scheduler should output no transaction yet since waiting on receiving object.
1427        execution_scheduler.enqueue_transactions(
1428            vec![(
1429                receive_object_transaction0.clone(),
1430                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1431            )],
1432            &state.epoch_store_for_testing(),
1433        );
1434        execution_scheduler.enqueue_transactions(
1435            vec![(
1436                receive_object_transaction01.clone(),
1437                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1438            )],
1439            &state.epoch_store_for_testing(),
1440        );
1441        execution_scheduler.enqueue_transactions(
1442            vec![(
1443                receive_object_transaction1.clone(),
1444                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1445            )],
1446            &state.epoch_store_for_testing(),
1447        );
1448        sleep(Duration::from_secs(1)).await;
1449        rx_ready_certificates.recv().await.unwrap();
1450        rx_ready_certificates.recv().await.unwrap();
1451        rx_ready_certificates.recv().await.unwrap();
1452        assert!(rx_ready_certificates.try_recv().is_err());
1453    }
1454
1455    // Tests transaction cancellation logic in execution scheduler. Mainly tests that for cancelled transaction,
1456    // execution scheduler only waits for all non-shared objects to be available before outputting the transaction.
1457    #[tokio::test(flavor = "current_thread", start_paused = true)]
1458    async fn execution_scheduler_with_cancelled_transactions() {
1459        // Initialize an authority state, with gas objects and 3 shared objects.
1460        let (owner, _keypair) = deterministic_random_account_key();
1461        let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1462        let shared_object_1 = Object::shared_for_testing();
1463        let initial_shared_version_1 = shared_object_1.owner().start_version().unwrap();
1464        let shared_object_2 = Object::shared_for_testing();
1465        let initial_shared_version_2 = shared_object_2.owner().start_version().unwrap();
1466        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), owner);
1467
1468        let state = init_state_with_objects(vec![
1469            gas_object.clone(),
1470            shared_object_1.clone(),
1471            shared_object_2.clone(),
1472            owned_object.clone(),
1473        ])
1474        .await;
1475
1476        // Create a new execution scheduler instead of reusing the authority's, to examine
1477        // execution_scheduler output from rx_ready_certificates.
1478        let (execution_scheduler, mut rx_ready_certificates) = make_execution_scheduler(&state);
1479        // scheduler should output no transaction.
1480        assert!(rx_ready_certificates.try_recv().is_err());
1481
1482        // Enqueue one transaction with 2 shared object inputs and 1 owned input.
1483        let shared_object_arg_1 = ObjectArg::SharedObject {
1484            id: shared_object_1.id(),
1485            initial_shared_version: initial_shared_version_1,
1486            mutability: SharedObjectMutability::Mutable,
1487        };
1488        let shared_object_arg_2 = ObjectArg::SharedObject {
1489            id: shared_object_2.id(),
1490            initial_shared_version: initial_shared_version_2,
1491            mutability: SharedObjectMutability::Mutable,
1492        };
1493
1494        // Changes the desired owned object version to a higher version. We will make it available later.
1495        let owned_version = 2000.into();
1496        let mut owned_ref = owned_object.compute_object_reference();
1497        owned_ref.1 = owned_version;
1498        let owned_object_arg = ObjectArg::ImmOrOwnedObject(owned_ref);
1499
1500        let cancelled_transaction = make_transaction(
1501            gas_object.clone(),
1502            vec![
1503                CallArg::Object(shared_object_arg_1),
1504                CallArg::Object(shared_object_arg_2),
1505                CallArg::Object(owned_object_arg),
1506            ],
1507        );
1508        let assigned_versions = vec![
1509            (
1510                (
1511                    shared_object_1.id(),
1512                    shared_object_1.owner().start_version().unwrap(),
1513                ),
1514                SequenceNumber::CANCELLED_READ,
1515            ),
1516            (
1517                (
1518                    shared_object_2.id(),
1519                    shared_object_2.owner().start_version().unwrap(),
1520                ),
1521                SequenceNumber::CONGESTED,
1522            ),
1523        ];
1524
1525        execution_scheduler.enqueue_transactions(
1526            vec![(
1527                cancelled_transaction.clone(),
1528                ExecutionEnv::new()
1529                    .with_assigned_versions(AssignedVersions::new(assigned_versions, None)),
1530            )],
1531            &state.epoch_store_for_testing(),
1532        );
1533
1534        // scheduler should output no transaction yet.
1535        sleep(Duration::from_secs(1)).await;
1536        assert!(rx_ready_certificates.try_recv().is_err());
1537
1538        assert_eq!(execution_scheduler.num_pending_certificates(), 1);
1539
1540        // Notify scheduler about availability of the owned object.
1541        let mut new_owned_object = owned_object.clone();
1542        new_owned_object
1543            .data
1544            .try_as_move_mut()
1545            .unwrap()
1546            .increment_version_to(owned_version);
1547        state
1548            .get_cache_writer()
1549            .write_object_entry_for_test(new_owned_object);
1550
1551        // scheduler should output the transaction as soon as the owned object is available.
1552        let available_txn = rx_ready_certificates.recv().await.unwrap().certificate;
1553        assert_eq!(available_txn.digest(), cancelled_transaction.digest());
1554
1555        sleep(Duration::from_secs(1)).await;
1556        assert!(rx_ready_certificates.try_recv().is_err());
1557
1558        execution_scheduler.check_empty_for_testing();
1559    }
1560
1561    #[test]
1562    fn test_barrier_dependency_builder() {
1563        let make_transaction = |non_exclusive_writes: Vec<u32>, exclusive_writes: Vec<u32>| {
1564            assert!(
1565                non_exclusive_writes
1566                    .iter()
1567                    .all(|id| !exclusive_writes.contains(id))
1568            );
1569            assert!(
1570                exclusive_writes
1571                    .iter()
1572                    .all(|id| !non_exclusive_writes.contains(id))
1573            );
1574
1575            let non_exclusive_writes = non_exclusive_writes
1576                .into_iter()
1577                .map(|id| ObjectID::from_single_byte(id as u8));
1578            let exclusive_writes = exclusive_writes
1579                .into_iter()
1580                .map(|id| ObjectID::from_single_byte(id as u8));
1581            let mut builder = ProgrammableTransactionBuilder::new();
1582            for non_exclusive_write in non_exclusive_writes {
1583                builder
1584                    .obj(ObjectArg::SharedObject {
1585                        id: non_exclusive_write,
1586                        initial_shared_version: SequenceNumber::new(),
1587                        mutability: SharedObjectMutability::NonExclusiveWrite,
1588                    })
1589                    .unwrap();
1590            }
1591
1592            for exclusive_write in exclusive_writes {
1593                builder
1594                    .obj(ObjectArg::SharedObject {
1595                        id: exclusive_write,
1596                        initial_shared_version: SequenceNumber::new(),
1597                        mutability: SharedObjectMutability::Mutable,
1598                    })
1599                    .unwrap();
1600            }
1601
1602            let tx = TransactionKind::ProgrammableTransaction(builder.finish());
1603            let tx_data =
1604                TransactionData::new(tx, SuiAddress::default(), random_object_ref(), 1, 1);
1605            Transaction::from_data_and_signer(tx_data, vec![])
1606        };
1607
1608        // One non-exclusive write, one exclusive write.
1609        {
1610            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1611            let tx1 = make_transaction(vec![1], vec![]);
1612            let tx2 = make_transaction(vec![], vec![1]);
1613
1614            let tx1_deps =
1615                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1616            let tx2_deps =
1617                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1618            assert!(tx1_deps.is_empty());
1619            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1620        }
1621
1622        // One transaction has non-exclusive writes to two different objects, and then becomes
1623        // a dependency of two barriers
1624        {
1625            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1626            let tx1 = make_transaction(vec![1, 2], vec![]);
1627            let tx2 = make_transaction(vec![], vec![1]);
1628            let tx3 = make_transaction(vec![], vec![2]);
1629
1630            let tx1_deps =
1631                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1632            let tx2_deps =
1633                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1634            let tx3_deps =
1635                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1636            assert!(tx1_deps.is_empty());
1637            assert_eq!(Vec::from_iter(tx2_deps), vec![*tx1.digest()]);
1638            assert_eq!(Vec::from_iter(tx3_deps), vec![*tx1.digest()]);
1639        }
1640
1641        // Ensure multiple-object dependences are merged
1642        {
1643            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1644            let tx1 = make_transaction(vec![1], vec![]);
1645            let tx2 = make_transaction(vec![2], vec![]);
1646            let tx3 = make_transaction(vec![], vec![1, 2]);
1647
1648            let tx1_deps =
1649                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1650            let tx2_deps =
1651                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1652            let tx3_deps =
1653                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1654            assert!(tx1_deps.is_empty());
1655            assert!(tx2_deps.is_empty());
1656            assert_eq!(tx3_deps, BTreeSet::from([*tx1.digest(), *tx2.digest()]));
1657        }
1658
1659        // Ensure dependency state is cleared
1660        {
1661            let mut barrier_dependency_builder = BarrierDependencyBuilder::new();
1662            let tx1 = make_transaction(vec![1], vec![]);
1663            let tx2 = make_transaction(vec![], vec![1]);
1664            let tx3 = make_transaction(vec![], vec![1]);
1665
1666            let tx1_deps =
1667                barrier_dependency_builder.process_tx(*tx1.digest(), tx1.transaction_data());
1668            let tx2_deps =
1669                barrier_dependency_builder.process_tx(*tx2.digest(), tx2.transaction_data());
1670            let tx3_deps =
1671                barrier_dependency_builder.process_tx(*tx3.digest(), tx3.transaction_data());
1672            assert!(tx1_deps.is_empty());
1673            assert_eq!(tx2_deps, BTreeSet::from([*tx1.digest()]));
1674            assert!(tx3_deps.is_empty());
1675        }
1676    }
1677}