sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10/// Exports
11pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16    net::SocketAddr,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27    committee::EpochId,
28    error::{ErrorCategory, UserInputError},
29    messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30    transaction::TransactionDataAPI as _,
31};
32use tokio::{
33    task::JoinSet,
34    time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40    authority_aggregator::AuthorityAggregator,
41    authority_client::AuthorityAPI,
42    quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43    validator_client_monitor::{
44        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45    },
46};
47use sui_config::NodeConfig;
/// Options for submitting a transaction.
///
/// All fields default to "no restriction": no forwarded client address and empty
/// allow/block lists.
#[derive(Debug, Clone, Default)]
pub struct SubmitTransactionOptions {
    /// Address of the client on whose behalf this transaction is being forwarded, if any.
    /// It is passed along for DDoS protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Allow-list of validators that may receive the submission.
    /// An empty list means any validator may be used.
    /// (The latency pings in this module populate it with validator display names.)
    pub allowed_validators: Vec<String>,

    /// Block-list of validators that must not receive the submission.
    /// An empty list means no validator is excluded.
    pub blocked_validators: Vec<String>,
}
63
64#[derive(Clone, Debug)]
65pub struct QuorumTransactionResponse {
66    // TODO(fastpath): Stop using QD types
67    pub effects: sui_types::quorum_driver_types::FinalizedEffects,
68
69    pub events: Option<sui_types::effects::TransactionEvents>,
70    // Input objects will only be populated in the happy path
71    pub input_objects: Option<Vec<sui_types::object::Object>>,
72    // Output objects will only be populated in the happy path
73    pub output_objects: Option<Vec<sui_types::object::Object>>,
74    pub auxiliary_data: Option<Vec<u8>>,
75}
76
77pub struct TransactionDriver<A: Clone> {
78    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
79    state: Mutex<State>,
80    metrics: Arc<TransactionDriverMetrics>,
81    submitter: TransactionSubmitter,
82    certifier: EffectsCertifier,
83    client_monitor: Arc<ValidatorClientMonitor<A>>,
84}
85
86impl<A> TransactionDriver<A>
87where
88    A: AuthorityAPI + Send + Sync + 'static + Clone,
89{
90    // TODO: accept a TransactionDriverConfig to set default allowed & blocked validators.
91    pub fn new(
92        authority_aggregator: Arc<AuthorityAggregator<A>>,
93        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
94        metrics: Arc<TransactionDriverMetrics>,
95        node_config: Option<&NodeConfig>,
96        client_metrics: Arc<ValidatorClientMetrics>,
97    ) -> Arc<Self> {
98        if std::env::var("TRANSACTION_DRIVER").is_ok() {
99            tracing::warn!(
100                "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
101            );
102        }
103
104        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
105
106        // Extract validator client monitor config from NodeConfig or use default
107        let monitor_config = node_config
108            .and_then(|nc| nc.validator_client_monitor_config.clone())
109            .unwrap_or_default();
110        let client_monitor =
111            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
112
113        let driver = Arc::new(Self {
114            authority_aggregator: shared_swap,
115            state: Mutex::new(State::new()),
116            metrics: metrics.clone(),
117            submitter: TransactionSubmitter::new(metrics.clone()),
118            certifier: EffectsCertifier::new(metrics),
119            client_monitor,
120        });
121
122        let driver_clone = driver.clone();
123
124        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
125
126        driver.enable_reconfig(reconfig_observer);
127        driver
128    }
129
130    /// Returns the authority aggregator wrapper which upgrades on epoch changes.
131    pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
132        &self.authority_aggregator
133    }
134
135    /// Drives transaction to finalization.
136    ///
137    /// Internally, retries the attempt to finalize a transaction until:
138    /// - The transaction is finalized.
139    /// - The transaction observes a non-retriable error.
140    /// - Timeout is reached.
141    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
142    pub async fn drive_transaction(
143        &self,
144        request: SubmitTxRequest,
145        options: SubmitTransactionOptions,
146        timeout_duration: Option<Duration>,
147    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
148        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
149
150        // For ping requests, the amplification factor is always 1.
151        let amplification_factor = if request.ping_type.is_some() {
152            1
153        } else {
154            let gas_price = request
155                .transaction
156                .as_ref()
157                .unwrap()
158                .transaction_data()
159                .gas_price();
160            let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
161            let amplification_factor = gas_price / reference_gas_price.max(1);
162            if amplification_factor == 0 {
163                return Err(TransactionDriverError::ValidationFailed {
164                    error: UserInputError::GasPriceUnderRGP {
165                        gas_price,
166                        reference_gas_price,
167                    }
168                    .to_string(),
169                });
170            }
171            amplification_factor
172        };
173
174        let tx_type = request.tx_type();
175        let ping_label = if request.ping_type.is_some() {
176            "true"
177        } else {
178            "false"
179        };
180        let timer = Instant::now();
181
182        self.metrics
183            .total_transactions_submitted
184            .with_label_values(&[tx_type.as_str(), ping_label])
185            .inc();
186
187        let mut backoff = ExponentialBackoff::new(
188            Duration::from_millis(100),
189            MAX_DRIVE_TRANSACTION_RETRY_DELAY,
190        );
191        let mut attempts = 0;
192        let mut latest_retriable_error = None;
193
194        let retry_loop = async {
195            loop {
196                // TODO(fastpath): Check local state before submitting transaction
197                match self
198                    .drive_transaction_once(amplification_factor, request.clone(), &options)
199                    .await
200                {
201                    Ok(resp) => {
202                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
203                        self.metrics
204                            .settlement_finality_latency
205                            .with_label_values(&[tx_type.as_str(), ping_label])
206                            .observe(settlement_finality_latency);
207                        // Record the number of retries for successful transaction
208                        self.metrics
209                            .transaction_retries
210                            .with_label_values(&["success", tx_type.as_str(), ping_label])
211                            .observe(attempts as f64);
212                        return Ok(resp);
213                    }
214                    Err(e) => {
215                        self.metrics
216                            .drive_transaction_errors
217                            .with_label_values(&[
218                                e.categorize().into(),
219                                tx_type.as_str(),
220                                ping_label,
221                            ])
222                            .inc();
223                        if !e.is_submission_retriable() {
224                            // Record the number of retries for failed transaction
225                            self.metrics
226                                .transaction_retries
227                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
228                                .observe(attempts as f64);
229                            if request.transaction.is_some() {
230                                tracing::info!(
231                                    "User transaction failed to finalize (attempt {}), with non-retriable error: {}",
232                                    attempts,
233                                    e
234                                );
235                            }
236                            return Err(e);
237                        }
238                        if request.transaction.is_some() {
239                            tracing::info!(
240                                "User transaction failed to finalize (attempt {}): {}. Retrying ...",
241                                attempts,
242                                e
243                            );
244                        }
245                        // Buffer the latest retriable error to be returned in case of timeout
246                        latest_retriable_error = Some(e);
247                    }
248                }
249
250                let overload = if let Some(e) = &latest_retriable_error {
251                    e.categorize() == ErrorCategory::ValidatorOverloaded
252                } else {
253                    false
254                };
255                let delay = if overload {
256                    // Increase delay during overload.
257                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
258                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
259                } else {
260                    backoff.next().unwrap()
261                };
262                sleep(delay).await;
263
264                attempts += 1;
265            }
266        };
267
268        match timeout_duration {
269            Some(duration) => {
270                tokio::time::timeout(duration, retry_loop)
271                    .await
272                    .unwrap_or_else(|_| {
273                        // Timeout occurred, return with latest retriable error if available
274                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
275                            last_error: latest_retriable_error.map(Box::new),
276                            attempts,
277                            timeout: duration,
278                        };
279                        if request.transaction.is_some() {
280                            tracing::info!(
281                                "User transaction timed out after {} attempts. Last error: {}",
282                                attempts,
283                                e
284                            );
285                        }
286                        Err(e)
287                    })
288            }
289            None => retry_loop.await,
290        }
291    }
292
293    #[instrument(level = "error", skip_all, err(level = "debug"))]
294    async fn drive_transaction_once(
295        &self,
296        amplification_factor: u64,
297        request: SubmitTxRequest,
298        options: &SubmitTransactionOptions,
299    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
300        let auth_agg = self.authority_aggregator.load();
301        let amplification_factor =
302            amplification_factor.min(auth_agg.committee.num_members() as u64);
303        let start_time = Instant::now();
304        let tx_type = request.tx_type();
305        let tx_digest = request.tx_digest();
306        let ping_type = request.ping_type;
307
308        let (name, submit_txn_result) = self
309            .submitter
310            .submit_transaction(
311                &auth_agg,
312                &self.client_monitor,
313                tx_type,
314                amplification_factor,
315                request,
316                options,
317            )
318            .await?;
319        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
320            return Err(TransactionDriverError::ClientInternal {
321                error: format!(
322                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
323                    error
324                ),
325            });
326        }
327
328        // Wait for quorum effects using EffectsCertifier
329        let result = self
330            .certifier
331            .get_certified_finalized_effects(
332                &auth_agg,
333                &self.client_monitor,
334                tx_digest,
335                tx_type,
336                name,
337                submit_txn_result,
338                options,
339            )
340            .await;
341
342        if result.is_ok() {
343            self.client_monitor
344                .record_interaction_result(OperationFeedback {
345                    authority_name: name,
346                    display_name: auth_agg.get_display_name(&name),
347                    operation: if tx_type == TxType::SingleWriter {
348                        OperationType::FastPath
349                    } else {
350                        OperationType::Consensus
351                    },
352                    ping_type,
353                    result: Ok(start_time.elapsed()),
354                });
355        }
356        result
357    }
358
359    // Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
360    async fn run_latency_checks(self: Arc<Self>) {
361        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
362        const MAX_JITTER: Duration = Duration::from_secs(10);
363        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
364
365        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
366        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
367
368        loop {
369            interval.tick().await;
370
371            let mut tasks = JoinSet::new();
372
373            for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
374                Self::ping_for_tx_type(
375                    self.clone(),
376                    &mut tasks,
377                    tx_type,
378                    MAX_JITTER,
379                    PING_REQUEST_TIMEOUT,
380                );
381            }
382
383            while let Some(result) = tasks.join_next().await {
384                if let Err(e) = result {
385                    tracing::debug!("Error while driving ping transaction: {}", e);
386                }
387            }
388        }
389    }
390
391    /// Pings all validators for e2e latency with the provided transaction type.
392    fn ping_for_tx_type(
393        self: Arc<Self>,
394        tasks: &mut JoinSet<()>,
395        tx_type: TxType,
396        max_jitter: Duration,
397        ping_timeout: Duration,
398    ) {
399        // We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
400        let auth_agg = self.authority_aggregator.load().clone();
401        let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
402
403        self.metrics
404            .latency_check_runs
405            .with_label_values(&[tx_type.as_str()])
406            .inc();
407
408        for name in validators {
409            let display_name = auth_agg.get_display_name(&name);
410            let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
411            let self_clone = self.clone();
412
413            let task = async move {
414                // Add some random delay to the task to avoid all tasks running at the same time
415                if delay_ms > 0 {
416                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
417                }
418                let start_time = Instant::now();
419
420                let ping_type = if tx_type == TxType::SingleWriter {
421                    PingType::FastPath
422                } else {
423                    PingType::Consensus
424                };
425
426                // Now send a ping transaction to the chosen validator for the provided tx type
427                match self_clone
428                    .drive_transaction(
429                        SubmitTxRequest::new_ping(ping_type),
430                        SubmitTransactionOptions {
431                            allowed_validators: vec![display_name.clone()],
432                            ..Default::default()
433                        },
434                        Some(ping_timeout),
435                    )
436                    .await
437                {
438                    Ok(_) => {
439                        tracing::debug!(
440                            "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
441                            display_name,
442                            tx_type.as_str(),
443                            start_time.elapsed().as_secs_f64()
444                        );
445                    }
446                    Err(err) => {
447                        tracing::debug!(
448                            "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
449                            tx_type.as_str(),
450                            display_name,
451                            err
452                        );
453                    }
454                }
455            };
456
457            tasks.spawn(task);
458        }
459    }
460
461    fn enable_reconfig(
462        self: &Arc<Self>,
463        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
464    ) {
465        let driver = self.clone();
466        self.state.lock().tasks.spawn(monitored_future!(async move {
467            let mut reconfig_observer = reconfig_observer.clone_boxed();
468            reconfig_observer.run(driver).await;
469        }));
470    }
471}
472
473impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
474where
475    A: AuthorityAPI + Send + Sync + 'static + Clone,
476{
477    fn epoch(&self) -> EpochId {
478        self.authority_aggregator.load().committee.epoch
479    }
480
481    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
482        self.authority_aggregator.load_full()
483    }
484
485    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
486        tracing::info!(
487            "Transaction Driver updating AuthorityAggregator with committee {}",
488            new_authorities.committee
489        );
490
491        self.authority_aggregator.store(new_authorities);
492    }
493}
494
495// Inner state of TransactionDriver.
496struct State {
497    tasks: JoinSet<()>,
498}
499
500impl State {
501    fn new() -> Self {
502        Self {
503            tasks: JoinSet::new(),
504        }
505    }
506}