sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10/// Exports
11pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16    net::SocketAddr,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27    committee::EpochId,
28    error::{ErrorCategory, UserInputError},
29    messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30    transaction::TransactionDataAPI as _,
31};
32use tokio::{
33    task::JoinSet,
34    time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40    authority_aggregator::AuthorityAggregator,
41    authority_client::AuthorityAPI,
42    validator_client_monitor::{
43        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
44    },
45};
46
47pub mod reconfig_observer;
48pub use reconfig_observer::ReconfigObserver;
49
50/// Trait for components that can update their AuthorityAggregator during reconfiguration.
51/// Used by ReconfigObserver to notify components of epoch changes.
52pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
53    fn epoch(&self) -> EpochId;
54    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
55    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
56}
57use sui_config::NodeConfig;
/// Per-request options controlling how a transaction is submitted.
///
/// The derived `Default` carries no forwarded client address and leaves both validator
/// lists empty, i.e. it places no restriction on which validators may be used.
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Address of the originating client when the transaction is forwarded on that
    /// client's behalf. Specified for ddos protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Allow-list: when non-empty, only the validators listed here may be used to submit
    /// the transaction. An empty list means any validator may be used.
    /// The ping path in this file fills it with a validator display name.
    pub allowed_validators: Vec<String>,

    /// Block-list: validators listed here are never used to submit the transaction.
    /// An empty list applies no restriction.
    pub blocked_validators: Vec<String>,
}
73
74#[derive(Clone, Debug)]
75pub struct QuorumTransactionResponse {
76    pub effects: sui_types::transaction_driver_types::FinalizedEffects,
77
78    pub events: Option<sui_types::effects::TransactionEvents>,
79    // Input objects will only be populated in the happy path
80    pub input_objects: Option<Vec<sui_types::object::Object>>,
81    // Output objects will only be populated in the happy path
82    pub output_objects: Option<Vec<sui_types::object::Object>>,
83    pub auxiliary_data: Option<Vec<u8>>,
84}
85
86pub struct TransactionDriver<A: Clone> {
87    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
88    state: Mutex<State>,
89    metrics: Arc<TransactionDriverMetrics>,
90    submitter: TransactionSubmitter,
91    certifier: EffectsCertifier,
92    client_monitor: Arc<ValidatorClientMonitor<A>>,
93}
94
95impl<A> TransactionDriver<A>
96where
97    A: AuthorityAPI + Send + Sync + 'static + Clone,
98{
99    // TODO: accept a TransactionDriverConfig to set default allowed & blocked validators.
100    pub fn new(
101        authority_aggregator: Arc<AuthorityAggregator<A>>,
102        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
103        metrics: Arc<TransactionDriverMetrics>,
104        node_config: Option<&NodeConfig>,
105        client_metrics: Arc<ValidatorClientMetrics>,
106    ) -> Arc<Self> {
107        if std::env::var("TRANSACTION_DRIVER").is_ok() {
108            tracing::warn!(
109                "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
110            );
111        }
112
113        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
114
115        // Extract validator client monitor config from NodeConfig or use default
116        let monitor_config = node_config
117            .and_then(|nc| nc.validator_client_monitor_config.clone())
118            .unwrap_or_default();
119        let client_monitor =
120            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
121
122        let driver = Arc::new(Self {
123            authority_aggregator: shared_swap,
124            state: Mutex::new(State::new()),
125            metrics: metrics.clone(),
126            submitter: TransactionSubmitter::new(metrics.clone()),
127            certifier: EffectsCertifier::new(metrics),
128            client_monitor,
129        });
130
131        let driver_clone = driver.clone();
132
133        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
134
135        driver.enable_reconfig(reconfig_observer);
136        driver
137    }
138
139    /// Returns the authority aggregator wrapper which upgrades on epoch changes.
140    pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
141        &self.authority_aggregator
142    }
143
144    /// Drives transaction to finalization.
145    ///
146    /// Internally, retries the attempt to finalize a transaction until:
147    /// - The transaction is finalized.
148    /// - The transaction observes a non-retriable error.
149    /// - Timeout is reached.
150    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
151    pub async fn drive_transaction(
152        &self,
153        request: SubmitTxRequest,
154        options: SubmitTransactionOptions,
155        timeout_duration: Option<Duration>,
156    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
157        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
158
159        // For ping requests, the amplification factor is always 1.
160        let amplification_factor = if request.ping_type.is_some() {
161            1
162        } else {
163            let gas_price = request
164                .transaction
165                .as_ref()
166                .unwrap()
167                .transaction_data()
168                .gas_price();
169            let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
170            let amplification_factor = gas_price / reference_gas_price.max(1);
171            if amplification_factor == 0 {
172                return Err(TransactionDriverError::ValidationFailed {
173                    error: UserInputError::GasPriceUnderRGP {
174                        gas_price,
175                        reference_gas_price,
176                    }
177                    .to_string(),
178                });
179            }
180            amplification_factor
181        };
182
183        let tx_type = request.tx_type();
184        let ping_label = if request.ping_type.is_some() {
185            "true"
186        } else {
187            "false"
188        };
189        let timer = Instant::now();
190
191        self.metrics
192            .total_transactions_submitted
193            .with_label_values(&[tx_type.as_str(), ping_label])
194            .inc();
195
196        let mut backoff = ExponentialBackoff::new(
197            Duration::from_millis(100),
198            MAX_DRIVE_TRANSACTION_RETRY_DELAY,
199        );
200        let mut attempts = 0;
201        let mut latest_retriable_error = None;
202
203        let retry_loop = async {
204            loop {
205                // TODO(fastpath): Check local state before submitting transaction
206                match self
207                    .drive_transaction_once(amplification_factor, request.clone(), &options)
208                    .await
209                {
210                    Ok(resp) => {
211                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
212                        self.metrics
213                            .settlement_finality_latency
214                            .with_label_values(&[tx_type.as_str(), ping_label])
215                            .observe(settlement_finality_latency);
216                        // Record the number of retries for successful transaction
217                        self.metrics
218                            .transaction_retries
219                            .with_label_values(&["success", tx_type.as_str(), ping_label])
220                            .observe(attempts as f64);
221                        return Ok(resp);
222                    }
223                    Err(e) => {
224                        self.metrics
225                            .drive_transaction_errors
226                            .with_label_values(&[
227                                e.categorize().into(),
228                                tx_type.as_str(),
229                                ping_label,
230                            ])
231                            .inc();
232                        if !e.is_submission_retriable() {
233                            // Record the number of retries for failed transaction
234                            self.metrics
235                                .transaction_retries
236                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
237                                .observe(attempts as f64);
238                            if request.transaction.is_some() {
239                                tracing::info!(
240                                    "User transaction failed to finalize (attempt {}), with non-retriable error: {}",
241                                    attempts,
242                                    e
243                                );
244                            }
245                            return Err(e);
246                        }
247                        if request.transaction.is_some() {
248                            tracing::info!(
249                                "User transaction failed to finalize (attempt {}): {}. Retrying ...",
250                                attempts,
251                                e
252                            );
253                        }
254                        // Buffer the latest retriable error to be returned in case of timeout
255                        latest_retriable_error = Some(e);
256                    }
257                }
258
259                let overload = if let Some(e) = &latest_retriable_error {
260                    e.categorize() == ErrorCategory::ValidatorOverloaded
261                } else {
262                    false
263                };
264                let delay = if overload {
265                    // Increase delay during overload.
266                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
267                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
268                } else {
269                    backoff.next().unwrap()
270                };
271                sleep(delay).await;
272
273                attempts += 1;
274            }
275        };
276
277        match timeout_duration {
278            Some(duration) => {
279                tokio::time::timeout(duration, retry_loop)
280                    .await
281                    .unwrap_or_else(|_| {
282                        // Timeout occurred, return with latest retriable error if available
283                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
284                            last_error: latest_retriable_error.map(Box::new),
285                            attempts,
286                            timeout: duration,
287                        };
288                        if request.transaction.is_some() {
289                            tracing::info!(
290                                "User transaction timed out after {} attempts. Last error: {}",
291                                attempts,
292                                e
293                            );
294                        }
295                        Err(e)
296                    })
297            }
298            None => retry_loop.await,
299        }
300    }
301
302    #[instrument(level = "error", skip_all, err(level = "debug"))]
303    async fn drive_transaction_once(
304        &self,
305        amplification_factor: u64,
306        request: SubmitTxRequest,
307        options: &SubmitTransactionOptions,
308    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
309        let auth_agg = self.authority_aggregator.load();
310        let amplification_factor =
311            amplification_factor.min(auth_agg.committee.num_members() as u64);
312        let start_time = Instant::now();
313        let tx_type = request.tx_type();
314        let tx_digest = request.tx_digest();
315        let ping_type = request.ping_type;
316
317        let (name, submit_txn_result) = self
318            .submitter
319            .submit_transaction(
320                &auth_agg,
321                &self.client_monitor,
322                tx_type,
323                amplification_factor,
324                request,
325                options,
326            )
327            .await?;
328        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
329            return Err(TransactionDriverError::ClientInternal {
330                error: format!(
331                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
332                    error
333                ),
334            });
335        }
336
337        // Wait for quorum effects using EffectsCertifier
338        let result = self
339            .certifier
340            .get_certified_finalized_effects(
341                &auth_agg,
342                &self.client_monitor,
343                tx_digest,
344                tx_type,
345                name,
346                submit_txn_result,
347                options,
348            )
349            .await;
350
351        if result.is_ok() {
352            self.client_monitor
353                .record_interaction_result(OperationFeedback {
354                    authority_name: name,
355                    display_name: auth_agg.get_display_name(&name),
356                    operation: if tx_type == TxType::SingleWriter {
357                        OperationType::FastPath
358                    } else {
359                        OperationType::Consensus
360                    },
361                    ping_type,
362                    result: Ok(start_time.elapsed()),
363                });
364        }
365        result
366    }
367
368    // Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
369    async fn run_latency_checks(self: Arc<Self>) {
370        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
371        const MAX_JITTER: Duration = Duration::from_secs(10);
372        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
373
374        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
375        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
376
377        loop {
378            interval.tick().await;
379
380            let mut tasks = JoinSet::new();
381
382            for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
383                Self::ping_for_tx_type(
384                    self.clone(),
385                    &mut tasks,
386                    tx_type,
387                    MAX_JITTER,
388                    PING_REQUEST_TIMEOUT,
389                );
390            }
391
392            while let Some(result) = tasks.join_next().await {
393                if let Err(e) = result {
394                    tracing::debug!("Error while driving ping transaction: {}", e);
395                }
396            }
397        }
398    }
399
400    /// Pings all validators for e2e latency with the provided transaction type.
401    fn ping_for_tx_type(
402        self: Arc<Self>,
403        tasks: &mut JoinSet<()>,
404        tx_type: TxType,
405        max_jitter: Duration,
406        ping_timeout: Duration,
407    ) {
408        // We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
409        let auth_agg = self.authority_aggregator.load().clone();
410        let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
411
412        self.metrics
413            .latency_check_runs
414            .with_label_values(&[tx_type.as_str()])
415            .inc();
416
417        for name in validators {
418            let display_name = auth_agg.get_display_name(&name);
419            let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
420            let self_clone = self.clone();
421
422            let task = async move {
423                // Add some random delay to the task to avoid all tasks running at the same time
424                if delay_ms > 0 {
425                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
426                }
427                let start_time = Instant::now();
428
429                let ping_type = if tx_type == TxType::SingleWriter {
430                    PingType::FastPath
431                } else {
432                    PingType::Consensus
433                };
434
435                // Now send a ping transaction to the chosen validator for the provided tx type
436                match self_clone
437                    .drive_transaction(
438                        SubmitTxRequest::new_ping(ping_type),
439                        SubmitTransactionOptions {
440                            allowed_validators: vec![display_name.clone()],
441                            ..Default::default()
442                        },
443                        Some(ping_timeout),
444                    )
445                    .await
446                {
447                    Ok(_) => {
448                        tracing::debug!(
449                            "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
450                            display_name,
451                            tx_type.as_str(),
452                            start_time.elapsed().as_secs_f64()
453                        );
454                    }
455                    Err(err) => {
456                        tracing::debug!(
457                            "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
458                            tx_type.as_str(),
459                            display_name,
460                            err
461                        );
462                    }
463                }
464            };
465
466            tasks.spawn(task);
467        }
468    }
469
470    fn enable_reconfig(
471        self: &Arc<Self>,
472        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
473    ) {
474        let driver = self.clone();
475        self.state.lock().tasks.spawn(monitored_future!(async move {
476            let mut reconfig_observer = reconfig_observer.clone_boxed();
477            reconfig_observer.run(driver).await;
478        }));
479    }
480}
481
482impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
483where
484    A: AuthorityAPI + Send + Sync + 'static + Clone,
485{
486    fn epoch(&self) -> EpochId {
487        self.authority_aggregator.load().committee.epoch
488    }
489
490    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
491        self.authority_aggregator.load_full()
492    }
493
494    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
495        tracing::info!(
496            "Transaction Driver updating AuthorityAggregator with committee {}",
497            new_authorities.committee
498        );
499
500        self.authority_aggregator.store(new_authorities);
501    }
502}
503
504// Inner state of TransactionDriver.
505struct State {
506    tasks: JoinSet<()>,
507}
508
509impl State {
510    fn new() -> Self {
511        Self {
512            tasks: JoinSet::new(),
513        }
514    }
515}