sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10/// Exports
11pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16    net::SocketAddr,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27    committee::EpochId,
28    error::{ErrorCategory, UserInputError},
29    messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30    transaction::TransactionDataAPI as _,
31};
32use tokio::{
33    task::JoinSet,
34    time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40    authority_aggregator::AuthorityAggregator,
41    authority_client::AuthorityAPI,
42    quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43    validator_client_monitor::{
44        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45    },
46};
47use sui_config::NodeConfig;
/// Options for submitting a transaction.
///
/// Passed next to the request to `TransactionDriver::drive_transaction`. The derived
/// `Default` leaves the client address unset and both validator lists empty.
#[derive(Clone, Default, Debug)]
pub struct SubmitTransactionOptions {
    /// When forwarding transactions on behalf of a client, this is the client's address
    /// specified for ddos protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// When submitting a transaction, only the validators in the allowed validator list can be used to submit the transaction to.
    /// When the allowed validator list is empty, any validator can be used.
    ///
    /// The latency-check pings in this module fill this list with a single validator
    /// display name, so entries are presumably matched against display names — confirm
    /// against the submitter.
    pub allowed_validators: Vec<String>,

    /// When submitting a transaction, the validators in the blocked validator list cannot be used to submit the transaction to.
    /// When the blocked validator list is empty, no restrictions are applied.
    pub blocked_validators: Vec<String>,
}
63
/// Successful result of `TransactionDriver::drive_transaction`.
///
/// Every field except `effects` is optional and may be absent.
#[derive(Clone, Debug)]
pub struct QuorumTransactionResponse {
    // TODO(fastpath): Stop using QD types
    /// Finalized effects, currently still expressed with the quorum driver type.
    pub effects: sui_types::quorum_driver_types::FinalizedEffects,

    /// Transaction events, when they are available.
    pub events: Option<sui_types::effects::TransactionEvents>,
    // Input objects will only be populated in the happy path
    pub input_objects: Option<Vec<sui_types::object::Object>>,
    // Output objects will only be populated in the happy path
    pub output_objects: Option<Vec<sui_types::object::Object>>,
    /// Opaque extra bytes; their meaning is not defined in this module.
    pub auxiliary_data: Option<Vec<u8>>,
}
76
/// Drives transactions (and latency-check pings) through submission to validators and
/// certification of their effects, retrying retriable failures.
pub struct TransactionDriver<A: Clone> {
    /// Current authority aggregator; replaced through `update_authority_aggregator` and
    /// loaded afresh for every submission attempt.
    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
    /// Holds the set of background tasks spawned by `enable_reconfig`.
    state: Mutex<State>,
    /// Counters and histograms updated while driving transactions and pings.
    metrics: Arc<TransactionDriverMetrics>,
    /// Submits a request to validators (first half of an attempt).
    submitter: TransactionSubmitter,
    /// Waits for certified finalized effects (second half of an attempt).
    certifier: EffectsCertifier,
    /// Receives feedback about validator interactions; also handed to the submitter and
    /// the certifier on every attempt.
    client_monitor: Arc<ValidatorClientMonitor<A>>,
}
85
86impl<A> TransactionDriver<A>
87where
88    A: AuthorityAPI + Send + Sync + 'static + Clone,
89{
90    pub fn new(
91        authority_aggregator: Arc<AuthorityAggregator<A>>,
92        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
93        metrics: Arc<TransactionDriverMetrics>,
94        node_config: Option<&NodeConfig>,
95        client_metrics: Arc<ValidatorClientMetrics>,
96    ) -> Arc<Self> {
97        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
98
99        // Extract validator client monitor config from NodeConfig or use default
100        let monitor_config = node_config
101            .and_then(|nc| nc.validator_client_monitor_config.clone())
102            .unwrap_or_default();
103        let client_monitor =
104            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
105
106        let driver = Arc::new(Self {
107            authority_aggregator: shared_swap,
108            state: Mutex::new(State::new()),
109            metrics: metrics.clone(),
110            submitter: TransactionSubmitter::new(metrics.clone()),
111            certifier: EffectsCertifier::new(metrics),
112            client_monitor,
113        });
114
115        let driver_clone = driver.clone();
116
117        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
118
119        driver.enable_reconfig(reconfig_observer);
120        driver
121    }
122
123    // Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
124    async fn run_latency_checks(self: Arc<Self>) {
125        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
126        const MAX_JITTER: Duration = Duration::from_secs(10);
127        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
128
129        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
130        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
131
132        loop {
133            interval.tick().await;
134
135            let mut tasks = JoinSet::new();
136
137            for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
138                Self::ping_for_tx_type(
139                    self.clone(),
140                    &mut tasks,
141                    tx_type,
142                    MAX_JITTER,
143                    PING_REQUEST_TIMEOUT,
144                );
145            }
146
147            while let Some(result) = tasks.join_next().await {
148                if let Err(e) = result {
149                    tracing::debug!("Error while driving ping transaction: {}", e);
150                }
151            }
152        }
153    }
154
155    /// Pings all validators for e2e latency with the provided transaction type.
156    fn ping_for_tx_type(
157        self: Arc<Self>,
158        tasks: &mut JoinSet<()>,
159        tx_type: TxType,
160        max_jitter: Duration,
161        ping_timeout: Duration,
162    ) {
163        // We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
164        let auth_agg = self.authority_aggregator.load().clone();
165        let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
166
167        self.metrics
168            .latency_check_runs
169            .with_label_values(&[tx_type.as_str()])
170            .inc();
171
172        for name in validators {
173            let display_name = auth_agg.get_display_name(&name);
174            let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
175            let self_clone = self.clone();
176
177            let task = async move {
178                // Add some random delay to the task to avoid all tasks running at the same time
179                if delay_ms > 0 {
180                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
181                }
182                let start_time = Instant::now();
183
184                let ping_type = if tx_type == TxType::SingleWriter {
185                    PingType::FastPath
186                } else {
187                    PingType::Consensus
188                };
189
190                // Now send a ping transaction to the chosen validator for the provided tx type
191                match self_clone
192                    .drive_transaction(
193                        SubmitTxRequest::new_ping(ping_type),
194                        SubmitTransactionOptions {
195                            allowed_validators: vec![display_name.clone()],
196                            ..Default::default()
197                        },
198                        Some(ping_timeout),
199                    )
200                    .await
201                {
202                    Ok(_) => {
203                        tracing::debug!(
204                            "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
205                            display_name,
206                            tx_type.as_str(),
207                            start_time.elapsed().as_secs_f64()
208                        );
209                    }
210                    Err(err) => {
211                        tracing::debug!(
212                            "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
213                            tx_type.as_str(),
214                            display_name,
215                            err
216                        );
217                    }
218                }
219            };
220
221            tasks.spawn(task);
222        }
223    }
224
225    /// Drives transaction to submission and effects certification. Ping requests are derived from the submitted payload.
226    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
227    pub async fn drive_transaction(
228        &self,
229        request: SubmitTxRequest,
230        options: SubmitTransactionOptions,
231        timeout_duration: Option<Duration>,
232    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
233        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
234
235        // For ping requests, the amplification factor is always 1.
236        let amplification_factor = if request.ping_type.is_some() {
237            1
238        } else {
239            let gas_price = request
240                .transaction
241                .as_ref()
242                .unwrap()
243                .transaction_data()
244                .gas_price();
245            let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
246            let amplification_factor = gas_price / reference_gas_price.max(1);
247            if amplification_factor == 0 {
248                return Err(TransactionDriverError::ValidationFailed {
249                    error: UserInputError::GasPriceUnderRGP {
250                        gas_price,
251                        reference_gas_price,
252                    }
253                    .to_string(),
254                });
255            }
256            amplification_factor
257        };
258
259        let tx_type = request.tx_type();
260        let ping_label = if request.ping_type.is_some() {
261            "true"
262        } else {
263            "false"
264        };
265        let timer = Instant::now();
266
267        self.metrics
268            .total_transactions_submitted
269            .with_label_values(&[tx_type.as_str(), ping_label])
270            .inc();
271
272        let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
273        let mut attempts = 0;
274        let mut latest_retriable_error = None;
275
276        let retry_loop = async {
277            loop {
278                // TODO(fastpath): Check local state before submitting transaction
279                match self
280                    .drive_transaction_once(amplification_factor, request.clone(), &options)
281                    .await
282                {
283                    Ok(resp) => {
284                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
285                        self.metrics
286                            .settlement_finality_latency
287                            .with_label_values(&[tx_type.as_str(), ping_label])
288                            .observe(settlement_finality_latency);
289                        // Record the number of retries for successful transaction
290                        self.metrics
291                            .transaction_retries
292                            .with_label_values(&["success", tx_type.as_str(), ping_label])
293                            .observe(attempts as f64);
294                        return Ok(resp);
295                    }
296                    Err(e) => {
297                        self.metrics
298                            .drive_transaction_errors
299                            .with_label_values(&[
300                                e.categorize().into(),
301                                tx_type.as_str(),
302                                ping_label,
303                            ])
304                            .inc();
305                        if !e.is_submission_retriable() {
306                            // Record the number of retries for failed transaction
307                            self.metrics
308                                .transaction_retries
309                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
310                                .observe(attempts as f64);
311                            if request.transaction.is_some() {
312                                tracing::info!(
313                                    "Failed to finalize transaction with non-retriable error after {} attempts: {}",
314                                    attempts,
315                                    e
316                                );
317                            }
318                            return Err(e);
319                        }
320                        if request.transaction.is_some() {
321                            tracing::info!(
322                                "Failed to finalize transaction (attempt {}): {}. Retrying ...",
323                                attempts,
324                                e
325                            );
326                        }
327                        // Buffer the latest retriable error to be returned in case of timeout
328                        latest_retriable_error = Some(e);
329                    }
330                }
331
332                let overload = if let Some(e) = &latest_retriable_error {
333                    e.categorize() == ErrorCategory::ValidatorOverloaded
334                } else {
335                    false
336                };
337                let delay = if overload {
338                    // Increase delay during overload.
339                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
340                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
341                } else {
342                    backoff.next().unwrap()
343                };
344                sleep(delay).await;
345
346                attempts += 1;
347            }
348        };
349
350        match timeout_duration {
351            Some(duration) => {
352                tokio::time::timeout(duration, retry_loop)
353                    .await
354                    .unwrap_or_else(|_| {
355                        // Timeout occurred, return with latest retriable error if available
356                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
357                            last_error: latest_retriable_error.map(Box::new),
358                            attempts,
359                            timeout: duration,
360                        };
361                        if request.transaction.is_some() {
362                            tracing::info!(
363                                "Transaction timed out after {} attempts. Last error: {}",
364                                attempts,
365                                e
366                            );
367                        }
368                        Err(e)
369                    })
370            }
371            None => retry_loop.await,
372        }
373    }
374
375    #[instrument(level = "error", skip_all, err(level = "debug"))]
376    async fn drive_transaction_once(
377        &self,
378        amplification_factor: u64,
379        request: SubmitTxRequest,
380        options: &SubmitTransactionOptions,
381    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
382        let auth_agg = self.authority_aggregator.load();
383        let amplification_factor =
384            amplification_factor.min(auth_agg.committee.num_members() as u64);
385        let start_time = Instant::now();
386        let tx_type = request.tx_type();
387        let tx_digest = request.tx_digest();
388        let ping_type = request.ping_type;
389
390        let (name, submit_txn_result) = self
391            .submitter
392            .submit_transaction(
393                &auth_agg,
394                &self.client_monitor,
395                tx_type,
396                amplification_factor,
397                request,
398                options,
399            )
400            .await?;
401        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
402            return Err(TransactionDriverError::ClientInternal {
403                error: format!(
404                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
405                    error
406                ),
407            });
408        }
409
410        // Wait for quorum effects using EffectsCertifier
411        let result = self
412            .certifier
413            .get_certified_finalized_effects(
414                &auth_agg,
415                &self.client_monitor,
416                tx_digest,
417                tx_type,
418                name,
419                submit_txn_result,
420                options,
421            )
422            .await;
423
424        if result.is_ok() {
425            self.client_monitor
426                .record_interaction_result(OperationFeedback {
427                    authority_name: name,
428                    display_name: auth_agg.get_display_name(&name),
429                    operation: if tx_type == TxType::SingleWriter {
430                        OperationType::FastPath
431                    } else {
432                        OperationType::Consensus
433                    },
434                    ping_type,
435                    result: Ok(start_time.elapsed()),
436                });
437        }
438        result
439    }
440
441    fn enable_reconfig(
442        self: &Arc<Self>,
443        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
444    ) {
445        let driver = self.clone();
446        self.state.lock().tasks.spawn(monitored_future!(async move {
447            let mut reconfig_observer = reconfig_observer.clone_boxed();
448            reconfig_observer.run(driver).await;
449        }));
450    }
451}
452
453impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
454where
455    A: AuthorityAPI + Send + Sync + 'static + Clone,
456{
457    fn epoch(&self) -> EpochId {
458        self.authority_aggregator.load().committee.epoch
459    }
460
461    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
462        self.authority_aggregator.load_full()
463    }
464
465    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
466        tracing::info!(
467            "Transaction Driver updating AuthorityAggregator with committee {}",
468            new_authorities.committee
469        );
470
471        self.authority_aggregator.store(new_authorities);
472    }
473}
474
475// Chooses the percentage of transactions to be driven by TransactionDriver.
476pub fn choose_transaction_driver_percentage(
477    chain_id: Option<sui_types::digests::ChainIdentifier>,
478) -> u8 {
479    if let Ok(v) = std::env::var("TRANSACTION_DRIVER")
480        && let Ok(tx_driver_percentage) = v.parse::<u8>()
481        && (0..=100).contains(&tx_driver_percentage)
482    {
483        return tx_driver_percentage;
484    }
485
486    if let Some(chain_identifier) = chain_id
487        && chain_identifier.chain() == sui_protocol_config::Chain::Unknown
488    {
489        // Kep test coverage for QD.
490        return 50;
491    }
492
493    // Default to 100% everywhere
494    100
495}
496
497// Inner state of TransactionDriver.
498struct State {
499    tasks: JoinSet<()>,
500}
501
502impl State {
503    fn new() -> Self {
504        Self {
505            tasks: JoinSet::new(),
506        }
507    }
508}