sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod reconfig_observer;
8mod request_retrier;
9mod transaction_submitter;
10
11/// Exports
12pub use error::TransactionDriverError;
13pub use metrics::*;
14pub use reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
15
16use std::{
17    net::SocketAddr,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use arc_swap::ArcSwap;
23use effects_certifier::*;
24use mysten_common::backoff::ExponentialBackoff;
25use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
26use parking_lot::Mutex;
27use rand::Rng;
28use sui_config::NodeConfig;
29use sui_types::{
30    committee::EpochId,
31    error::{ErrorCategory, UserInputError},
32    messages_grpc::{SubmitTxRequest, SubmitTxResult, TxType},
33    transaction::TransactionDataAPI as _,
34};
35use tokio::{
36    task::JoinSet,
37    time::{interval, sleep},
38};
39use tracing::instrument;
40use transaction_submitter::*;
41
42use crate::{
43    authority_aggregator::AuthorityAggregator,
44    authority_client::AuthorityAPI,
45    validator_client_monitor::{
46        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
47    },
48};
49
50/// Trait for components that can update their AuthorityAggregator during reconfiguration.
51/// Used by ReconfigObserver to notify components of epoch changes.
52pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
53    fn epoch(&self) -> EpochId;
54    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
55    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
56}
57
/// Per-call options that control how a transaction is submitted.
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Address of the originating client when this node forwards a transaction on the
    /// client's behalf; used for DDoS protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Allow-list of validators the transaction may be submitted to. An empty list places
    /// no restriction: any validator may be used.
    // NOTE(review): entries appear to be validator display names (run_latency_checks fills
    // this with `get_display_name` output) — confirm against the submitter's matching logic.
    pub allowed_validators: Vec<String>,

    /// Block-list of validators the transaction must not be submitted to. An empty list
    /// places no restriction.
    pub blocked_validators: Vec<String>,
}
73
74#[derive(Clone, Debug)]
75pub struct QuorumTransactionResponse {
76    pub effects: sui_types::transaction_driver_types::FinalizedEffects,
77
78    pub events: Option<sui_types::effects::TransactionEvents>,
79    // Input objects will only be populated in the happy path
80    pub input_objects: Option<Vec<sui_types::object::Object>>,
81    // Output objects will only be populated in the happy path
82    pub output_objects: Option<Vec<sui_types::object::Object>>,
83    pub auxiliary_data: Option<Vec<u8>>,
84}
85
86pub struct TransactionDriver<A: Clone> {
87    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
88    state: Mutex<State>,
89    metrics: Arc<TransactionDriverMetrics>,
90    submitter: TransactionSubmitter,
91    certifier: EffectsCertifier,
92    client_monitor: Arc<ValidatorClientMonitor<A>>,
93}
94
95impl<A> TransactionDriver<A>
96where
97    A: AuthorityAPI + Send + Sync + 'static + Clone,
98{
99    // TODO: accept a TransactionDriverConfig to set default allowed & blocked validators.
100    pub fn new(
101        authority_aggregator: Arc<AuthorityAggregator<A>>,
102        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
103        metrics: Arc<TransactionDriverMetrics>,
104        node_config: Option<&NodeConfig>,
105        client_metrics: Arc<ValidatorClientMetrics>,
106    ) -> Arc<Self> {
107        if std::env::var("TRANSACTION_DRIVER").is_ok() {
108            tracing::warn!(
109                "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
110            );
111        }
112
113        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
114
115        // Extract validator client monitor config from NodeConfig or use default
116        let monitor_config = node_config
117            .and_then(|nc| nc.validator_client_monitor_config.clone())
118            .unwrap_or_default();
119        let client_monitor =
120            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
121
122        let driver = Arc::new(Self {
123            authority_aggregator: shared_swap,
124            state: Mutex::new(State::new()),
125            metrics: metrics.clone(),
126            submitter: TransactionSubmitter::new(metrics.clone()),
127            certifier: EffectsCertifier::new(metrics),
128            client_monitor,
129        });
130
131        let driver_clone = driver.clone();
132
133        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
134
135        driver.enable_reconfig(reconfig_observer);
136        driver
137    }
138
139    /// Returns the authority aggregator wrapper which upgrades on epoch changes.
140    pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
141        &self.authority_aggregator
142    }
143
144    /// Drives transaction to finalization.
145    ///
146    /// Internally, retries the attempt to finalize a transaction until:
147    /// - The transaction is finalized.
148    /// - The transaction observes a non-retriable error.
149    /// - Timeout is reached.
150    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
151    pub async fn drive_transaction(
152        &self,
153        request: SubmitTxRequest,
154        options: SubmitTransactionOptions,
155        timeout_duration: Option<Duration>,
156    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
157        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
158
159        let tx_data = request.transaction.as_ref().map(|t| t.transaction_data());
160        // gas_price=0 for gasless; use 1 for baseline (RGP-equivalent) priority
161        let amplification_factor =
162            if request.ping_type.is_some() || tx_data.is_some_and(|d| d.is_gasless_transaction()) {
163                1
164            } else {
165                let tx_data = tx_data.unwrap();
166                let gas_price = tx_data.gas_price();
167                let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
168                let amplification_factor = gas_price / reference_gas_price.max(1);
169                if amplification_factor == 0 {
170                    return Err(TransactionDriverError::ValidationFailed {
171                        error: UserInputError::GasPriceUnderRGP {
172                            gas_price,
173                            reference_gas_price,
174                        }
175                        .to_string(),
176                    });
177                }
178                amplification_factor
179            };
180
181        let tx_type = request.tx_type();
182        let ping_label = if request.ping_type.is_some() {
183            "true"
184        } else {
185            "false"
186        };
187        let timer = Instant::now();
188
189        self.metrics
190            .total_transactions_submitted
191            .with_label_values(&[tx_type.as_str(), ping_label])
192            .inc();
193
194        let mut backoff = ExponentialBackoff::new(
195            Duration::from_millis(100),
196            MAX_DRIVE_TRANSACTION_RETRY_DELAY,
197        );
198        let mut attempts = 0;
199        let mut latest_retriable_error = None;
200
201        let retry_loop = async {
202            loop {
203                // TODO(fastpath): Check local state before submitting transaction
204                match self
205                    .drive_transaction_once(amplification_factor, request.clone(), &options)
206                    .await
207                {
208                    Ok(resp) => {
209                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
210                        self.metrics
211                            .settlement_finality_latency
212                            .with_label_values(&[tx_type.as_str(), ping_label])
213                            .observe(settlement_finality_latency);
214                        let is_out_of_expected_range = settlement_finality_latency >= 8.0
215                            || settlement_finality_latency <= 0.1;
216                        tracing::debug!(
217                            ?tx_type,
218                            ?is_out_of_expected_range,
219                            "Settlement finality latency: {:.3} seconds",
220                            settlement_finality_latency
221                        );
222                        // Record the number of retries for successful transaction
223                        self.metrics
224                            .transaction_retries
225                            .with_label_values(&["success", tx_type.as_str(), ping_label])
226                            .observe(attempts as f64);
227                        return Ok(resp);
228                    }
229                    Err(e) => {
230                        self.metrics
231                            .drive_transaction_errors
232                            .with_label_values(&[
233                                e.categorize().into(),
234                                tx_type.as_str(),
235                                ping_label,
236                            ])
237                            .inc();
238                        if !e.is_submission_retriable() {
239                            // Record the number of retries for failed transaction
240                            self.metrics
241                                .transaction_retries
242                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
243                                .observe(attempts as f64);
244                            if request.transaction.is_some() {
245                                tracing::info!(
246                                    "User transaction failed to finalize (attempt {}), with non-retriable error: {} ({})",
247                                    attempts,
248                                    e,
249                                    Into::<&str>::into(e.categorize())
250                                );
251                            }
252                            return Err(e);
253                        }
254                        if request.transaction.is_some() {
255                            tracing::info!(
256                                "User transaction failed to finalize (attempt {}): {} ({}). Retrying ...",
257                                attempts,
258                                e,
259                                Into::<&str>::into(e.categorize())
260                            );
261                        }
262                        // Buffer the latest retriable error to be returned in case of timeout
263                        latest_retriable_error = Some(e);
264                    }
265                }
266
267                let overload = if let Some(e) = &latest_retriable_error {
268                    e.categorize() == ErrorCategory::ValidatorOverloaded
269                } else {
270                    false
271                };
272                let delay = if overload {
273                    // Increase delay during overload.
274                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
275                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
276                } else {
277                    backoff.next().unwrap()
278                };
279
280                tracing::debug!("Retrying after {:.3}s", delay.as_secs_f32());
281                sleep(delay).await;
282
283                attempts += 1;
284            }
285        };
286
287        match timeout_duration {
288            Some(duration) => {
289                tokio::time::timeout(duration, retry_loop)
290                    .await
291                    .unwrap_or_else(|_| {
292                        // Timeout occurred, return with latest retriable error if available
293                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
294                            last_error: latest_retriable_error.map(Box::new),
295                            attempts,
296                            timeout: duration,
297                        };
298                        if request.transaction.is_some() {
299                            tracing::info!(
300                                "User transaction timed out after {} attempts. Last error: {}",
301                                attempts,
302                                e
303                            );
304                        }
305                        Err(e)
306                    })
307            }
308            None => retry_loop.await,
309        }
310    }
311
312    #[instrument(level = "error", skip_all, err(level = "debug"))]
313    async fn drive_transaction_once(
314        &self,
315        amplification_factor: u64,
316        request: SubmitTxRequest,
317        options: &SubmitTransactionOptions,
318    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
319        let auth_agg = self.authority_aggregator.load();
320        let start_time = Instant::now();
321        let tx_type = request.tx_type();
322        let tx_digest = request.tx_digest();
323        let ping_type = request.ping_type;
324
325        let (name, submit_txn_result) = self
326            .submitter
327            .submit_transaction(
328                &auth_agg,
329                &self.client_monitor,
330                tx_type,
331                amplification_factor,
332                request,
333                options,
334            )
335            .await?;
336        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
337            return Err(TransactionDriverError::ClientInternal {
338                error: format!(
339                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
340                    error
341                ),
342            });
343        }
344
345        // Wait for quorum effects using EffectsCertifier
346        let result = self
347            .certifier
348            .get_certified_finalized_effects(
349                &auth_agg,
350                &self.client_monitor,
351                tx_digest,
352                tx_type,
353                name,
354                submit_txn_result,
355                options,
356            )
357            .await;
358
359        if result.is_ok() {
360            self.client_monitor
361                .record_interaction_result(OperationFeedback {
362                    authority_name: name,
363                    display_name: auth_agg.get_display_name(&name),
364                    operation: if tx_type == TxType::SingleWriter {
365                        OperationType::SingleWriterFinality
366                    } else {
367                        OperationType::SharedObjectFinality
368                    },
369                    ping_type,
370                    result: Ok(start_time.elapsed()),
371                });
372        }
373        result
374    }
375
376    // Runs a background task to send ping transactions to all validators to perform latency checks for the consensus path.
377    async fn run_latency_checks(self: Arc<Self>) {
378        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
379        const MAX_JITTER: Duration = Duration::from_secs(10);
380        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
381
382        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
383        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
384
385        loop {
386            interval.tick().await;
387
388            // Only run latency checks for shared object transactions since single writer
389            // transactions no longer use a separate fast path and go through consensus.
390            let auth_agg = self.authority_aggregator.load().clone();
391            let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
392
393            self.metrics.latency_check_runs.inc();
394
395            let mut tasks = JoinSet::new();
396
397            for name in validators {
398                let display_name = auth_agg.get_display_name(&name);
399                let delay_ms = rand::thread_rng().gen_range(0..MAX_JITTER.as_millis()) as u64;
400                let self_clone = self.clone();
401
402                let task = async move {
403                    // Add some random delay to the task to avoid all tasks running at the same time
404                    if delay_ms > 0 {
405                        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
406                    }
407                    let start_time = Instant::now();
408
409                    // Send a consensus ping transaction to the validator
410                    match self_clone
411                        .drive_transaction(
412                            SubmitTxRequest::new_ping(),
413                            SubmitTransactionOptions {
414                                allowed_validators: vec![display_name.clone()],
415                                ..Default::default()
416                            },
417                            Some(PING_REQUEST_TIMEOUT),
418                        )
419                        .await
420                    {
421                        Ok(_) => {
422                            tracing::debug!(
423                                "Ping transaction to validator {} completed end to end in {} seconds",
424                                display_name,
425                                start_time.elapsed().as_secs_f64()
426                            );
427                        }
428                        Err(err) => {
429                            tracing::debug!(
430                                "Failed to get certified finalized effects for ping transaction to validator {}: {}",
431                                display_name,
432                                err
433                            );
434                        }
435                    }
436                };
437
438                tasks.spawn(task);
439            }
440
441            while let Some(result) = tasks.join_next().await {
442                if let Err(e) = result {
443                    tracing::debug!("Error while driving ping transaction: {}", e);
444                }
445            }
446        }
447    }
448
449    fn enable_reconfig(
450        self: &Arc<Self>,
451        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
452    ) {
453        let driver = self.clone();
454        self.state.lock().tasks.spawn(monitored_future!(async move {
455            let mut reconfig_observer = reconfig_observer.clone_boxed();
456            reconfig_observer.run(driver).await;
457        }));
458    }
459}
460
461impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
462where
463    A: AuthorityAPI + Send + Sync + 'static + Clone,
464{
465    fn epoch(&self) -> EpochId {
466        self.authority_aggregator.load().committee.epoch
467    }
468
469    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
470        self.authority_aggregator.load_full()
471    }
472
473    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
474        tracing::info!(
475            "Transaction Driver updating AuthorityAggregator with committee {}",
476            new_authorities.committee
477        );
478
479        self.authority_aggregator.store(new_authorities);
480    }
481}
482
483// Inner state of TransactionDriver.
484struct State {
485    tasks: JoinSet<()>,
486}
487
488impl State {
489    fn new() -> Self {
490        Self {
491            tasks: JoinSet::new(),
492        }
493    }
494}