sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10/// Exports
11pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16    net::SocketAddr,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27    committee::EpochId,
28    error::{ErrorCategory, UserInputError},
29    messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30    transaction::TransactionDataAPI as _,
31};
32use tokio::{
33    task::JoinSet,
34    time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40    authority_aggregator::AuthorityAggregator,
41    authority_client::AuthorityAPI,
42    quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43    validator_client_monitor::{
44        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45    },
46};
47use sui_config::NodeConfig;
/// Per-request settings that accompany a transaction submission.
///
/// The `Default` value carries no forwarded address and leaves both validator
/// lists empty.
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Address of the originating client when the transaction is forwarded on
    /// that client's behalf. It is specified for DDoS protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Allow-list for submission: only validators named here can be used to
    /// submit the transaction. An empty list means any validator can be used.
    pub allowed_validators: Vec<String>,

    /// Deny-list for submission: validators named here cannot be used to submit
    /// the transaction. An empty list means no restrictions are applied.
    pub blocked_validators: Vec<String>,
}
63
64#[derive(Clone, Debug)]
65pub struct QuorumTransactionResponse {
66    // TODO(fastpath): Stop using QD types
67    pub effects: sui_types::quorum_driver_types::FinalizedEffects,
68
69    pub events: Option<sui_types::effects::TransactionEvents>,
70    // Input objects will only be populated in the happy path
71    pub input_objects: Option<Vec<sui_types::object::Object>>,
72    // Output objects will only be populated in the happy path
73    pub output_objects: Option<Vec<sui_types::object::Object>>,
74    pub auxiliary_data: Option<Vec<u8>>,
75}
76
77pub struct TransactionDriver<A: Clone> {
78    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
79    state: Mutex<State>,
80    metrics: Arc<TransactionDriverMetrics>,
81    submitter: TransactionSubmitter,
82    certifier: EffectsCertifier,
83    client_monitor: Arc<ValidatorClientMonitor<A>>,
84}
85
86impl<A> TransactionDriver<A>
87where
88    A: AuthorityAPI + Send + Sync + 'static + Clone,
89{
90    // TODO: accept a TransactionDriverConfig to set default allowed & blocked validators.
91    pub fn new(
92        authority_aggregator: Arc<AuthorityAggregator<A>>,
93        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
94        metrics: Arc<TransactionDriverMetrics>,
95        node_config: Option<&NodeConfig>,
96        client_metrics: Arc<ValidatorClientMetrics>,
97    ) -> Arc<Self> {
98        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
99
100        // Extract validator client monitor config from NodeConfig or use default
101        let monitor_config = node_config
102            .and_then(|nc| nc.validator_client_monitor_config.clone())
103            .unwrap_or_default();
104        let client_monitor =
105            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
106
107        let driver = Arc::new(Self {
108            authority_aggregator: shared_swap,
109            state: Mutex::new(State::new()),
110            metrics: metrics.clone(),
111            submitter: TransactionSubmitter::new(metrics.clone()),
112            certifier: EffectsCertifier::new(metrics),
113            client_monitor,
114        });
115
116        let driver_clone = driver.clone();
117
118        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
119
120        driver.enable_reconfig(reconfig_observer);
121        driver
122    }
123
124    /// Returns the authority aggregator wrapper which upgrades on epoch changes.
125    pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
126        &self.authority_aggregator
127    }
128
129    /// Drives transaction to finalization.
130    ///
131    /// Internally, retries the attempt to finalize a transaction until:
132    /// - The transaction is finalized.
133    /// - The transaction observes a non-retriable error.
134    /// - Timeout is reached.
135    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
136    pub async fn drive_transaction(
137        &self,
138        request: SubmitTxRequest,
139        options: SubmitTransactionOptions,
140        timeout_duration: Option<Duration>,
141    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
142        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
143
144        // For ping requests, the amplification factor is always 1.
145        let amplification_factor = if request.ping_type.is_some() {
146            1
147        } else {
148            let gas_price = request
149                .transaction
150                .as_ref()
151                .unwrap()
152                .transaction_data()
153                .gas_price();
154            let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
155            let amplification_factor = gas_price / reference_gas_price.max(1);
156            if amplification_factor == 0 {
157                return Err(TransactionDriverError::ValidationFailed {
158                    error: UserInputError::GasPriceUnderRGP {
159                        gas_price,
160                        reference_gas_price,
161                    }
162                    .to_string(),
163                });
164            }
165            amplification_factor
166        };
167
168        let tx_type = request.tx_type();
169        let ping_label = if request.ping_type.is_some() {
170            "true"
171        } else {
172            "false"
173        };
174        let timer = Instant::now();
175
176        self.metrics
177            .total_transactions_submitted
178            .with_label_values(&[tx_type.as_str(), ping_label])
179            .inc();
180
181        let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
182        let mut attempts = 0;
183        let mut latest_retriable_error = None;
184
185        let retry_loop = async {
186            loop {
187                // TODO(fastpath): Check local state before submitting transaction
188                match self
189                    .drive_transaction_once(amplification_factor, request.clone(), &options)
190                    .await
191                {
192                    Ok(resp) => {
193                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
194                        self.metrics
195                            .settlement_finality_latency
196                            .with_label_values(&[tx_type.as_str(), ping_label])
197                            .observe(settlement_finality_latency);
198                        // Record the number of retries for successful transaction
199                        self.metrics
200                            .transaction_retries
201                            .with_label_values(&["success", tx_type.as_str(), ping_label])
202                            .observe(attempts as f64);
203                        return Ok(resp);
204                    }
205                    Err(e) => {
206                        self.metrics
207                            .drive_transaction_errors
208                            .with_label_values(&[
209                                e.categorize().into(),
210                                tx_type.as_str(),
211                                ping_label,
212                            ])
213                            .inc();
214                        if !e.is_submission_retriable() {
215                            // Record the number of retries for failed transaction
216                            self.metrics
217                                .transaction_retries
218                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
219                                .observe(attempts as f64);
220                            if request.transaction.is_some() {
221                                tracing::info!(
222                                    "Failed to finalize transaction with non-retriable error after {} attempts: {}",
223                                    attempts,
224                                    e
225                                );
226                            }
227                            return Err(e);
228                        }
229                        if request.transaction.is_some() {
230                            tracing::info!(
231                                "Failed to finalize transaction (attempt {}): {}. Retrying ...",
232                                attempts,
233                                e
234                            );
235                        }
236                        // Buffer the latest retriable error to be returned in case of timeout
237                        latest_retriable_error = Some(e);
238                    }
239                }
240
241                let overload = if let Some(e) = &latest_retriable_error {
242                    e.categorize() == ErrorCategory::ValidatorOverloaded
243                } else {
244                    false
245                };
246                let delay = if overload {
247                    // Increase delay during overload.
248                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
249                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
250                } else {
251                    backoff.next().unwrap()
252                };
253                sleep(delay).await;
254
255                attempts += 1;
256            }
257        };
258
259        match timeout_duration {
260            Some(duration) => {
261                tokio::time::timeout(duration, retry_loop)
262                    .await
263                    .unwrap_or_else(|_| {
264                        // Timeout occurred, return with latest retriable error if available
265                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
266                            last_error: latest_retriable_error.map(Box::new),
267                            attempts,
268                            timeout: duration,
269                        };
270                        if request.transaction.is_some() {
271                            tracing::info!(
272                                "Transaction timed out after {} attempts. Last error: {}",
273                                attempts,
274                                e
275                            );
276                        }
277                        Err(e)
278                    })
279            }
280            None => retry_loop.await,
281        }
282    }
283
284    #[instrument(level = "error", skip_all, err(level = "debug"))]
285    async fn drive_transaction_once(
286        &self,
287        amplification_factor: u64,
288        request: SubmitTxRequest,
289        options: &SubmitTransactionOptions,
290    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
291        let auth_agg = self.authority_aggregator.load();
292        let amplification_factor =
293            amplification_factor.min(auth_agg.committee.num_members() as u64);
294        let start_time = Instant::now();
295        let tx_type = request.tx_type();
296        let tx_digest = request.tx_digest();
297        let ping_type = request.ping_type;
298
299        let (name, submit_txn_result) = self
300            .submitter
301            .submit_transaction(
302                &auth_agg,
303                &self.client_monitor,
304                tx_type,
305                amplification_factor,
306                request,
307                options,
308            )
309            .await?;
310        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
311            return Err(TransactionDriverError::ClientInternal {
312                error: format!(
313                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
314                    error
315                ),
316            });
317        }
318
319        // Wait for quorum effects using EffectsCertifier
320        let result = self
321            .certifier
322            .get_certified_finalized_effects(
323                &auth_agg,
324                &self.client_monitor,
325                tx_digest,
326                tx_type,
327                name,
328                submit_txn_result,
329                options,
330            )
331            .await;
332
333        if result.is_ok() {
334            self.client_monitor
335                .record_interaction_result(OperationFeedback {
336                    authority_name: name,
337                    display_name: auth_agg.get_display_name(&name),
338                    operation: if tx_type == TxType::SingleWriter {
339                        OperationType::FastPath
340                    } else {
341                        OperationType::Consensus
342                    },
343                    ping_type,
344                    result: Ok(start_time.elapsed()),
345                });
346        }
347        result
348    }
349
350    // Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
351    async fn run_latency_checks(self: Arc<Self>) {
352        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
353        const MAX_JITTER: Duration = Duration::from_secs(10);
354        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
355
356        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
357        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
358
359        loop {
360            interval.tick().await;
361
362            let mut tasks = JoinSet::new();
363
364            for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
365                Self::ping_for_tx_type(
366                    self.clone(),
367                    &mut tasks,
368                    tx_type,
369                    MAX_JITTER,
370                    PING_REQUEST_TIMEOUT,
371                );
372            }
373
374            while let Some(result) = tasks.join_next().await {
375                if let Err(e) = result {
376                    tracing::debug!("Error while driving ping transaction: {}", e);
377                }
378            }
379        }
380    }
381
382    /// Pings all validators for e2e latency with the provided transaction type.
383    fn ping_for_tx_type(
384        self: Arc<Self>,
385        tasks: &mut JoinSet<()>,
386        tx_type: TxType,
387        max_jitter: Duration,
388        ping_timeout: Duration,
389    ) {
390        // We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
391        let auth_agg = self.authority_aggregator.load().clone();
392        let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
393
394        self.metrics
395            .latency_check_runs
396            .with_label_values(&[tx_type.as_str()])
397            .inc();
398
399        for name in validators {
400            let display_name = auth_agg.get_display_name(&name);
401            let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
402            let self_clone = self.clone();
403
404            let task = async move {
405                // Add some random delay to the task to avoid all tasks running at the same time
406                if delay_ms > 0 {
407                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
408                }
409                let start_time = Instant::now();
410
411                let ping_type = if tx_type == TxType::SingleWriter {
412                    PingType::FastPath
413                } else {
414                    PingType::Consensus
415                };
416
417                // Now send a ping transaction to the chosen validator for the provided tx type
418                match self_clone
419                    .drive_transaction(
420                        SubmitTxRequest::new_ping(ping_type),
421                        SubmitTransactionOptions {
422                            allowed_validators: vec![display_name.clone()],
423                            ..Default::default()
424                        },
425                        Some(ping_timeout),
426                    )
427                    .await
428                {
429                    Ok(_) => {
430                        tracing::debug!(
431                            "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
432                            display_name,
433                            tx_type.as_str(),
434                            start_time.elapsed().as_secs_f64()
435                        );
436                    }
437                    Err(err) => {
438                        tracing::debug!(
439                            "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
440                            tx_type.as_str(),
441                            display_name,
442                            err
443                        );
444                    }
445                }
446            };
447
448            tasks.spawn(task);
449        }
450    }
451
452    fn enable_reconfig(
453        self: &Arc<Self>,
454        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
455    ) {
456        let driver = self.clone();
457        self.state.lock().tasks.spawn(monitored_future!(async move {
458            let mut reconfig_observer = reconfig_observer.clone_boxed();
459            reconfig_observer.run(driver).await;
460        }));
461    }
462}
463
464impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
465where
466    A: AuthorityAPI + Send + Sync + 'static + Clone,
467{
468    fn epoch(&self) -> EpochId {
469        self.authority_aggregator.load().committee.epoch
470    }
471
472    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
473        self.authority_aggregator.load_full()
474    }
475
476    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
477        tracing::info!(
478            "Transaction Driver updating AuthorityAggregator with committee {}",
479            new_authorities.committee
480        );
481
482        self.authority_aggregator.store(new_authorities);
483    }
484}
485
486// Chooses the percentage of transactions to be driven by TransactionDriver.
487pub fn choose_transaction_driver_percentage(
488    chain_id: Option<sui_types::digests::ChainIdentifier>,
489) -> u8 {
490    if let Ok(v) = std::env::var("TRANSACTION_DRIVER")
491        && let Ok(tx_driver_percentage) = v.parse::<u8>()
492        && (0..=100).contains(&tx_driver_percentage)
493    {
494        return tx_driver_percentage;
495    }
496
497    if let Some(chain_identifier) = chain_id
498        && chain_identifier.chain() == sui_protocol_config::Chain::Unknown
499    {
500        // Kep test coverage for QD.
501        return 50;
502    }
503
504    // Default to 100% everywhere
505    100
506}
507
508// Inner state of TransactionDriver.
509struct State {
510    tasks: JoinSet<()>,
511}
512
513impl State {
514    fn new() -> Self {
515        Self {
516            tasks: JoinSet::new(),
517        }
518    }
519}