sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod reconfig_observer;
8mod request_retrier;
9mod transaction_submitter;
10
11/// Exports
12pub use error::TransactionDriverError;
13pub use metrics::*;
14pub use reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
15
16use std::{
17    net::SocketAddr,
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use arc_swap::ArcSwap;
23use effects_certifier::*;
24use mysten_common::backoff::ExponentialBackoff;
25use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
26use parking_lot::Mutex;
27use rand::Rng;
28use sui_config::NodeConfig;
29use sui_types::{
30    committee::EpochId,
31    error::{ErrorCategory, UserInputError},
32    messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
33    transaction::TransactionDataAPI as _,
34};
35use tokio::{
36    task::JoinSet,
37    time::{interval, sleep},
38};
39use tracing::instrument;
40use transaction_submitter::*;
41
42use crate::{
43    authority_aggregator::AuthorityAggregator,
44    authority_client::AuthorityAPI,
45    validator_client_monitor::{
46        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
47    },
48};
49
50/// Trait for components that can update their AuthorityAggregator during reconfiguration.
51/// Used by ReconfigObserver to notify components of epoch changes.
52pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
53    fn epoch(&self) -> EpochId;
54    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
55    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
56}
57
/// Per-call settings that constrain how a transaction is submitted.
#[derive(Debug, Clone, Default)]
pub struct SubmitTransactionOptions {
    /// Set when this node forwards a transaction on behalf of a client: the client's address,
    /// used for ddos protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// If non-empty, submission may only target validators named in this list.
    /// An empty list leaves every validator eligible.
    pub allowed_validators: Vec<String>,

    /// Validators named in this list are never used as submission targets.
    /// An empty list excludes nobody.
    pub blocked_validators: Vec<String>,
}
73
74#[derive(Clone, Debug)]
75pub struct QuorumTransactionResponse {
76    pub effects: sui_types::transaction_driver_types::FinalizedEffects,
77
78    pub events: Option<sui_types::effects::TransactionEvents>,
79    // Input objects will only be populated in the happy path
80    pub input_objects: Option<Vec<sui_types::object::Object>>,
81    // Output objects will only be populated in the happy path
82    pub output_objects: Option<Vec<sui_types::object::Object>>,
83    pub auxiliary_data: Option<Vec<u8>>,
84}
85
86pub struct TransactionDriver<A: Clone> {
87    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
88    state: Mutex<State>,
89    metrics: Arc<TransactionDriverMetrics>,
90    submitter: TransactionSubmitter,
91    certifier: EffectsCertifier,
92    client_monitor: Arc<ValidatorClientMonitor<A>>,
93}
94
95impl<A> TransactionDriver<A>
96where
97    A: AuthorityAPI + Send + Sync + 'static + Clone,
98{
99    // TODO: accept a TransactionDriverConfig to set default allowed & blocked validators.
100    pub fn new(
101        authority_aggregator: Arc<AuthorityAggregator<A>>,
102        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
103        metrics: Arc<TransactionDriverMetrics>,
104        node_config: Option<&NodeConfig>,
105        client_metrics: Arc<ValidatorClientMetrics>,
106    ) -> Arc<Self> {
107        if std::env::var("TRANSACTION_DRIVER").is_ok() {
108            tracing::warn!(
109                "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
110            );
111        }
112
113        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
114
115        // Extract validator client monitor config from NodeConfig or use default
116        let monitor_config = node_config
117            .and_then(|nc| nc.validator_client_monitor_config.clone())
118            .unwrap_or_default();
119        let client_monitor =
120            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
121
122        let driver = Arc::new(Self {
123            authority_aggregator: shared_swap,
124            state: Mutex::new(State::new()),
125            metrics: metrics.clone(),
126            submitter: TransactionSubmitter::new(metrics.clone()),
127            certifier: EffectsCertifier::new(metrics),
128            client_monitor,
129        });
130
131        let driver_clone = driver.clone();
132
133        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
134
135        driver.enable_reconfig(reconfig_observer);
136        driver
137    }
138
139    /// Returns the authority aggregator wrapper which upgrades on epoch changes.
140    pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
141        &self.authority_aggregator
142    }
143
144    /// Drives transaction to finalization.
145    ///
146    /// Internally, retries the attempt to finalize a transaction until:
147    /// - The transaction is finalized.
148    /// - The transaction observes a non-retriable error.
149    /// - Timeout is reached.
150    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
151    pub async fn drive_transaction(
152        &self,
153        request: SubmitTxRequest,
154        options: SubmitTransactionOptions,
155        timeout_duration: Option<Duration>,
156    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
157        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
158
159        // For ping requests, the amplification factor is always 1.
160        let amplification_factor = if request.ping_type.is_some() {
161            1
162        } else {
163            let gas_price = request
164                .transaction
165                .as_ref()
166                .unwrap()
167                .transaction_data()
168                .gas_price();
169            let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
170            let amplification_factor = gas_price / reference_gas_price.max(1);
171            if amplification_factor == 0 {
172                return Err(TransactionDriverError::ValidationFailed {
173                    error: UserInputError::GasPriceUnderRGP {
174                        gas_price,
175                        reference_gas_price,
176                    }
177                    .to_string(),
178                });
179            }
180            amplification_factor
181        };
182
183        let tx_type = request.tx_type();
184        let ping_label = if request.ping_type.is_some() {
185            "true"
186        } else {
187            "false"
188        };
189        let timer = Instant::now();
190
191        self.metrics
192            .total_transactions_submitted
193            .with_label_values(&[tx_type.as_str(), ping_label])
194            .inc();
195
196        let mut backoff = ExponentialBackoff::new(
197            Duration::from_millis(100),
198            MAX_DRIVE_TRANSACTION_RETRY_DELAY,
199        );
200        let mut attempts = 0;
201        let mut latest_retriable_error = None;
202
203        let retry_loop = async {
204            loop {
205                // TODO(fastpath): Check local state before submitting transaction
206                match self
207                    .drive_transaction_once(amplification_factor, request.clone(), &options)
208                    .await
209                {
210                    Ok(resp) => {
211                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
212                        self.metrics
213                            .settlement_finality_latency
214                            .with_label_values(&[tx_type.as_str(), ping_label])
215                            .observe(settlement_finality_latency);
216                        let is_out_of_expected_range = settlement_finality_latency >= 8.0
217                            || settlement_finality_latency <= 0.1;
218                        tracing::debug!(
219                            ?tx_type,
220                            ?is_out_of_expected_range,
221                            "Settlement finality latency: {:.3} seconds",
222                            settlement_finality_latency
223                        );
224                        // Record the number of retries for successful transaction
225                        self.metrics
226                            .transaction_retries
227                            .with_label_values(&["success", tx_type.as_str(), ping_label])
228                            .observe(attempts as f64);
229                        return Ok(resp);
230                    }
231                    Err(e) => {
232                        self.metrics
233                            .drive_transaction_errors
234                            .with_label_values(&[
235                                e.categorize().into(),
236                                tx_type.as_str(),
237                                ping_label,
238                            ])
239                            .inc();
240                        if !e.is_submission_retriable() {
241                            // Record the number of retries for failed transaction
242                            self.metrics
243                                .transaction_retries
244                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
245                                .observe(attempts as f64);
246                            if request.transaction.is_some() {
247                                tracing::info!(
248                                    "User transaction failed to finalize (attempt {}), with non-retriable error: {} ({})",
249                                    attempts,
250                                    e,
251                                    Into::<&str>::into(e.categorize())
252                                );
253                            }
254                            return Err(e);
255                        }
256                        if request.transaction.is_some() {
257                            tracing::info!(
258                                "User transaction failed to finalize (attempt {}): {} ({}). Retrying ...",
259                                attempts,
260                                e,
261                                Into::<&str>::into(e.categorize())
262                            );
263                        }
264                        // Buffer the latest retriable error to be returned in case of timeout
265                        latest_retriable_error = Some(e);
266                    }
267                }
268
269                let overload = if let Some(e) = &latest_retriable_error {
270                    e.categorize() == ErrorCategory::ValidatorOverloaded
271                } else {
272                    false
273                };
274                let delay = if overload {
275                    // Increase delay during overload.
276                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
277                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
278                } else {
279                    backoff.next().unwrap()
280                };
281
282                tracing::debug!("Retrying after {:.3}s", delay.as_secs_f32());
283                sleep(delay).await;
284
285                attempts += 1;
286            }
287        };
288
289        match timeout_duration {
290            Some(duration) => {
291                tokio::time::timeout(duration, retry_loop)
292                    .await
293                    .unwrap_or_else(|_| {
294                        // Timeout occurred, return with latest retriable error if available
295                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
296                            last_error: latest_retriable_error.map(Box::new),
297                            attempts,
298                            timeout: duration,
299                        };
300                        if request.transaction.is_some() {
301                            tracing::info!(
302                                "User transaction timed out after {} attempts. Last error: {}",
303                                attempts,
304                                e
305                            );
306                        }
307                        Err(e)
308                    })
309            }
310            None => retry_loop.await,
311        }
312    }
313
314    #[instrument(level = "error", skip_all, err(level = "debug"))]
315    async fn drive_transaction_once(
316        &self,
317        amplification_factor: u64,
318        request: SubmitTxRequest,
319        options: &SubmitTransactionOptions,
320    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
321        let auth_agg = self.authority_aggregator.load();
322        let start_time = Instant::now();
323        let tx_type = request.tx_type();
324        let tx_digest = request.tx_digest();
325        let ping_type = request.ping_type;
326
327        let (name, submit_txn_result) = self
328            .submitter
329            .submit_transaction(
330                &auth_agg,
331                &self.client_monitor,
332                tx_type,
333                amplification_factor,
334                request,
335                options,
336            )
337            .await?;
338        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
339            return Err(TransactionDriverError::ClientInternal {
340                error: format!(
341                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
342                    error
343                ),
344            });
345        }
346
347        // Wait for quorum effects using EffectsCertifier
348        let result = self
349            .certifier
350            .get_certified_finalized_effects(
351                &auth_agg,
352                &self.client_monitor,
353                tx_digest,
354                tx_type,
355                name,
356                submit_txn_result,
357                options,
358            )
359            .await;
360
361        if result.is_ok() {
362            self.client_monitor
363                .record_interaction_result(OperationFeedback {
364                    authority_name: name,
365                    display_name: auth_agg.get_display_name(&name),
366                    operation: if tx_type == TxType::SingleWriter {
367                        OperationType::FastPath
368                    } else {
369                        OperationType::Consensus
370                    },
371                    ping_type,
372                    result: Ok(start_time.elapsed()),
373                });
374        }
375        result
376    }
377
378    // Runs a background task to send ping transactions to all validators to perform latency checks for the consensus path.
379    async fn run_latency_checks(self: Arc<Self>) {
380        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
381        const MAX_JITTER: Duration = Duration::from_secs(10);
382        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
383
384        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
385        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
386
387        loop {
388            interval.tick().await;
389
390            // Only run latency checks for shared object transactions since single writer
391            // transactions no longer use a separate fast path and go through consensus.
392            let auth_agg = self.authority_aggregator.load().clone();
393            let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
394
395            self.metrics.latency_check_runs.inc();
396
397            let mut tasks = JoinSet::new();
398
399            for name in validators {
400                let display_name = auth_agg.get_display_name(&name);
401                let delay_ms = rand::thread_rng().gen_range(0..MAX_JITTER.as_millis()) as u64;
402                let self_clone = self.clone();
403
404                let task = async move {
405                    // Add some random delay to the task to avoid all tasks running at the same time
406                    if delay_ms > 0 {
407                        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
408                    }
409                    let start_time = Instant::now();
410
411                    // Send a consensus ping transaction to the validator
412                    match self_clone
413                        .drive_transaction(
414                            SubmitTxRequest::new_ping(PingType::Consensus),
415                            SubmitTransactionOptions {
416                                allowed_validators: vec![display_name.clone()],
417                                ..Default::default()
418                            },
419                            Some(PING_REQUEST_TIMEOUT),
420                        )
421                        .await
422                    {
423                        Ok(_) => {
424                            tracing::debug!(
425                                "Ping transaction to validator {} completed end to end in {} seconds",
426                                display_name,
427                                start_time.elapsed().as_secs_f64()
428                            );
429                        }
430                        Err(err) => {
431                            tracing::debug!(
432                                "Failed to get certified finalized effects for ping transaction to validator {}: {}",
433                                display_name,
434                                err
435                            );
436                        }
437                    }
438                };
439
440                tasks.spawn(task);
441            }
442
443            while let Some(result) = tasks.join_next().await {
444                if let Err(e) = result {
445                    tracing::debug!("Error while driving ping transaction: {}", e);
446                }
447            }
448        }
449    }
450
451    fn enable_reconfig(
452        self: &Arc<Self>,
453        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
454    ) {
455        let driver = self.clone();
456        self.state.lock().tasks.spawn(monitored_future!(async move {
457            let mut reconfig_observer = reconfig_observer.clone_boxed();
458            reconfig_observer.run(driver).await;
459        }));
460    }
461}
462
463impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
464where
465    A: AuthorityAPI + Send + Sync + 'static + Clone,
466{
467    fn epoch(&self) -> EpochId {
468        self.authority_aggregator.load().committee.epoch
469    }
470
471    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
472        self.authority_aggregator.load_full()
473    }
474
475    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
476        tracing::info!(
477            "Transaction Driver updating AuthorityAggregator with committee {}",
478            new_authorities.committee
479        );
480
481        self.authority_aggregator.store(new_authorities);
482    }
483}
484
485// Inner state of TransactionDriver.
486struct State {
487    tasks: JoinSet<()>,
488}
489
490impl State {
491    fn new() -> Self {
492        Self {
493            tasks: JoinSet::new(),
494        }
495    }
496}