sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_antithesis};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16    HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19    register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33    TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46    TransactionDriverMetrics,
47};
48
/// How long to wait for local execution (including parents) before a timeout
/// is returned to client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default timeout for waiting for finality for each transaction.
///
/// The execution path reads the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable and
/// only falls back to this value when the variable is unset or cannot be parsed.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);

/// Result pairing a transaction with its quorum response on success, or the transaction
/// digest with the submission error on failure.
pub type QuorumTransactionEffectsResult = Result<
    (Transaction, QuorumTransactionResponse),
    (TransactionDigest, TransactionSubmissionError),
>;
60
/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
/// submit transactions to validators for finality. It adds inflight deduplication,
/// waiting for local execution, recovery, early validation, and epoch change handling
/// on top of Transaction Driver.
/// This is used by node RPC service to support transaction submission and finality waiting.
pub struct TransactionOrchestrator<A: Clone> {
    // Shared state. It lives behind an `Arc` so that a handle can be cloned into the
    // monitored tasks spawned for each execution request.
    inner: Arc<Inner<A>>,
}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71    pub fn new_with_auth_aggregator(
72        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73        validator_state: Arc<AuthorityState>,
74        reconfig_channel: Receiver<SuiSystemState>,
75        parent_path: &Path,
76        prometheus_registry: &Registry,
77        node_config: &NodeConfig,
78    ) -> Self {
79        let observer = OnsiteReconfigObserver::new(
80            reconfig_channel,
81            validator_state.get_object_cache_reader().clone(),
82            validator_state.clone_committee_store(),
83            validators.safe_client_metrics_base.clone(),
84        );
85        TransactionOrchestrator::new(
86            validators,
87            validator_state,
88            parent_path,
89            prometheus_registry,
90            observer,
91            node_config,
92        )
93    }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98    A: AuthorityAPI + Send + Sync + 'static + Clone,
99    OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101    pub fn new(
102        validators: Arc<AuthorityAggregator<A>>,
103        validator_state: Arc<AuthorityState>,
104        parent_path: &Path,
105        prometheus_registry: &Registry,
106        reconfig_observer: OnsiteReconfigObserver,
107        node_config: &NodeConfig,
108    ) -> Self {
109        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111        let client_metrics = Arc::new(
112            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113        );
114        let reconfig_observer = Arc::new(reconfig_observer);
115
116        let transaction_driver = TransactionDriver::new(
117            validators.clone(),
118            reconfig_observer.clone(),
119            td_metrics,
120            Some(node_config),
121            client_metrics,
122        );
123
124        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125            parent_path.join("fullnode_pending_transactions"),
126        ));
127        Inner::start_task_to_recover_txes_in_log(
128            pending_tx_log.clone(),
129            transaction_driver.clone(),
130        );
131
132        let td_allowed_submission_list = node_config
133            .transaction_driver_config
134            .as_ref()
135            .map(|config| config.allowed_submission_validators.clone())
136            .unwrap_or_default();
137
138        let td_blocked_submission_list = node_config
139            .transaction_driver_config
140            .as_ref()
141            .map(|config| config.blocked_submission_validators.clone())
142            .unwrap_or_default();
143
144        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145            panic!(
146                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147                td_allowed_submission_list, td_blocked_submission_list
148            );
149        }
150
151        let enable_early_validation = node_config
152            .transaction_driver_config
153            .as_ref()
154            .map(|config| config.enable_early_validation)
155            .unwrap_or(true);
156
157        let inner = Arc::new(Inner {
158            validator_state,
159            pending_tx_log,
160            metrics,
161            transaction_driver,
162            td_allowed_submission_list,
163            td_blocked_submission_list,
164            enable_early_validation,
165        });
166        Self { inner }
167    }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172    A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175    fields(
176        tx_digest = ?request.transaction.digest(),
177        tx_type = ?request_type,
178    ))]
179    pub async fn execute_transaction_block(
180        &self,
181        request: ExecuteTransactionRequestV3,
182        request_type: ExecuteTransactionRequestType,
183        client_addr: Option<SocketAddr>,
184    ) -> Result<
185        (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186        TransactionSubmissionError,
187    > {
188        let timer = Instant::now();
189        let tx_type = if request.transaction.is_consensus_tx() {
190            TxType::SharedObject
191        } else {
192            TxType::SingleWriter
193        };
194        let tx_digest = *request.transaction.digest();
195
196        let inner = self.inner.clone();
197        let (response, mut executed_locally) = spawn_monitored_task!(
198            Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199        )
200        .await
201        .map_err(|e| TransactionSubmissionError::TransactionFailed {
202            category: ErrorCategory::Internal,
203            details: e.to_string(),
204        })??;
205
206        if !executed_locally {
207            executed_locally = if matches!(
208                request_type,
209                ExecuteTransactionRequestType::WaitForLocalExecution
210            ) {
211                let executed_locally =
212                    Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
213                        &self.inner.validator_state,
214                        tx_digest,
215                        tx_type,
216                        &self.inner.metrics,
217                    )
218                    .await
219                    .is_ok();
220                add_server_timing("local_execution done");
221                executed_locally
222            } else {
223                false
224            };
225        }
226
227        let QuorumTransactionResponse {
228            effects,
229            events,
230            input_objects,
231            output_objects,
232            auxiliary_data,
233        } = response;
234
235        let response = ExecuteTransactionResponseV3 {
236            effects,
237            events,
238            input_objects,
239            output_objects,
240            auxiliary_data,
241        };
242
243        self.inner
244            .metrics
245            .request_latency
246            .with_label_values(&[
247                tx_type.as_str(),
248                "execute_transaction_block",
249                executed_locally.to_string().as_str(),
250            ])
251            .observe(timer.elapsed().as_secs_f64());
252
253        Ok((response, executed_locally))
254    }
255
256    // Utilize the handle_certificate_v3 validator api to request input/output objects
257    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
258                 fields(tx_digest = ?request.transaction.digest()))]
259    pub async fn execute_transaction_v3(
260        &self,
261        request: ExecuteTransactionRequestV3,
262        client_addr: Option<SocketAddr>,
263    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
264        let timer = Instant::now();
265        let tx_type = if request.transaction.is_consensus_tx() {
266            TxType::SharedObject
267        } else {
268            TxType::SingleWriter
269        };
270
271        let inner = self.inner.clone();
272        let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
273            inner,
274            request,
275            client_addr
276        ))
277        .await
278        .map_err(|e| TransactionSubmissionError::TransactionFailed {
279            category: ErrorCategory::Internal,
280            details: e.to_string(),
281        })??;
282
283        self.inner
284            .metrics
285            .request_latency
286            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
287            .observe(timer.elapsed().as_secs_f64());
288
289        let QuorumTransactionResponse {
290            effects,
291            events,
292            input_objects,
293            output_objects,
294            auxiliary_data,
295        } = response;
296
297        Ok(ExecuteTransactionResponseV3 {
298            effects,
299            events,
300            input_objects,
301            output_objects,
302            auxiliary_data,
303        })
304    }
305
306    pub fn authority_state(&self) -> &Arc<AuthorityState> {
307        &self.inner.validator_state
308    }
309
310    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
311        &self.inner.transaction_driver
312    }
313
314    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
315        self.inner
316            .transaction_driver
317            .authority_aggregator()
318            .load_full()
319    }
320
321    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
322        self.inner.pending_tx_log.load_all_pending_transactions()
323    }
324
325    pub fn empty_pending_tx_log_in_test(&self) -> bool {
326        self.inner.pending_tx_log.is_empty()
327    }
328}
329
/// Shared state behind [`TransactionOrchestrator`], held in an `Arc` so it can be moved
/// into spawned tasks.
struct Inner<A: Clone> {
    // Local authority state: source of the epoch store, early validation checks and
    // locally executed effects/events/objects.
    validator_state: Arc<AuthorityState>,
    // Log of pending transactions; a `TransactionSubmissionGuard` is created against it for
    // each execution attempt.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    // Orchestrator-level metrics (latencies, in-flight gauges, retry counters, ...).
    metrics: Arc<TransactionOrchestratorMetrics>,
    // Driver that submits transactions to validators.
    transaction_driver: Arc<TransactionDriver<A>>,
    // Passed to the driver as `allowed_validators` on every submission.
    td_allowed_submission_list: Vec<String>,
    // Passed to the driver as `blocked_validators` on every submission.
    td_blocked_submission_list: Vec<String>,
    // When true, transactions are checked against local state before submission.
    enable_early_validation: bool,
}
339
340impl<A> Inner<A>
341where
342    A: AuthorityAPI + Send + Sync + 'static + Clone,
343{
344    async fn execute_transaction_with_retry(
345        inner: Arc<Inner<A>>,
346        request: ExecuteTransactionRequestV3,
347        client_addr: Option<SocketAddr>,
348    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
349    {
350        let result = inner
351            .execute_transaction_with_effects_waiting(
352                request.clone(),
353                client_addr,
354                /* enforce_live_input_objects */ false,
355            )
356            .await;
357
358        // If the error is retriable, retry the transaction sufficiently long.
359        if let Err(e) = &result
360            && e.is_retriable()
361        {
362            spawn_monitored_task!(async move {
363                inner.metrics.background_retry_started.inc();
364                let backoff = backoff::ExponentialBackoff::new(
365                    Duration::from_secs(1),
366                    Duration::from_secs(300),
367                );
368                const MAX_RETRIES: usize = 10;
369                for (i, delay) in backoff.enumerate() {
370                    if i == MAX_RETRIES {
371                        break;
372                    }
373                    // Start to enforce live input after 3 retries,
374                    // to avoid excessively retrying transactions with non-existent input objects.
375                    let result = inner
376                        .execute_transaction_with_effects_waiting(
377                            request.clone(),
378                            client_addr,
379                            i > 3,
380                        )
381                        .await;
382                    match result {
383                        Ok(_) => {
384                            inner
385                                .metrics
386                                .background_retry_attempts
387                                .with_label_values(&["success"])
388                                .inc();
389                            debug!(
390                                "Background retry {i} for transaction {} succeeded",
391                                request.transaction.digest()
392                            );
393                            break;
394                        }
395                        Err(e) => {
396                            if !e.is_retriable() {
397                                inner
398                                    .metrics
399                                    .background_retry_attempts
400                                    .with_label_values(&["non-retriable"])
401                                    .inc();
402                                debug!(
403                                    "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
404                                    request.transaction.digest()
405                                );
406                                break;
407                            }
408                            inner
409                                .metrics
410                                .background_retry_attempts
411                                .with_label_values(&["retriable"])
412                                .inc();
413                            debug!(
414                                "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
415                                request.transaction.digest()
416                            );
417                        }
418                    };
419                    sleep(delay).await;
420                }
421            });
422        }
423
424        result
425    }
426
    /// Shared implementation for executing transactions with parallel local effects waiting
    ///
    /// Steps, in order:
    /// 1. Verify the transaction against the current epoch store.
    /// 2. If early validation is enabled, check the transaction against local state and
    ///    reject it on a non-retriable error, unless it has already been executed.
    /// 3. Register the transaction in the pending transaction log; a transaction that is
    ///    already being processed is not submitted again.
    /// 4. Race three things: locally available effects, the submission future(s), and the
    ///    finality timeout.
    ///
    /// On success the boolean is `true` when the response was built from locally executed
    /// effects and `false` when it came from a submission.
    ///
    /// # Errors
    ///
    /// * `InvalidUserSignature` when verification fails.
    /// * `TransactionFailed` when early validation rejects the transaction.
    /// * `TransactionDriverInternalError` when reading local events/objects fails.
    /// * The last submission error when every submission attempt failed.
    /// * `TimeoutBeforeFinality` when the finality timeout fires first.
    async fn execute_transaction_with_effects_waiting(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
        enforce_live_input_objects: bool,
    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
    {
        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
        let verified_transaction = epoch_store
            .verify_transaction_with_current_aliases(request.transaction.clone())
            .map_err(TransactionSubmissionError::InvalidUserSignature)?
            .into_tx();
        let tx_digest = *verified_transaction.digest();

        // Early validation check against local state before submission to catch non-retriable errors
        // TODO: Consider moving this check to TransactionDriver for per-retry validation
        if self.enable_early_validation
            && let Err(e) = self.validator_state.check_transaction_validity(
                &epoch_store,
                &verified_transaction,
                enforce_live_input_objects,
            )
        {
            let error_category = e.categorize();
            // Errors categorized as submission-retriable are not rejected here; the
            // transaction proceeds to submission.
            if !error_category.is_submission_retriable() {
                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
                if !self.validator_state.is_tx_already_executed(&tx_digest) {
                    self.metrics
                        .early_validation_rejections
                        .with_label_values(&[e.to_variant_name()])
                        .inc();
                    debug!(
                        error = ?e,
                        "Transaction rejected during early validation"
                    );

                    return Err(TransactionSubmissionError::TransactionFailed {
                        category: error_category,
                        details: e.to_string(),
                    });
                }
            }
        }

        // Add transaction to WAL log.
        // NOTE(review): `guard` stays alive until this function returns; presumably its drop
        // releases the log entry — confirm against `TransactionSubmissionGuard`.
        let guard =
            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
        let is_new_transaction = guard.is_new_transaction();

        // Copy the response filters out of `request` up front; they are applied to whichever
        // response (local or submitted) wins the race below.
        let include_events = request.include_events;
        let include_input_objects = request.include_input_objects;
        let include_output_objects = request.include_output_objects;
        let include_auxiliary_data = request.include_auxiliary_data;

        // The environment variable, when set to a parseable number of seconds, overrides
        // the default finality timeout. It is read on every call.
        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .map(Duration::from_secs)
            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);

        let num_submissions = if !is_new_transaction {
            // No need to submit when the transaction is already being processed.
            0
        } else if cfg!(msim) || in_antithesis() {
            // Allow duplicated submissions in tests.
            // Roughly: 10% of the time 3 submissions, 20% of the time 2, otherwise 1.
            let r = rand::thread_rng().gen_range(1..=100);
            let n = if r <= 10 {
                3
            } else if r <= 30 {
                2
            } else {
                1
            };
            if n > 1 {
                debug!("Making {n} execution calls");
            }
            n
        } else {
            1
        };

        // Wait for one of the execution futures to succeed, or all of them to fail.
        let mut execution_futures = FuturesUnordered::new();
        for i in 0..num_submissions {
            // Generate jitter values outside the async block
            // The first submission is never delayed; duplicates are delayed 80% of the time.
            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
            let delay_ms = if should_delay {
                rand::thread_rng().gen_range(100..=500)
            } else {
                0
            };

            let request = request.clone();
            let verified_transaction = verified_transaction.clone();

            let future = async move {
                if delay_ms > 0 {
                    // Add jitters to duplicated submissions.
                    sleep(Duration::from_millis(delay_ms)).await;
                }
                self.execute_transaction_impl(
                    request,
                    verified_transaction,
                    client_addr,
                    Some(finality_timeout),
                )
                .await
            }
            .boxed();
            execution_futures.push(future);
        }

        // Track the last execution error.
        let mut last_execution_error: Option<TransactionSubmissionError> = None;

        // Wait for execution result outside of this call to become available.
        let digests = [tx_digest];
        let mut local_effects_future = epoch_store
            .within_alive_epoch(
                self.validator_state
                    .get_transaction_cache_reader()
                    .notify_read_executed_effects(
                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
                    &digests,
                ),
            )
            .boxed();

        // Wait for execution timeout.
        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();

        loop {
            tokio::select! {
                // `biased` polls the branches in the order written: local effects first,
                // then submissions, then the timeout.
                biased;

                // Local effects might be available
                local_effects_result = &mut local_effects_future => {
                    match local_effects_result {
                        Ok(effects) => {
                            debug!(
                                "Effects became available while execution was running"
                            );
                            if let Some(effects) = effects.into_iter().next() {
                                self.metrics.concurrent_execution.inc();
                                let epoch = effects.executed_epoch();
                                // Events are only looked up when requested and when the
                                // effects carry an events digest.
                                let events = if include_events {
                                    if effects.events_digest().is_some() {
                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
                                            .map_err(TransactionSubmissionError::TransactionDriverInternalError)?)
                                    } else {
                                        None
                                    }
                                } else {
                                    None
                                };
                                let input_objects = include_input_objects
                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
                                    .transpose()
                                    .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
                                let output_objects = include_output_objects
                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
                                    .transpose()
                                    .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
                                // Auxiliary data is never populated on the local path,
                                // regardless of `include_auxiliary_data`.
                                let response = QuorumTransactionResponse {
                                    effects: FinalizedEffects {
                                        effects,
                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
                                    },
                                    events,
                                    input_objects,
                                    output_objects,
                                    auxiliary_data: None,
                                };
                                break Ok((response, true));
                            }
                        }
                        Err(_) => {
                            warn!("Epoch terminated before effects were available");
                        }
                    };

                    // Prevent this branch from being selected again
                    // (reached when the epoch ended or no effects were returned; the loop
                    // then keeps waiting on submissions and the timeout only).
                    local_effects_future = futures::future::pending().boxed();
                }

                // This branch is disabled if execution_futures is empty.
                Some(result) = execution_futures.next() => {
                    match result {
                        Ok(resp) => {
                            // First success gets returned.
                            debug!("Execution succeeded, returning response");
                            let QuorumTransactionResponse {
                                effects,
                                events,
                                input_objects,
                                output_objects,
                                auxiliary_data,
                            } = resp;
                            // Filter fields based on request flags.
                            let resp = QuorumTransactionResponse {
                                effects,
                                events: if include_events { events } else { None },
                                input_objects: if include_input_objects { input_objects } else { None },
                                output_objects: if include_output_objects { output_objects } else { None },
                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
                            };
                            break Ok((resp, false));
                        }
                        Err(e) => {
                            debug!(?e, "Execution attempt failed, wait for other attempts");
                            last_execution_error = Some(e);
                        }
                    };

                    // Last error must have been recorded.
                    // (This point is only reached through the `Err` arm above, which has
                    // just stored the error, so the `unwrap` cannot fail.)
                    if execution_futures.is_empty() {
                        break Err(last_execution_error.unwrap());
                    }
                }

                // A timeout has occurred while waiting for finality
                _ = &mut timeout_future => {
                    if let Some(e) = last_execution_error {
                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
                    } else {
                        debug!("Timeout waiting for transaction finality.");
                    }
                    self.metrics.wait_for_finality_timeout.inc();

                    // TODO: Return the last execution error.
                    break Err(TransactionSubmissionError::TimeoutBeforeFinality);
                }
            }
        }
    }
663
664    #[instrument(level = "error", skip_all)]
665    async fn execute_transaction_impl(
666        &self,
667        request: ExecuteTransactionRequestV3,
668        verified_transaction: VerifiedTransaction,
669        client_addr: Option<SocketAddr>,
670        finality_timeout: Option<Duration>,
671    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
672        debug!("TO Received transaction execution request.");
673
674        let timer = Instant::now();
675        let tx_type = if verified_transaction.is_consensus_tx() {
676            TxType::SharedObject
677        } else {
678            TxType::SingleWriter
679        };
680
681        let (_in_flight_metrics_guards, good_response_metrics) =
682            self.update_metrics(&request.transaction);
683
684        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
685        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
686        wait_for_finality_gauge.inc();
687        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
688            in_flight.dec();
689        });
690
691        let response = self
692            .submit_with_transaction_driver(
693                &self.transaction_driver,
694                &request,
695                client_addr,
696                &verified_transaction,
697                good_response_metrics,
698                finality_timeout,
699            )
700            .await?;
701        let driver_type = "transaction_driver";
702
703        add_server_timing("wait_for_finality done");
704
705        self.metrics.wait_for_finality_finished.inc();
706
707        let elapsed = timer.elapsed().as_secs_f64();
708        self.metrics
709            .settlement_finality_latency
710            .with_label_values(&[tx_type.as_str(), driver_type])
711            .observe(elapsed);
712        good_response_metrics.inc();
713
714        Ok(response)
715    }
716
717    #[instrument(level = "error", skip_all, err(level = "info"))]
718    async fn submit_with_transaction_driver(
719        &self,
720        td: &Arc<TransactionDriver<A>>,
721        request: &ExecuteTransactionRequestV3,
722        client_addr: Option<SocketAddr>,
723        verified_transaction: &VerifiedTransaction,
724        good_response_metrics: &GenericCounter<AtomicU64>,
725        timeout_duration: Option<Duration>,
726    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
727        let tx_digest = *verified_transaction.digest();
728        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
729
730        let td_response = td
731            .drive_transaction(
732                SubmitTxRequest::new_transaction(request.transaction.clone()),
733                SubmitTransactionOptions {
734                    forwarded_client_addr: client_addr,
735                    allowed_validators: self.td_allowed_submission_list.clone(),
736                    blocked_validators: self.td_blocked_submission_list.clone(),
737                },
738                timeout_duration,
739            )
740            .await
741            .map_err(|e| match e {
742                TransactionDriverError::TimeoutWithLastRetriableError {
743                    last_error,
744                    attempts,
745                    timeout,
746                } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
747                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
748                    attempts,
749                    timeout,
750                },
751                other => TransactionSubmissionError::TransactionFailed {
752                    category: other.categorize(),
753                    details: other.to_string(),
754                },
755            });
756
757        match td_response {
758            Err(e) => {
759                warn!("TransactionDriver error: {e:?}");
760                Err(e)
761            }
762            Ok(quorum_transaction_response) => {
763                good_response_metrics.inc();
764                Ok(quorum_transaction_response)
765            }
766        }
767    }
768
769    #[instrument(
770        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
771        level = "debug",
772        skip_all,
773        err(level = "info")
774    )]
775    async fn wait_for_finalized_tx_executed_locally_with_timeout(
776        validator_state: &Arc<AuthorityState>,
777        tx_digest: TransactionDigest,
778        tx_type: TxType,
779        metrics: &TransactionOrchestratorMetrics,
780    ) -> SuiResult {
781        metrics.local_execution_in_flight.inc();
782        let _metrics_guard =
783            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
784                in_flight.dec();
785            });
786
787        let _latency_guard = metrics
788            .local_execution_latency
789            .with_label_values(&[tx_type.as_str()])
790            .start_timer();
791        debug!("Waiting for finalized tx to be executed locally.");
792        match timeout(
793            LOCAL_EXECUTION_TIMEOUT,
794            validator_state
795                .get_transaction_cache_reader()
796                .notify_read_executed_effects_digests(
797                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
798                    &[tx_digest],
799                ),
800        )
801        .instrument(error_span!(
802            "transaction_orchestrator::local_execution",
803            ?tx_digest
804        ))
805        .await
806        {
807            Err(_elapsed) => {
808                debug!(
809                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
810                    LOCAL_EXECUTION_TIMEOUT
811                );
812                metrics.local_execution_timeout.inc();
813                Err(SuiErrorKind::TimeoutError.into())
814            }
815            Ok(_) => {
816                metrics.local_execution_success.inc();
817                Ok(())
818            }
819        }
820    }
821
822    fn update_metrics<'a>(
823        &'a self,
824        transaction: &Transaction,
825    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
826        let (in_flight, good_response) = if transaction.is_consensus_tx() {
827            self.metrics.total_req_received_shared_object.inc();
828            (
829                self.metrics.req_in_flight_shared_object.clone(),
830                &self.metrics.good_response_shared_object,
831            )
832        } else {
833            self.metrics.total_req_received_single_writer.inc();
834            (
835                self.metrics.req_in_flight_single_writer.clone(),
836                &self.metrics.good_response_single_writer,
837            )
838        };
839        in_flight.inc();
840        (
841            scopeguard::guard(in_flight, |in_flight| {
842                in_flight.dec();
843            }),
844            good_response,
845        )
846    }
847
848    fn start_task_to_recover_txes_in_log(
849        pending_tx_log: Arc<WritePathPendingTransactionLog>,
850        transaction_driver: Arc<TransactionDriver<A>>,
851    ) {
852        spawn_logged_monitored_task!(async move {
853            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
854                info!("Skipping loading pending transactions from pending_tx_log.");
855                return;
856            }
857            let pending_txes = pending_tx_log
858                .load_all_pending_transactions()
859                .expect("failed to load all pending transactions");
860            let num_pending_txes = pending_txes.len();
861            info!(
862                "Recovering {} pending transactions from pending_tx_log.",
863                num_pending_txes
864            );
865            let mut recovery = pending_txes
866                .into_iter()
867                .map(|tx| {
868                    let pending_tx_log = pending_tx_log.clone();
869                    let transaction_driver = transaction_driver.clone();
870                    async move {
871                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
872                        // requires a migration.
873                        let tx = tx.into_inner();
874                        let tx_digest = *tx.digest();
875                        // It's not impossible we fail to enqueue a task but that's not the end of world.
876                        // TODO(william) correctly extract client_addr from logs
877                        if let Err(err) = transaction_driver
878                            .drive_transaction(
879                                SubmitTxRequest::new_transaction(tx),
880                                SubmitTransactionOptions::default(),
881                                Some(Duration::from_secs(60)),
882                            )
883                            .await
884                        {
885                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
886                        } else {
887                            debug!(?tx_digest, "Executed recovered transaction");
888                        }
889                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
890                            warn!(
891                                ?tx_digest,
892                                "Failed to clean up transaction in pending log: {err}"
893                            );
894                        } else {
895                            debug!(?tx_digest, "Cleaned up transaction in pending log");
896                        }
897                    }
898                })
899                .collect::<FuturesUnordered<_>>();
900
901            let mut num_recovered = 0;
902            while recovery.next().await.is_some() {
903                num_recovered += 1;
904                if num_recovered % 1000 == 0 {
905                    info!(
906                        "Recovered {} out of {} transactions from pending_tx_log.",
907                        num_recovered, num_pending_txes
908                    );
909                }
910            }
911            info!(
912                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
913                num_recovered, num_pending_txes
914            );
915        });
916    }
917}
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
///
/// Per-transaction-type counters and gauges are held as individual, already-labelled
/// handles rather than as metric vectors.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    /// Execution requests received for single-writer transactions.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    /// Execution requests received for shared-object transactions.
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    /// Good responses generated for single-writer transactions.
    good_response_single_writer: GenericCounter<AtomicU64>,
    /// Good responses generated for shared-object transactions.
    good_response_shared_object: GenericCounter<AtomicU64>,

    /// Requests currently in flight for single-writer transactions.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    /// Requests currently in flight for shared-object transactions.
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    /// Transactions currently being waited on for finality.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    /// Finality waits that got a response (success or failure) before timing out.
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    /// Finality waits that timed out.
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    /// Local-execution waits currently in flight.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    /// Local-execution waits that completed in time.
    local_execution_success: GenericCounter<AtomicU64>,
    /// Local-execution waits that timed out.
    local_execution_timeout: GenericCounter<AtomicU64>,

    /// Concurrent executions where effects became available locally; see the help text
    /// registered for `tx_orchestrator_concurrent_execution`.
    concurrent_execution: IntCounter,

    /// Transactions rejected during early validation, labelled by `reason`.
    early_validation_rejections: IntCounterVec,

    /// Background retry tasks kicked off for transactions with retriable errors.
    background_retry_started: IntGauge,
    /// Background retry attempts, labelled by `status`.
    background_retry_attempts: IntCounterVec,

    /// Per-request processing time, labelled by `tx_type`, `route` and
    /// `wait_for_local_execution`.
    request_latency: HistogramVec,
    /// Time spent waiting for local execution, labelled by `tx_type`.
    local_execution_latency: HistogramVec,
    /// Time spent waiting for settlement finality, labelled by `tx_type` and `driver_type`.
    settlement_finality_latency: HistogramVec,
}
949
950// Note that labeled-metrics are stored upfront individually
951// to mitigate the perf hit by MetricsVec.
952// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
953impl TransactionOrchestratorMetrics {
954    pub fn new(registry: &Registry) -> Self {
955        let total_req_received = register_int_counter_vec_with_registry!(
956            "tx_orchestrator_total_req_received",
957            "Total number of executions request Transaction Orchestrator receives, group by tx type",
958            &["tx_type"],
959            registry
960        )
961        .unwrap();
962
963        let total_req_received_single_writer =
964            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
965        let total_req_received_shared_object =
966            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
967
968        let good_response = register_int_counter_vec_with_registry!(
969            "tx_orchestrator_good_response",
970            "Total number of good responses Transaction Orchestrator generates, group by tx type",
971            &["tx_type"],
972            registry
973        )
974        .unwrap();
975
976        let good_response_single_writer =
977            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
978        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
979
980        let req_in_flight = register_int_gauge_vec_with_registry!(
981            "tx_orchestrator_req_in_flight",
982            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
983            &["tx_type"],
984            registry
985        )
986        .unwrap();
987
988        let req_in_flight_single_writer =
989            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
990        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
991
992        Self {
993            total_req_received_single_writer,
994            total_req_received_shared_object,
995            good_response_single_writer,
996            good_response_shared_object,
997            req_in_flight_single_writer,
998            req_in_flight_shared_object,
999            wait_for_finality_in_flight: register_int_gauge_with_registry!(
1000                "tx_orchestrator_wait_for_finality_in_flight",
1001                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1002                registry,
1003            )
1004            .unwrap(),
1005            wait_for_finality_finished: register_int_counter_with_registry!(
1006                "tx_orchestrator_wait_for_finality_fnished",
1007                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1008                registry,
1009            )
1010            .unwrap(),
1011            wait_for_finality_timeout: register_int_counter_with_registry!(
1012                "tx_orchestrator_wait_for_finality_timeout",
1013                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1014                registry,
1015            )
1016            .unwrap(),
1017            local_execution_in_flight: register_int_gauge_with_registry!(
1018                "tx_orchestrator_local_execution_in_flight",
1019                "Number of local execution txns in flights Transaction Orchestrator handles",
1020                registry,
1021            )
1022            .unwrap(),
1023            local_execution_success: register_int_counter_with_registry!(
1024                "tx_orchestrator_local_execution_success",
1025                "Total number of successful local execution txns Transaction Orchestrator handles",
1026                registry,
1027            )
1028            .unwrap(),
1029            local_execution_timeout: register_int_counter_with_registry!(
1030                "tx_orchestrator_local_execution_timeout",
1031                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1032                registry,
1033            )
1034            .unwrap(),
1035            concurrent_execution: register_int_counter_with_registry!(
1036                "tx_orchestrator_concurrent_execution",
1037                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1038                registry,
1039            )
1040            .unwrap(),
1041            early_validation_rejections: register_int_counter_vec_with_registry!(
1042                "tx_orchestrator_early_validation_rejections",
1043                "Total number of transactions rejected during early validation before submission, by reason",
1044                &["reason"],
1045                registry,
1046            )
1047            .unwrap(),
1048            background_retry_started: register_int_gauge_with_registry!(
1049                "tx_orchestrator_background_retry_started",
1050                "Number of background retry tasks kicked off for transactions with retriable errors",
1051                registry,
1052            )
1053            .unwrap(),
1054            background_retry_attempts: register_int_counter_vec_with_registry!(
1055                "tx_orchestrator_background_retry_attempts",
1056                "Total number of background retry attempts, by status",
1057                &["status"],
1058                registry,
1059            )
1060            .unwrap(),
1061            request_latency: register_histogram_vec_with_registry!(
1062                "tx_orchestrator_request_latency",
1063                "Time spent in processing one Transaction Orchestrator request",
1064                &["tx_type", "route", "wait_for_local_execution"],
1065                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1066                registry,
1067            )
1068            .unwrap(),
1069            local_execution_latency: register_histogram_vec_with_registry!(
1070                "tx_orchestrator_local_execution_latency",
1071                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1072                &["tx_type"],
1073                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1074                registry,
1075            )
1076            .unwrap(),
1077            settlement_finality_latency: register_histogram_vec_with_registry!(
1078                "tx_orchestrator_settlement_finality_latency",
1079                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1080                &["tx_type", "driver_type"],
1081                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1082                registry,
1083            )
1084            .unwrap(),
1085        }
1086    }
1087
1088    pub fn new_for_tests() -> Self {
1089        let registry = Registry::new();
1090        Self::new(&registry)
1091    }
1092}
1093
1094#[async_trait::async_trait]
1095impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1096where
1097    A: AuthorityAPI + Send + Sync + 'static + Clone,
1098{
1099    async fn execute_transaction(
1100        &self,
1101        request: ExecuteTransactionRequestV3,
1102        client_addr: Option<std::net::SocketAddr>,
1103    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1104        self.execute_transaction_v3(request, client_addr).await
1105    }
1106
1107    fn simulate_transaction(
1108        &self,
1109        transaction: TransactionData,
1110        checks: TransactionChecks,
1111    ) -> Result<SimulateTransactionResult, SuiError> {
1112        self.inner
1113            .validator_state
1114            .simulate_transaction(transaction, checks)
1115    }
1116}
1117
/// Keeps track of inflight transactions being submitted, and helps recover transactions
/// on restart.
///
/// Creating a guard records the transaction in the pending log; dropping it asks the log
/// to finish (remove) that transaction.
struct TransactionSubmissionGuard {
    /// Log the transaction is written to on creation and finished in on drop.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Digest of the guarded transaction, used for cleanup and logging.
    tx_digest: TransactionDigest,
    /// Value returned by `write_pending_transaction_maybe` when the guard was created.
    is_new_transaction: bool,
}
1125
1126impl TransactionSubmissionGuard {
1127    pub fn new(
1128        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1129        tx: &VerifiedTransaction,
1130    ) -> Self {
1131        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1132        let tx_digest = *tx.digest();
1133        if is_new_transaction {
1134            debug!(?tx_digest, "Added transaction to inflight set");
1135        } else {
1136            debug!(
1137                ?tx_digest,
1138                "Transaction already being processed, no new submission will be made"
1139            );
1140        };
1141        Self {
1142            pending_tx_log,
1143            tx_digest,
1144            is_new_transaction,
1145        }
1146    }
1147
1148    fn is_new_transaction(&self) -> bool {
1149        self.is_new_transaction
1150    }
1151}
1152
1153impl Drop for TransactionSubmissionGuard {
1154    fn drop(&mut self) {
1155        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1156            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1157        } else {
1158            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1159        }
1160    }
1161}