sui_core/execution_scheduler/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    authority::{
        authority_per_epoch_store::AuthorityPerEpochStore,
        shared_object_version_manager::Schedulable, AuthorityMetrics, ExecutionEnv,
    },
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    execution_scheduler::balance_withdraw_scheduler::BalanceSettlement,
};
use enum_dispatch::enum_dispatch;
use execution_scheduler_impl::ExecutionScheduler;
use prometheus::IntGauge;
use std::{collections::BTreeSet, sync::Arc};
use sui_config::node::AuthorityOverloadConfig;
use sui_types::{
    error::SuiResult, executable_transaction::VerifiedExecutableTransaction, storage::InputKey,
    transaction::SenderSignedData,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::Instant;
use transaction_manager::TransactionManager;

mod balance_withdraw_scheduler;
pub(crate) mod execution_scheduler_impl;
mod overload_tracker;
pub(crate) mod transaction_manager;

/// Timestamps recorded for a certificate while it is held by the execution scheduler.
///
/// `Instant` here is `tokio::time::Instant` (see the imports at the top of this file).
#[derive(Clone, Debug)]
pub struct PendingCertificateStats {
    /// The time this certificate enters execution scheduler.
    // NOTE(review): the lint suppression suggests this field is written but not read in
    // every build configuration — confirm before removing the `allow`.
    #[allow(unused)]
    pub enqueue_time: Instant,
    /// The time this certificate becomes ready for execution.
    // NOTE(review): presumably `None` until the certificate becomes ready — confirm against
    // the scheduler implementations that populate it.
    pub ready_time: Option<Instant>,
}

/// Tag describing which path a transaction took to reach the scheduler.
// NOTE(review): this enum is not referenced anywhere else in this file; the variant
// descriptions below are inferred from their names — confirm against the call sites.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SchedulingSource {
    /// Presumably: the transaction was scheduled via the Mysticeti fast path.
    MysticetiFastPath,
    /// Presumably: the transaction was scheduled via any path other than the fast path.
    NonFastPath,
}

/// A certificate held by the scheduler together with the bookkeeping needed to execute it.
///
/// Values of this type are what `ExecutionSchedulerWrapper::new` wires into the
/// `tx_ready_certificates` channel (`UnboundedSender<PendingCertificate>`).
#[derive(Debug)]
pub struct PendingCertificate {
    /// Certified transaction to be executed.
    pub certificate: VerifiedExecutableTransaction,
    /// Environment in which the transaction will be executed.
    pub execution_env: ExecutionEnv,
    /// The input object this certificate is waiting for to become available in order to be
    /// executed. This is only used by TransactionManager.
    pub waiting_input_objects: BTreeSet<InputKey>,
    /// Stores stats about this transaction.
    pub stats: PendingCertificateStats,
    /// Optional guard that keeps the "executing certificates" gauge incremented for as long as
    /// it is alive (see `ExecutingGuard`).
    // NOTE(review): when this is `Some` vs `None` is decided by the scheduler
    // implementations, which are not visible in this file.
    pub executing_guard: Option<ExecutingGuard>,
}

/// Guard tied to an `IntGauge` counter.
///
/// `ExecutingGuard::new` increments the gauge and the `Drop` impl decrements it, so the gauge
/// is raised by one for exactly as long as the guard value exists.
#[derive(Debug)]
pub struct ExecutingGuard {
    // The gauge that was incremented on construction and is decremented on drop.
    num_executing_certificates: IntGauge,
}

/// Common interface implemented by the scheduler backends.
///
/// The `#[enum_dispatch]` attribute lets `ExecutionSchedulerWrapper` forward each method to
/// whichever backend variant it currently holds.
#[enum_dispatch]
pub trait ExecutionSchedulerAPI {
    /// Hands a batch of executable transactions, each paired with its `ExecutionEnv`, to the
    /// scheduler for the given epoch.
    // NOTE(review): presumably a convenience form of `enqueue` for already-verified
    // transactions — confirm against the implementations.
    fn enqueue_transactions(
        &self,
        certs: Vec<(VerifiedExecutableTransaction, ExecutionEnv)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    );

    /// Hands a batch of `Schedulable` items, each paired with its `ExecutionEnv`, to the
    /// scheduler for the given epoch.
    fn enqueue(
        &self,
        certs: Vec<(Schedulable, ExecutionEnv)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    );

    /// Passes a `BalanceSettlement` to the scheduler.
    // NOTE(review): the effect of a settlement is defined by the implementations
    // (see the `balance_withdraw_scheduler` module) and is not visible here.
    fn settle_balances(&self, settlement: BalanceSettlement);

    /// Checks `tx_data` against `overload_config` and returns a `SuiResult`.
    // NOTE(review): presumably returns `Err` when the scheduler considers itself
    // overloaded for this transaction — confirm the exact error in the implementations.
    fn check_execution_overload(
        &self,
        overload_config: &AuthorityOverloadConfig,
        tx_data: &SenderSignedData,
    ) -> SuiResult;

    /// Returns the number of transactions pending or being executed right now.
    fn num_pending_certificates(&self) -> usize;

    /// Verify TM has no pending item for tests.
    ///
    /// Only compiled in test builds.
    #[cfg(test)]
    fn check_empty_for_testing(&self);
}

/// Holds whichever scheduler backend was selected at construction time.
///
/// `#[enum_dispatch(ExecutionSchedulerAPI)]` generates the `ExecutionSchedulerAPI`
/// implementation for this enum by delegating to the contained variant.
#[enum_dispatch(ExecutionSchedulerAPI)]
pub enum ExecutionSchedulerWrapper {
    /// The default backend (chosen by `new` unless `ENABLE_TRANSACTION_MANAGER` is set).
    ExecutionScheduler(ExecutionScheduler),
    /// The backend chosen by `new` when `ENABLE_TRANSACTION_MANAGER` is set.
    TransactionManager(TransactionManager),
}

impl ExecutionSchedulerWrapper {
    /// Builds the scheduler backend selected by the process environment.
    ///
    /// `ExecutionScheduler` is used by default. If the `ENABLE_TRANSACTION_MANAGER`
    /// environment variable is present, `TransactionManager` is used instead. Only the
    /// *presence* of the variable is checked; its value is ignored, so even an empty value
    /// selects `TransactionManager`.
    ///
    /// * `object_cache_read`, `transaction_cache_read`, `metrics` - forwarded to the chosen
    ///   backend's constructor.
    /// * `tx_ready_certificates` - channel sender for `PendingCertificate`s, forwarded to the
    ///   chosen backend's constructor.
    /// * `epoch_store` - passed to `TransactionManager::new`; for `ExecutionScheduler` only
    ///   its `accumulators_enabled()` flag is read.
    /// * `_is_fullnode` - currently unused.
    pub fn new(
        object_cache_read: Arc<dyn ObjectCacheRead>,
        transaction_cache_read: Arc<dyn TransactionCacheRead>,
        tx_ready_certificates: UnboundedSender<PendingCertificate>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        _is_fullnode: bool,
        metrics: Arc<AuthorityMetrics>,
    ) -> Self {
        // Execution scheduler is enabled by default unless ENABLE_TRANSACTION_MANAGER is
        // explicitly set.
        //
        // `var_os` is used instead of `var` because `var` also returns `Err` when the
        // variable is set to a value that is not valid Unicode, which would silently ignore
        // an explicit opt-in to the transaction manager.
        let use_transaction_manager = std::env::var_os("ENABLE_TRANSACTION_MANAGER").is_some();
        if use_transaction_manager {
            return Self::TransactionManager(TransactionManager::new(
                object_cache_read,
                transaction_cache_read,
                epoch_store,
                tx_ready_certificates,
                metrics,
            ));
        }

        let enable_accumulators = epoch_store.accumulators_enabled();
        Self::ExecutionScheduler(ExecutionScheduler::new(
            object_cache_read,
            transaction_cache_read,
            tx_ready_certificates,
            enable_accumulators,
            metrics,
        ))
    }
}

impl ExecutingGuard {
    /// Takes ownership of `num_executing_certificates`, increments it by one, and returns a
    /// guard holding it. The matching decrement happens when the guard is dropped.
    pub fn new(num_executing_certificates: IntGauge) -> Self {
        let guard = Self {
            num_executing_certificates,
        };
        guard.num_executing_certificates.inc();
        guard
    }
}

impl Drop for ExecutingGuard {
    /// Decrements the gauge held by this guard, undoing the increment made in
    /// `ExecutingGuard::new`.
    fn drop(&mut self) {
        let Self {
            num_executing_certificates: gauge,
        } = self;
        gauge.dec();
    }
}