sui_core/transaction_driver/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10/// Exports
11pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16    net::SocketAddr,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27    committee::EpochId,
28    error::{ErrorCategory, UserInputError},
29    messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30    transaction::TransactionDataAPI as _,
31};
32use tokio::{
33    task::JoinSet,
34    time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40    authority_aggregator::AuthorityAggregator,
41    authority_client::AuthorityAPI,
42    quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43    validator_client_monitor::{
44        OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45    },
46};
47use sui_config::NodeConfig;
/// Per-call knobs controlling how a transaction is submitted by the transaction driver.
#[derive(Clone, Default, Debug)]
pub struct SubmitTransactionOptions {
    /// Address of the originating client when this node forwards a transaction on that
    /// client's behalf; carried along for DDoS protection.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Display names of the validators this transaction may be submitted to.
    /// An empty list places no restriction: any validator may be used.
    pub allowed_validators: Vec<String>,
}
59
60#[derive(Clone, Debug)]
61pub struct QuorumTransactionResponse {
62    // TODO(fastpath): Stop using QD types
63    pub effects: sui_types::quorum_driver_types::FinalizedEffects,
64
65    pub events: Option<sui_types::effects::TransactionEvents>,
66    // Input objects will only be populated in the happy path
67    pub input_objects: Option<Vec<sui_types::object::Object>>,
68    // Output objects will only be populated in the happy path
69    pub output_objects: Option<Vec<sui_types::object::Object>>,
70    pub auxiliary_data: Option<Vec<u8>>,
71}
72
73pub struct TransactionDriver<A: Clone> {
74    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
75    state: Mutex<State>,
76    metrics: Arc<TransactionDriverMetrics>,
77    submitter: TransactionSubmitter,
78    certifier: EffectsCertifier,
79    client_monitor: Arc<ValidatorClientMonitor<A>>,
80}
81
82impl<A> TransactionDriver<A>
83where
84    A: AuthorityAPI + Send + Sync + 'static + Clone,
85{
86    pub fn new(
87        authority_aggregator: Arc<AuthorityAggregator<A>>,
88        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
89        metrics: Arc<TransactionDriverMetrics>,
90        node_config: Option<&NodeConfig>,
91        client_metrics: Arc<ValidatorClientMetrics>,
92    ) -> Arc<Self> {
93        let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
94
95        // Extract validator client monitor config from NodeConfig or use default
96        let monitor_config = node_config
97            .and_then(|nc| nc.validator_client_monitor_config.clone())
98            .unwrap_or_default();
99        let client_monitor =
100            ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
101
102        let driver = Arc::new(Self {
103            authority_aggregator: shared_swap,
104            state: Mutex::new(State::new()),
105            metrics: metrics.clone(),
106            submitter: TransactionSubmitter::new(metrics.clone()),
107            certifier: EffectsCertifier::new(metrics),
108            client_monitor,
109        });
110
111        let driver_clone = driver.clone();
112
113        spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
114
115        driver.enable_reconfig(reconfig_observer);
116        driver
117    }
118
119    // Runs a background task to send ping transactions to all validators to perform latency checks to test both the fast path and the consensus path.
120    async fn run_latency_checks(self: Arc<Self>) {
121        const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
122        const MAX_JITTER: Duration = Duration::from_secs(10);
123        const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
124
125        let mut interval = interval(INTERVAL_BETWEEN_RUNS);
126        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
127
128        loop {
129            interval.tick().await;
130
131            let mut tasks = JoinSet::new();
132
133            for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
134                Self::ping_for_tx_type(
135                    self.clone(),
136                    &mut tasks,
137                    tx_type,
138                    MAX_JITTER,
139                    PING_REQUEST_TIMEOUT,
140                );
141            }
142
143            while let Some(result) = tasks.join_next().await {
144                if let Err(e) = result {
145                    tracing::info!("Error while driving ping transaction: {}", e);
146                }
147            }
148        }
149    }
150
151    /// Pings all validators for e2e latency with the provided transaction type.
152    fn ping_for_tx_type(
153        self: Arc<Self>,
154        tasks: &mut JoinSet<()>,
155        tx_type: TxType,
156        max_jitter: Duration,
157        ping_timeout: Duration,
158    ) {
159        // We are iterating over the single writer and shared object transaction types to test both the fast path and the consensus path.
160        let auth_agg = self.authority_aggregator.load().clone();
161        let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
162
163        self.metrics
164            .latency_check_runs
165            .with_label_values(&[tx_type.as_str()])
166            .inc();
167
168        for name in validators {
169            let display_name = auth_agg.get_display_name(&name);
170            let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
171            let self_clone = self.clone();
172
173            let task = async move {
174                // Add some random delay to the task to avoid all tasks running at the same time
175                if delay_ms > 0 {
176                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
177                }
178                let start_time = Instant::now();
179
180                let ping_type = if tx_type == TxType::SingleWriter {
181                    PingType::FastPath
182                } else {
183                    PingType::Consensus
184                };
185
186                // Now send a ping transaction to the chosen validator for the provided tx type
187                match self_clone
188                    .drive_transaction(
189                        SubmitTxRequest::new_ping(ping_type),
190                        SubmitTransactionOptions {
191                            allowed_validators: vec![display_name.clone()],
192                            ..Default::default()
193                        },
194                        Some(ping_timeout),
195                    )
196                    .await
197                {
198                    Ok(_) => {
199                        tracing::debug!(
200                            "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
201                            display_name,
202                            tx_type.as_str(),
203                            start_time.elapsed().as_secs_f64()
204                        );
205                    }
206                    Err(err) => {
207                        tracing::info!(
208                            "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
209                            tx_type.as_str(),
210                            display_name,
211                            err
212                        );
213                    }
214                }
215            };
216
217            tasks.spawn(task);
218        }
219    }
220
221    /// Drives transaction to submission and effects certification. Ping requests are derived from the submitted payload.
222    #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
223    pub async fn drive_transaction(
224        &self,
225        request: SubmitTxRequest,
226        options: SubmitTransactionOptions,
227        timeout_duration: Option<Duration>,
228    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
229        const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
230
231        // For ping requests, the amplification factor is always 1.
232        let amplification_factor = if request.ping_type.is_some() {
233            1
234        } else {
235            let gas_price = request
236                .transaction
237                .as_ref()
238                .unwrap()
239                .transaction_data()
240                .gas_price();
241            let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
242            let amplification_factor = gas_price / reference_gas_price.max(1);
243            if amplification_factor == 0 {
244                return Err(TransactionDriverError::ValidationFailed {
245                    error: UserInputError::GasPriceUnderRGP {
246                        gas_price,
247                        reference_gas_price,
248                    }
249                    .to_string(),
250                });
251            }
252            amplification_factor
253        };
254
255        let tx_type = request.tx_type();
256        let ping_label = if request.ping_type.is_some() {
257            "true"
258        } else {
259            "false"
260        };
261        let timer = Instant::now();
262
263        self.metrics
264            .total_transactions_submitted
265            .with_label_values(&[tx_type.as_str(), ping_label])
266            .inc();
267
268        let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
269        let mut attempts = 0;
270        let mut latest_retriable_error = None;
271
272        let retry_loop = async {
273            loop {
274                // TODO(fastpath): Check local state before submitting transaction
275                match self
276                    .drive_transaction_once(amplification_factor, request.clone(), &options)
277                    .await
278                {
279                    Ok(resp) => {
280                        let settlement_finality_latency = timer.elapsed().as_secs_f64();
281                        self.metrics
282                            .settlement_finality_latency
283                            .with_label_values(&[tx_type.as_str(), ping_label])
284                            .observe(settlement_finality_latency);
285                        // Record the number of retries for successful transaction
286                        self.metrics
287                            .transaction_retries
288                            .with_label_values(&["success", tx_type.as_str(), ping_label])
289                            .observe(attempts as f64);
290                        return Ok(resp);
291                    }
292                    Err(e) => {
293                        if !e.is_submission_retriable() {
294                            // Record the number of retries for failed transaction
295                            self.metrics
296                                .transaction_retries
297                                .with_label_values(&["failure", tx_type.as_str(), ping_label])
298                                .observe(attempts as f64);
299                            tracing::info!(
300                                "Failed to finalize transaction with non-retriable error after {} attempts: {}",
301                                attempts,
302                                e
303                            );
304                            return Err(e);
305                        }
306                        tracing::info!(
307                            "Failed to finalize transaction (attempt {}): {}. Retrying ...",
308                            attempts,
309                            e
310                        );
311                        // Buffer the latest retriable error to be returned in case of timeout
312                        latest_retriable_error = Some(e);
313                    }
314                }
315
316                let overload = if let Some(e) = &latest_retriable_error {
317                    e.categorize() == ErrorCategory::ValidatorOverloaded
318                } else {
319                    false
320                };
321                let delay = if overload {
322                    // Increase delay during overload.
323                    const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
324                    backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
325                } else {
326                    backoff.next().unwrap()
327                };
328                sleep(delay).await;
329
330                attempts += 1;
331            }
332        };
333
334        match timeout_duration {
335            Some(duration) => {
336                tokio::time::timeout(duration, retry_loop)
337                    .await
338                    .unwrap_or_else(|_| {
339                        // Timeout occurred, return with latest retriable error if available
340                        let e = TransactionDriverError::TimeoutWithLastRetriableError {
341                            last_error: latest_retriable_error.map(Box::new),
342                            attempts,
343                            timeout: duration,
344                        };
345                        tracing::info!(
346                            "Transaction timed out after {} attempts. Last error: {}",
347                            attempts,
348                            e
349                        );
350                        Err(e)
351                    })
352            }
353            None => retry_loop.await,
354        }
355    }
356
357    #[instrument(level = "error", skip_all, err(level = "debug"))]
358    async fn drive_transaction_once(
359        &self,
360        amplification_factor: u64,
361        request: SubmitTxRequest,
362        options: &SubmitTransactionOptions,
363    ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
364        let auth_agg = self.authority_aggregator.load();
365        let amplification_factor =
366            amplification_factor.min(auth_agg.committee.num_members() as u64);
367        let start_time = Instant::now();
368        let tx_type = request.tx_type();
369        let tx_digest = request.tx_digest();
370        let ping_type = request.ping_type;
371
372        let (name, submit_txn_result) = self
373            .submitter
374            .submit_transaction(
375                &auth_agg,
376                &self.client_monitor,
377                tx_type,
378                amplification_factor,
379                request,
380                options,
381            )
382            .await?;
383        if let SubmitTxResult::Rejected { error } = &submit_txn_result {
384            return Err(TransactionDriverError::ClientInternal {
385                error: format!(
386                    "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
387                    error
388                ),
389            });
390        }
391
392        // Wait for quorum effects using EffectsCertifier
393        let result = self
394            .certifier
395            .get_certified_finalized_effects(
396                &auth_agg,
397                &self.client_monitor,
398                tx_digest,
399                tx_type,
400                name,
401                submit_txn_result,
402                options,
403            )
404            .await;
405
406        if result.is_ok() {
407            self.client_monitor
408                .record_interaction_result(OperationFeedback {
409                    authority_name: name,
410                    display_name: auth_agg.get_display_name(&name),
411                    operation: if tx_type == TxType::SingleWriter {
412                        OperationType::FastPath
413                    } else {
414                        OperationType::Consensus
415                    },
416                    ping_type,
417                    result: Ok(start_time.elapsed()),
418                });
419        }
420        result
421    }
422
423    fn enable_reconfig(
424        self: &Arc<Self>,
425        reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
426    ) {
427        let driver = self.clone();
428        self.state.lock().tasks.spawn(monitored_future!(async move {
429            let mut reconfig_observer = reconfig_observer.clone_boxed();
430            reconfig_observer.run(driver).await;
431        }));
432    }
433}
434
435impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
436where
437    A: AuthorityAPI + Send + Sync + 'static + Clone,
438{
439    fn epoch(&self) -> EpochId {
440        self.authority_aggregator.load().committee.epoch
441    }
442
443    fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
444        self.authority_aggregator.load_full()
445    }
446
447    fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
448        tracing::info!(
449            "Transaction Driver updating AuthorityAggregator with committee {}",
450            new_authorities.committee
451        );
452
453        self.authority_aggregator.store(new_authorities);
454    }
455}
456
457// Chooses the percentage of transactions to be driven by TransactionDriver.
458pub fn choose_transaction_driver_percentage(
459    chain_id: Option<sui_types::digests::ChainIdentifier>,
460) -> u8 {
461    if let Ok(v) = std::env::var("TRANSACTION_DRIVER")
462        && let Ok(tx_driver_percentage) = v.parse::<u8>()
463        && tx_driver_percentage > 0
464        && tx_driver_percentage <= 100
465    {
466        return tx_driver_percentage;
467    }
468
469    if let Some(chain_identifier) = chain_id
470        && chain_identifier.chain() == sui_protocol_config::Chain::Unknown
471    {
472        // Kep test coverage for QD.
473        return 50;
474    }
475
476    // Default to 100% everywhere
477    100
478}
479
480// Inner state of TransactionDriver.
481struct State {
482    tasks: JoinSet<()>,
483}
484
485impl State {
486    fn new() -> Self {
487        Self {
488            tasks: JoinSet::new(),
489        }
490    }
491}