sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_antithesis};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16    HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19    register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33    TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46    TransactionDriverMetrics,
47};
48
/// Upper bound on how long `execute_transaction_block` waits for this node to execute a
/// finalized transaction locally; once it elapses the client gets the response anyway,
/// flagged as not executed locally.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default upper bound on waiting for finality of a single transaction. The
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable overrides it at runtime.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
55
56pub type QuorumTransactionEffectsResult = Result<
57    (Transaction, QuorumTransactionResponse),
58    (TransactionDigest, TransactionSubmissionError),
59>;
60
61/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
62/// submit transactions to validators for finality. It adds inflight deduplication,
63/// waiting for local execution, recovery, early validation, and epoch change handling
64/// on top of Transaction Driver.
65/// This is used by node RPC service to support transaction submission and finality waiting.
66pub struct TransactionOrchestrator<A: Clone> {
67    inner: Arc<Inner<A>>,
68}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71    pub fn new_with_auth_aggregator(
72        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73        validator_state: Arc<AuthorityState>,
74        reconfig_channel: Receiver<SuiSystemState>,
75        parent_path: &Path,
76        prometheus_registry: &Registry,
77        node_config: &NodeConfig,
78    ) -> Self {
79        let observer = OnsiteReconfigObserver::new(
80            reconfig_channel,
81            validator_state.get_object_cache_reader().clone(),
82            validator_state.clone_committee_store(),
83            validators.safe_client_metrics_base.clone(),
84        );
85        TransactionOrchestrator::new(
86            validators,
87            validator_state,
88            parent_path,
89            prometheus_registry,
90            observer,
91            node_config,
92        )
93    }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98    A: AuthorityAPI + Send + Sync + 'static + Clone,
99    OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101    pub fn new(
102        validators: Arc<AuthorityAggregator<A>>,
103        validator_state: Arc<AuthorityState>,
104        parent_path: &Path,
105        prometheus_registry: &Registry,
106        reconfig_observer: OnsiteReconfigObserver,
107        node_config: &NodeConfig,
108    ) -> Self {
109        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111        let client_metrics = Arc::new(
112            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113        );
114        let reconfig_observer = Arc::new(reconfig_observer);
115
116        let transaction_driver = TransactionDriver::new(
117            validators.clone(),
118            reconfig_observer.clone(),
119            td_metrics,
120            Some(node_config),
121            client_metrics,
122        );
123
124        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125            parent_path.join("fullnode_pending_transactions"),
126        ));
127        Inner::start_task_to_recover_txes_in_log(
128            pending_tx_log.clone(),
129            transaction_driver.clone(),
130        );
131
132        let td_allowed_submission_list = node_config
133            .transaction_driver_config
134            .as_ref()
135            .map(|config| config.allowed_submission_validators.clone())
136            .unwrap_or_default();
137
138        let td_blocked_submission_list = node_config
139            .transaction_driver_config
140            .as_ref()
141            .map(|config| config.blocked_submission_validators.clone())
142            .unwrap_or_default();
143
144        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145            panic!(
146                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147                td_allowed_submission_list, td_blocked_submission_list
148            );
149        }
150
151        let enable_early_validation = node_config
152            .transaction_driver_config
153            .as_ref()
154            .map(|config| config.enable_early_validation)
155            .unwrap_or(true);
156
157        let inner = Arc::new(Inner {
158            validator_state,
159            pending_tx_log,
160            metrics,
161            transaction_driver,
162            td_allowed_submission_list,
163            td_blocked_submission_list,
164            enable_early_validation,
165        });
166        Self { inner }
167    }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172    A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175    fields(
176        tx_digest = ?request.transaction.digest(),
177        tx_type = ?request_type,
178    ))]
179    pub async fn execute_transaction_block(
180        &self,
181        request: ExecuteTransactionRequestV3,
182        request_type: ExecuteTransactionRequestType,
183        client_addr: Option<SocketAddr>,
184    ) -> Result<
185        (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186        TransactionSubmissionError,
187    > {
188        let timer = Instant::now();
189        let tx_type = if request.transaction.is_consensus_tx() {
190            TxType::SharedObject
191        } else {
192            TxType::SingleWriter
193        };
194        let tx_digest = *request.transaction.digest();
195
196        let inner = self.inner.clone();
197        let (response, mut executed_locally) = spawn_monitored_task!(
198            Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199        )
200        .await
201        .map_err(|e| TransactionSubmissionError::TransactionFailed {
202            category: ErrorCategory::Internal,
203            details: e.to_string(),
204        })??;
205
206        if !executed_locally {
207            executed_locally = if matches!(
208                request_type,
209                ExecuteTransactionRequestType::WaitForLocalExecution
210            ) {
211                let executed_locally =
212                    Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
213                        &self.inner.validator_state,
214                        tx_digest,
215                        tx_type,
216                        &self.inner.metrics,
217                    )
218                    .await
219                    .is_ok();
220                add_server_timing("local_execution done");
221                executed_locally
222            } else {
223                false
224            };
225        }
226
227        let QuorumTransactionResponse {
228            effects,
229            events,
230            input_objects,
231            output_objects,
232            auxiliary_data,
233        } = response;
234
235        let response = ExecuteTransactionResponseV3 {
236            effects,
237            events,
238            input_objects,
239            output_objects,
240            auxiliary_data,
241        };
242
243        self.inner
244            .metrics
245            .request_latency
246            .with_label_values(&[
247                tx_type.as_str(),
248                "execute_transaction_block",
249                executed_locally.to_string().as_str(),
250            ])
251            .observe(timer.elapsed().as_secs_f64());
252
253        Ok((response, executed_locally))
254    }
255
256    // Utilize the handle_certificate_v3 validator api to request input/output objects
257    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
258                 fields(tx_digest = ?request.transaction.digest()))]
259    pub async fn execute_transaction_v3(
260        &self,
261        request: ExecuteTransactionRequestV3,
262        client_addr: Option<SocketAddr>,
263    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
264        let timer = Instant::now();
265        let tx_type = if request.transaction.is_consensus_tx() {
266            TxType::SharedObject
267        } else {
268            TxType::SingleWriter
269        };
270
271        let inner = self.inner.clone();
272        let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
273            inner,
274            request,
275            client_addr
276        ))
277        .await
278        .map_err(|e| TransactionSubmissionError::TransactionFailed {
279            category: ErrorCategory::Internal,
280            details: e.to_string(),
281        })??;
282
283        self.inner
284            .metrics
285            .request_latency
286            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
287            .observe(timer.elapsed().as_secs_f64());
288
289        let QuorumTransactionResponse {
290            effects,
291            events,
292            input_objects,
293            output_objects,
294            auxiliary_data,
295        } = response;
296
297        Ok(ExecuteTransactionResponseV3 {
298            effects,
299            events,
300            input_objects,
301            output_objects,
302            auxiliary_data,
303        })
304    }
305
306    pub fn authority_state(&self) -> &Arc<AuthorityState> {
307        &self.inner.validator_state
308    }
309
310    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
311        &self.inner.transaction_driver
312    }
313
314    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
315        self.inner
316            .transaction_driver
317            .authority_aggregator()
318            .load_full()
319    }
320
321    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
322        self.inner.pending_tx_log.load_all_pending_transactions()
323    }
324
325    pub fn empty_pending_tx_log_in_test(&self) -> bool {
326        self.inner.pending_tx_log.is_empty()
327    }
328}
329
330struct Inner<A: Clone> {
331    validator_state: Arc<AuthorityState>,
332    pending_tx_log: Arc<WritePathPendingTransactionLog>,
333    metrics: Arc<TransactionOrchestratorMetrics>,
334    transaction_driver: Arc<TransactionDriver<A>>,
335    td_allowed_submission_list: Vec<String>,
336    td_blocked_submission_list: Vec<String>,
337    enable_early_validation: bool,
338}
339
340impl<A> Inner<A>
341where
342    A: AuthorityAPI + Send + Sync + 'static + Clone,
343{
344    async fn execute_transaction_with_retry(
345        inner: Arc<Inner<A>>,
346        request: ExecuteTransactionRequestV3,
347        client_addr: Option<SocketAddr>,
348    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
349    {
350        let result = inner
351            .execute_transaction_with_effects_waiting(
352                request.clone(),
353                client_addr,
354                /* enforce_live_input_objects */ false,
355            )
356            .await;
357
358        // If the error is retriable, retry the transaction sufficiently long.
359        if let Err(e) = &result
360            && e.is_retriable()
361        {
362            spawn_monitored_task!(async move {
363                inner.metrics.background_retry_started.inc();
364                let backoff = backoff::ExponentialBackoff::new(
365                    Duration::from_secs(1),
366                    Duration::from_secs(300),
367                );
368                const MAX_RETRIES: usize = 10;
369                for (i, delay) in backoff.enumerate() {
370                    if i == MAX_RETRIES {
371                        break;
372                    }
373                    // Start to enforce live input after 3 retries,
374                    // to avoid excessively retrying transactions with non-existent input objects.
375                    let result = inner
376                        .execute_transaction_with_effects_waiting(
377                            request.clone(),
378                            client_addr,
379                            i > 3,
380                        )
381                        .await;
382                    match result {
383                        Ok(_) => {
384                            inner
385                                .metrics
386                                .background_retry_attempts
387                                .with_label_values(&["success"])
388                                .inc();
389                            debug!(
390                                "Background retry {i} for transaction {} succeeded",
391                                request.transaction.digest()
392                            );
393                            break;
394                        }
395                        Err(e) => {
396                            if !e.is_retriable() {
397                                inner
398                                    .metrics
399                                    .background_retry_attempts
400                                    .with_label_values(&["non-retriable"])
401                                    .inc();
402                                debug!(
403                                    "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
404                                    request.transaction.digest()
405                                );
406                                break;
407                            }
408                            inner
409                                .metrics
410                                .background_retry_attempts
411                                .with_label_values(&["retriable"])
412                                .inc();
413                            debug!(
414                                "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
415                                request.transaction.digest()
416                            );
417                        }
418                    };
419                    sleep(delay).await;
420                }
421            });
422        }
423
424        result
425    }
426
427    /// Shared implementation for executing transactions with parallel local effects waiting
428    async fn execute_transaction_with_effects_waiting(
429        &self,
430        request: ExecuteTransactionRequestV3,
431        client_addr: Option<SocketAddr>,
432        enforce_live_input_objects: bool,
433    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
434    {
435        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
436        let verified_transaction = epoch_store
437            .verify_transaction_with_current_aliases(request.transaction.clone())
438            .map_err(TransactionSubmissionError::InvalidUserSignature)?
439            .into_tx();
440        let tx_digest = *verified_transaction.digest();
441
442        // Early validation check against local state before submission to catch non-retriable errors
443        // TODO: Consider moving this check to TransactionDriver for per-retry validation
444        if self.enable_early_validation
445            && let Err(e) = self.validator_state.check_transaction_validity(
446                &epoch_store,
447                &verified_transaction,
448                enforce_live_input_objects,
449            )
450        {
451            let error_category = e.categorize();
452            if !error_category.is_submission_retriable() {
453                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
454                if !self.validator_state.is_tx_already_executed(&tx_digest) {
455                    self.metrics
456                        .early_validation_rejections
457                        .with_label_values(&[e.to_variant_name()])
458                        .inc();
459                    debug!(
460                        error = ?e,
461                        "Transaction rejected during early validation"
462                    );
463
464                    return Err(TransactionSubmissionError::TransactionFailed {
465                        category: error_category,
466                        details: e.to_string(),
467                    });
468                }
469            }
470        }
471
472        // Add transaction to WAL log.
473        let guard =
474            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
475        let is_new_transaction = guard.is_new_transaction();
476
477        let include_events = request.include_events;
478        let include_input_objects = request.include_input_objects;
479        let include_output_objects = request.include_output_objects;
480        let include_auxiliary_data = request.include_auxiliary_data;
481
482        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
483            .ok()
484            .and_then(|v| v.parse().ok())
485            .map(Duration::from_secs)
486            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
487
488        let num_submissions = if !is_new_transaction {
489            // No need to submit when the transaction is already being processed.
490            0
491        } else if cfg!(msim) || in_antithesis() {
492            // Allow duplicated submissions in tests.
493            let r = rand::thread_rng().gen_range(1..=100);
494            let n = if r <= 10 {
495                3
496            } else if r <= 30 {
497                2
498            } else {
499                1
500            };
501            if n > 1 {
502                debug!("Making {n} execution calls");
503            }
504            n
505        } else {
506            1
507        };
508
509        // Wait for one of the execution futures to succeed, or all of them to fail.
510        let mut execution_futures = FuturesUnordered::new();
511        for i in 0..num_submissions {
512            // Generate jitter values outside the async block
513            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
514            let delay_ms = if should_delay {
515                rand::thread_rng().gen_range(100..=500)
516            } else {
517                0
518            };
519
520            let request = request.clone();
521            let verified_transaction = verified_transaction.clone();
522
523            let future = async move {
524                if delay_ms > 0 {
525                    // Add jitters to duplicated submissions.
526                    sleep(Duration::from_millis(delay_ms)).await;
527                }
528                self.execute_transaction_impl(
529                    request,
530                    verified_transaction,
531                    client_addr,
532                    Some(finality_timeout),
533                )
534                .await
535            }
536            .boxed();
537            execution_futures.push(future);
538        }
539
540        // Track the last execution error.
541        let mut last_execution_error: Option<TransactionSubmissionError> = None;
542
543        // Wait for execution result outside of this call to become available.
544        let digests = [tx_digest];
545        let mut local_effects_future = self
546            .validator_state
547            .get_transaction_cache_reader()
548            .notify_read_executed_effects(
549                "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
550                &digests,
551            )
552            .boxed();
553
554        // Wait for execution timeout.
555        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
556
557        loop {
558            tokio::select! {
559                biased;
560
561                // Local effects might be available
562                all_effects = &mut local_effects_future => {
563                    debug!(
564                        "Effects became available while execution was running"
565                    );
566                    if all_effects.len() != 1 {
567                        break Err(TransactionSubmissionError::TransactionDriverInternalError(SuiErrorKind::Unknown(format!("Unexpected number of effects found: {}", all_effects.len())).into()));
568                    }
569                    self.metrics.concurrent_execution.inc();
570
571                    let effects = all_effects.into_iter().next().unwrap();
572                    let epoch = effects.executed_epoch();
573                    let events = if include_events {
574                        if effects.events_digest().is_some() {
575                            Some(self.validator_state.get_transaction_events(effects.transaction_digest())
576                                .map_err(TransactionSubmissionError::TransactionDriverInternalError)?)
577                        } else {
578                            None
579                        }
580                    } else {
581                        None
582                    };
583                    let input_objects = include_input_objects
584                        .then(|| self.validator_state.get_transaction_input_objects(&effects))
585                        .transpose()
586                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
587                    let output_objects = include_output_objects
588                        .then(|| self.validator_state.get_transaction_output_objects(&effects))
589                        .transpose()
590                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
591                    let response = QuorumTransactionResponse {
592                        effects: FinalizedEffects {
593                            effects,
594                            finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
595                        },
596                        events,
597                        input_objects,
598                        output_objects,
599                        auxiliary_data: None,
600                    };
601                    break Ok((response, true));
602                }
603
604                // This branch is disabled if execution_futures is empty.
605                Some(result) = execution_futures.next() => {
606                    match result {
607                        Ok(resp) => {
608                            // First success gets returned.
609                            debug!("Execution succeeded, returning response");
610                            let QuorumTransactionResponse {
611                                effects,
612                                events,
613                                input_objects,
614                                output_objects,
615                                auxiliary_data,
616                            } = resp;
617                            // Filter fields based on request flags.
618                            let resp = QuorumTransactionResponse {
619                                effects,
620                                events: if include_events { events } else { None },
621                                input_objects: if include_input_objects { input_objects } else { None },
622                                output_objects: if include_output_objects { output_objects } else { None },
623                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
624                            };
625                            break Ok((resp, false));
626                        }
627                        Err(e) => {
628                            debug!(?e, "Execution attempt failed, wait for other attempts");
629                            last_execution_error = Some(e);
630                        }
631                    };
632
633                    // Last error must have been recorded.
634                    if execution_futures.is_empty() {
635                        break Err(last_execution_error.unwrap());
636                    }
637                }
638
639                // A timeout has occurred while waiting for finality
640                _ = &mut timeout_future => {
641                    if let Some(e) = last_execution_error {
642                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
643                    } else {
644                        debug!("Timeout waiting for transaction finality.");
645                    }
646                    self.metrics.wait_for_finality_timeout.inc();
647
648                    // TODO: Return the last execution error.
649                    break Err(TransactionSubmissionError::TimeoutBeforeFinality);
650                }
651            }
652        }
653    }
654
655    #[instrument(level = "error", skip_all)]
656    async fn execute_transaction_impl(
657        &self,
658        request: ExecuteTransactionRequestV3,
659        verified_transaction: VerifiedTransaction,
660        client_addr: Option<SocketAddr>,
661        finality_timeout: Option<Duration>,
662    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
663        debug!("TO Received transaction execution request.");
664
665        let timer = Instant::now();
666        let tx_type = if verified_transaction.is_consensus_tx() {
667            TxType::SharedObject
668        } else {
669            TxType::SingleWriter
670        };
671
672        let (_in_flight_metrics_guards, good_response_metrics) =
673            self.update_metrics(&request.transaction);
674
675        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
676        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
677        wait_for_finality_gauge.inc();
678        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
679            in_flight.dec();
680        });
681
682        let response = self
683            .submit_with_transaction_driver(
684                &self.transaction_driver,
685                &request,
686                client_addr,
687                &verified_transaction,
688                good_response_metrics,
689                finality_timeout,
690            )
691            .await?;
692        let driver_type = "transaction_driver";
693
694        add_server_timing("wait_for_finality done");
695
696        self.metrics.wait_for_finality_finished.inc();
697
698        let elapsed = timer.elapsed().as_secs_f64();
699        self.metrics
700            .settlement_finality_latency
701            .with_label_values(&[tx_type.as_str(), driver_type])
702            .observe(elapsed);
703        good_response_metrics.inc();
704
705        Ok(response)
706    }
707
708    #[instrument(level = "error", skip_all, err(level = "info"))]
709    async fn submit_with_transaction_driver(
710        &self,
711        td: &Arc<TransactionDriver<A>>,
712        request: &ExecuteTransactionRequestV3,
713        client_addr: Option<SocketAddr>,
714        verified_transaction: &VerifiedTransaction,
715        good_response_metrics: &GenericCounter<AtomicU64>,
716        timeout_duration: Option<Duration>,
717    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
718        let tx_digest = *verified_transaction.digest();
719        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
720
721        let td_response = td
722            .drive_transaction(
723                SubmitTxRequest::new_transaction(request.transaction.clone()),
724                SubmitTransactionOptions {
725                    forwarded_client_addr: client_addr,
726                    allowed_validators: self.td_allowed_submission_list.clone(),
727                    blocked_validators: self.td_blocked_submission_list.clone(),
728                },
729                timeout_duration,
730            )
731            .await
732            .map_err(|e| match e {
733                TransactionDriverError::TimeoutWithLastRetriableError {
734                    last_error,
735                    attempts,
736                    timeout,
737                } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
738                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
739                    attempts,
740                    timeout,
741                },
742                other => TransactionSubmissionError::TransactionFailed {
743                    category: other.categorize(),
744                    details: other.to_string(),
745                },
746            });
747
748        match td_response {
749            Err(e) => {
750                warn!("TransactionDriver error: {e:?}");
751                Err(e)
752            }
753            Ok(quorum_transaction_response) => {
754                good_response_metrics.inc();
755                Ok(quorum_transaction_response)
756            }
757        }
758    }
759
760    #[instrument(
761        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
762        level = "debug",
763        skip_all,
764        err(level = "info")
765    )]
766    async fn wait_for_finalized_tx_executed_locally_with_timeout(
767        validator_state: &Arc<AuthorityState>,
768        tx_digest: TransactionDigest,
769        tx_type: TxType,
770        metrics: &TransactionOrchestratorMetrics,
771    ) -> SuiResult {
772        metrics.local_execution_in_flight.inc();
773        let _metrics_guard =
774            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
775                in_flight.dec();
776            });
777
778        let _latency_guard = metrics
779            .local_execution_latency
780            .with_label_values(&[tx_type.as_str()])
781            .start_timer();
782        debug!("Waiting for finalized tx to be executed locally.");
783        match timeout(
784            LOCAL_EXECUTION_TIMEOUT,
785            validator_state
786                .get_transaction_cache_reader()
787                .notify_read_executed_effects_digests(
788                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
789                    &[tx_digest],
790                ),
791        )
792        .instrument(error_span!(
793            "transaction_orchestrator::local_execution",
794            ?tx_digest
795        ))
796        .await
797        {
798            Err(_elapsed) => {
799                debug!(
800                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
801                    LOCAL_EXECUTION_TIMEOUT
802                );
803                metrics.local_execution_timeout.inc();
804                Err(SuiErrorKind::TimeoutError.into())
805            }
806            Ok(_) => {
807                metrics.local_execution_success.inc();
808                Ok(())
809            }
810        }
811    }
812
813    fn update_metrics<'a>(
814        &'a self,
815        transaction: &Transaction,
816    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
817        let (in_flight, good_response) = if transaction.is_consensus_tx() {
818            self.metrics.total_req_received_shared_object.inc();
819            (
820                self.metrics.req_in_flight_shared_object.clone(),
821                &self.metrics.good_response_shared_object,
822            )
823        } else {
824            self.metrics.total_req_received_single_writer.inc();
825            (
826                self.metrics.req_in_flight_single_writer.clone(),
827                &self.metrics.good_response_single_writer,
828            )
829        };
830        in_flight.inc();
831        (
832            scopeguard::guard(in_flight, |in_flight| {
833                in_flight.dec();
834            }),
835            good_response,
836        )
837    }
838
839    fn start_task_to_recover_txes_in_log(
840        pending_tx_log: Arc<WritePathPendingTransactionLog>,
841        transaction_driver: Arc<TransactionDriver<A>>,
842    ) {
843        spawn_logged_monitored_task!(async move {
844            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
845                info!("Skipping loading pending transactions from pending_tx_log.");
846                return;
847            }
848            let pending_txes = pending_tx_log
849                .load_all_pending_transactions()
850                .expect("failed to load all pending transactions");
851            let num_pending_txes = pending_txes.len();
852            info!(
853                "Recovering {} pending transactions from pending_tx_log.",
854                num_pending_txes
855            );
856            let mut recovery = pending_txes
857                .into_iter()
858                .map(|tx| {
859                    let pending_tx_log = pending_tx_log.clone();
860                    let transaction_driver = transaction_driver.clone();
861                    async move {
862                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
863                        // requires a migration.
864                        let tx = tx.into_inner();
865                        let tx_digest = *tx.digest();
866                        // It's not impossible we fail to enqueue a task but that's not the end of world.
867                        // TODO(william) correctly extract client_addr from logs
868                        if let Err(err) = transaction_driver
869                            .drive_transaction(
870                                SubmitTxRequest::new_transaction(tx),
871                                SubmitTransactionOptions::default(),
872                                Some(Duration::from_secs(60)),
873                            )
874                            .await
875                        {
876                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
877                        } else {
878                            debug!(?tx_digest, "Executed recovered transaction");
879                        }
880                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
881                            warn!(
882                                ?tx_digest,
883                                "Failed to clean up transaction in pending log: {err}"
884                            );
885                        } else {
886                            debug!(?tx_digest, "Cleaned up transaction in pending log");
887                        }
888                    }
889                })
890                .collect::<FuturesUnordered<_>>();
891
892            let mut num_recovered = 0;
893            while recovery.next().await.is_some() {
894                num_recovered += 1;
895                if num_recovered % 1000 == 0 {
896                    info!(
897                        "Recovered {} out of {} transactions from pending_tx_log.",
898                        num_recovered, num_pending_txes
899                    );
900                }
901            }
902            info!(
903                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
904                num_recovered, num_pending_txes
905            );
906        });
907    }
908}
909/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
910#[derive(Clone)]
911pub struct TransactionOrchestratorMetrics {
912    total_req_received_single_writer: GenericCounter<AtomicU64>,
913    total_req_received_shared_object: GenericCounter<AtomicU64>,
914
915    good_response_single_writer: GenericCounter<AtomicU64>,
916    good_response_shared_object: GenericCounter<AtomicU64>,
917
918    req_in_flight_single_writer: GenericGauge<AtomicI64>,
919    req_in_flight_shared_object: GenericGauge<AtomicI64>,
920
921    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
922    wait_for_finality_finished: GenericCounter<AtomicU64>,
923    wait_for_finality_timeout: GenericCounter<AtomicU64>,
924
925    local_execution_in_flight: GenericGauge<AtomicI64>,
926    local_execution_success: GenericCounter<AtomicU64>,
927    local_execution_timeout: GenericCounter<AtomicU64>,
928
929    concurrent_execution: IntCounter,
930
931    early_validation_rejections: IntCounterVec,
932
933    background_retry_started: IntGauge,
934    background_retry_attempts: IntCounterVec,
935
936    request_latency: HistogramVec,
937    local_execution_latency: HistogramVec,
938    settlement_finality_latency: HistogramVec,
939}
940
941// Note that labeled-metrics are stored upfront individually
942// to mitigate the perf hit by MetricsVec.
943// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
944impl TransactionOrchestratorMetrics {
945    pub fn new(registry: &Registry) -> Self {
946        let total_req_received = register_int_counter_vec_with_registry!(
947            "tx_orchestrator_total_req_received",
948            "Total number of executions request Transaction Orchestrator receives, group by tx type",
949            &["tx_type"],
950            registry
951        )
952        .unwrap();
953
954        let total_req_received_single_writer =
955            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
956        let total_req_received_shared_object =
957            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
958
959        let good_response = register_int_counter_vec_with_registry!(
960            "tx_orchestrator_good_response",
961            "Total number of good responses Transaction Orchestrator generates, group by tx type",
962            &["tx_type"],
963            registry
964        )
965        .unwrap();
966
967        let good_response_single_writer =
968            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
969        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
970
971        let req_in_flight = register_int_gauge_vec_with_registry!(
972            "tx_orchestrator_req_in_flight",
973            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
974            &["tx_type"],
975            registry
976        )
977        .unwrap();
978
979        let req_in_flight_single_writer =
980            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
981        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
982
983        Self {
984            total_req_received_single_writer,
985            total_req_received_shared_object,
986            good_response_single_writer,
987            good_response_shared_object,
988            req_in_flight_single_writer,
989            req_in_flight_shared_object,
990            wait_for_finality_in_flight: register_int_gauge_with_registry!(
991                "tx_orchestrator_wait_for_finality_in_flight",
992                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
993                registry,
994            )
995            .unwrap(),
996            wait_for_finality_finished: register_int_counter_with_registry!(
997                "tx_orchestrator_wait_for_finality_fnished",
998                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
999                registry,
1000            )
1001            .unwrap(),
1002            wait_for_finality_timeout: register_int_counter_with_registry!(
1003                "tx_orchestrator_wait_for_finality_timeout",
1004                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1005                registry,
1006            )
1007            .unwrap(),
1008            local_execution_in_flight: register_int_gauge_with_registry!(
1009                "tx_orchestrator_local_execution_in_flight",
1010                "Number of local execution txns in flights Transaction Orchestrator handles",
1011                registry,
1012            )
1013            .unwrap(),
1014            local_execution_success: register_int_counter_with_registry!(
1015                "tx_orchestrator_local_execution_success",
1016                "Total number of successful local execution txns Transaction Orchestrator handles",
1017                registry,
1018            )
1019            .unwrap(),
1020            local_execution_timeout: register_int_counter_with_registry!(
1021                "tx_orchestrator_local_execution_timeout",
1022                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1023                registry,
1024            )
1025            .unwrap(),
1026            concurrent_execution: register_int_counter_with_registry!(
1027                "tx_orchestrator_concurrent_execution",
1028                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1029                registry,
1030            )
1031            .unwrap(),
1032            early_validation_rejections: register_int_counter_vec_with_registry!(
1033                "tx_orchestrator_early_validation_rejections",
1034                "Total number of transactions rejected during early validation before submission, by reason",
1035                &["reason"],
1036                registry,
1037            )
1038            .unwrap(),
1039            background_retry_started: register_int_gauge_with_registry!(
1040                "tx_orchestrator_background_retry_started",
1041                "Number of background retry tasks kicked off for transactions with retriable errors",
1042                registry,
1043            )
1044            .unwrap(),
1045            background_retry_attempts: register_int_counter_vec_with_registry!(
1046                "tx_orchestrator_background_retry_attempts",
1047                "Total number of background retry attempts, by status",
1048                &["status"],
1049                registry,
1050            )
1051            .unwrap(),
1052            request_latency: register_histogram_vec_with_registry!(
1053                "tx_orchestrator_request_latency",
1054                "Time spent in processing one Transaction Orchestrator request",
1055                &["tx_type", "route", "wait_for_local_execution"],
1056                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1057                registry,
1058            )
1059            .unwrap(),
1060            local_execution_latency: register_histogram_vec_with_registry!(
1061                "tx_orchestrator_local_execution_latency",
1062                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1063                &["tx_type"],
1064                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1065                registry,
1066            )
1067            .unwrap(),
1068            settlement_finality_latency: register_histogram_vec_with_registry!(
1069                "tx_orchestrator_settlement_finality_latency",
1070                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1071                &["tx_type", "driver_type"],
1072                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1073                registry,
1074            )
1075            .unwrap(),
1076        }
1077    }
1078
1079    pub fn new_for_tests() -> Self {
1080        let registry = Registry::new();
1081        Self::new(&registry)
1082    }
1083}
1084
1085#[async_trait::async_trait]
1086impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1087where
1088    A: AuthorityAPI + Send + Sync + 'static + Clone,
1089{
1090    async fn execute_transaction(
1091        &self,
1092        request: ExecuteTransactionRequestV3,
1093        client_addr: Option<std::net::SocketAddr>,
1094    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1095        self.execute_transaction_v3(request, client_addr).await
1096    }
1097
1098    fn simulate_transaction(
1099        &self,
1100        transaction: TransactionData,
1101        checks: TransactionChecks,
1102        allow_mock_gas_coin: bool,
1103    ) -> Result<SimulateTransactionResult, SuiError> {
1104        self.inner
1105            .validator_state
1106            .simulate_transaction(transaction, checks, allow_mock_gas_coin)
1107    }
1108}
1109
1110/// Keeps track of inflight transactions being submitted, and helps recover transactions
1111/// on restart.
1112struct TransactionSubmissionGuard {
1113    pending_tx_log: Arc<WritePathPendingTransactionLog>,
1114    tx_digest: TransactionDigest,
1115    is_new_transaction: bool,
1116}
1117
1118impl TransactionSubmissionGuard {
1119    pub fn new(
1120        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1121        tx: &VerifiedTransaction,
1122    ) -> Self {
1123        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1124        let tx_digest = *tx.digest();
1125        if is_new_transaction {
1126            debug!(?tx_digest, "Added transaction to inflight set");
1127        } else {
1128            debug!(
1129                ?tx_digest,
1130                "Transaction already being processed, no new submission will be made"
1131            );
1132        };
1133        Self {
1134            pending_tx_log,
1135            tx_digest,
1136            is_new_transaction,
1137        }
1138    }
1139
1140    fn is_new_transaction(&self) -> bool {
1141        self.is_new_transaction
1142    }
1143}
1144
1145impl Drop for TransactionSubmissionGuard {
1146    fn drop(&mut self) {
1147        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1148            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1149        } else {
1150            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1151        }
1152    }
1153}