sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::{backoff, in_antithesis};
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17    HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
18    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
19    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
20    register_int_gauge_with_registry,
21};
22use rand::Rng;
23use sui_config::NodeConfig;
24use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
25use sui_types::base_types::TransactionDigest;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
28use sui_types::messages_grpc::{SubmitTxRequest, TxType};
29use sui_types::quorum_driver_types::{
30    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
31    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
32    QuorumDriverError,
33};
34use sui_types::sui_system_state::SuiSystemState;
35use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
36use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
37use tokio::sync::broadcast::Receiver;
38use tokio::time::{Instant, sleep, timeout};
39use tracing::{Instrument, debug, error_span, info, instrument, warn};
40
41use crate::authority::AuthorityState;
42use crate::authority_aggregator::AuthorityAggregator;
43use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
44use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
45use crate::transaction_driver::{
46    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
47    TransactionDriverMetrics,
48};
49
/// Upper bound on how long a `WaitForLocalExecution` request waits for this node to execute
/// an already-finalized transaction (including its parents). When it elapses the response is
/// still returned to the client, just reported as not executed locally.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default time allowed for each transaction to reach finality. The
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable overrides it when set.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_millis(90_000);
56
57pub type QuorumTransactionEffectsResult =
58    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
59
60/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
61/// submit transactions to validators for finality. It adds inflight deduplication,
62/// waiting for local execution, recovery, early validation, and epoch change handling
63/// on top of Transaction Driver.
64/// This is used by node RPC service to support transaction submission and finality waiting.
65pub struct TransactionOrchestrator<A: Clone> {
66    inner: Arc<Inner<A>>,
67}
68
69impl TransactionOrchestrator<NetworkAuthorityClient> {
70    pub fn new_with_auth_aggregator(
71        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
72        validator_state: Arc<AuthorityState>,
73        reconfig_channel: Receiver<SuiSystemState>,
74        parent_path: &Path,
75        prometheus_registry: &Registry,
76        node_config: &NodeConfig,
77    ) -> Self {
78        let observer = OnsiteReconfigObserver::new(
79            reconfig_channel,
80            validator_state.get_object_cache_reader().clone(),
81            validator_state.clone_committee_store(),
82            validators.safe_client_metrics_base.clone(),
83            validators.metrics.deref().clone(),
84        );
85        TransactionOrchestrator::new(
86            validators,
87            validator_state,
88            parent_path,
89            prometheus_registry,
90            observer,
91            node_config,
92        )
93    }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98    A: AuthorityAPI + Send + Sync + 'static + Clone,
99    OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101    pub fn new(
102        validators: Arc<AuthorityAggregator<A>>,
103        validator_state: Arc<AuthorityState>,
104        parent_path: &Path,
105        prometheus_registry: &Registry,
106        reconfig_observer: OnsiteReconfigObserver,
107        node_config: &NodeConfig,
108    ) -> Self {
109        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111        let client_metrics = Arc::new(
112            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113        );
114        let reconfig_observer = Arc::new(reconfig_observer);
115
116        let transaction_driver = TransactionDriver::new(
117            validators.clone(),
118            reconfig_observer.clone(),
119            td_metrics,
120            Some(node_config),
121            client_metrics,
122        );
123
124        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125            parent_path.join("fullnode_pending_transactions"),
126        ));
127        Inner::start_task_to_recover_txes_in_log(
128            pending_tx_log.clone(),
129            transaction_driver.clone(),
130        );
131
132        let td_allowed_submission_list = node_config
133            .transaction_driver_config
134            .as_ref()
135            .map(|config| config.allowed_submission_validators.clone())
136            .unwrap_or_default();
137
138        let td_blocked_submission_list = node_config
139            .transaction_driver_config
140            .as_ref()
141            .map(|config| config.blocked_submission_validators.clone())
142            .unwrap_or_default();
143
144        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145            panic!(
146                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147                td_allowed_submission_list, td_blocked_submission_list
148            );
149        }
150
151        let enable_early_validation = node_config
152            .transaction_driver_config
153            .as_ref()
154            .map(|config| config.enable_early_validation)
155            .unwrap_or(true);
156
157        let inner = Arc::new(Inner {
158            validator_state,
159            pending_tx_log,
160            metrics,
161            transaction_driver,
162            td_allowed_submission_list,
163            td_blocked_submission_list,
164            enable_early_validation,
165        });
166        Self { inner }
167    }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172    A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175    fields(
176        tx_digest = ?request.transaction.digest(),
177        tx_type = ?request_type,
178    ))]
179    pub async fn execute_transaction_block(
180        &self,
181        request: ExecuteTransactionRequestV3,
182        request_type: ExecuteTransactionRequestType,
183        client_addr: Option<SocketAddr>,
184    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
185    {
186        let timer = Instant::now();
187        let tx_type = if request.transaction.is_consensus_tx() {
188            TxType::SharedObject
189        } else {
190            TxType::SingleWriter
191        };
192        let tx_digest = *request.transaction.digest();
193
194        let inner = self.inner.clone();
195        let (response, mut executed_locally) = spawn_monitored_task!(
196            Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
197        )
198        .await
199        .map_err(|e| QuorumDriverError::TransactionFailed {
200            category: ErrorCategory::Internal,
201            details: e.to_string(),
202        })??;
203
204        if !executed_locally {
205            executed_locally = if matches!(
206                request_type,
207                ExecuteTransactionRequestType::WaitForLocalExecution
208            ) {
209                let executed_locally =
210                    Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
211                        &self.inner.validator_state,
212                        tx_digest,
213                        tx_type,
214                        &self.inner.metrics,
215                    )
216                    .await
217                    .is_ok();
218                add_server_timing("local_execution done");
219                executed_locally
220            } else {
221                false
222            };
223        }
224
225        let QuorumTransactionResponse {
226            effects,
227            events,
228            input_objects,
229            output_objects,
230            auxiliary_data,
231        } = response;
232
233        let response = ExecuteTransactionResponseV3 {
234            effects,
235            events,
236            input_objects,
237            output_objects,
238            auxiliary_data,
239        };
240
241        self.inner
242            .metrics
243            .request_latency
244            .with_label_values(&[
245                tx_type.as_str(),
246                "execute_transaction_block",
247                executed_locally.to_string().as_str(),
248            ])
249            .observe(timer.elapsed().as_secs_f64());
250
251        Ok((response, executed_locally))
252    }
253
254    // Utilize the handle_certificate_v3 validator api to request input/output objects
255    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
256                 fields(tx_digest = ?request.transaction.digest()))]
257    pub async fn execute_transaction_v3(
258        &self,
259        request: ExecuteTransactionRequestV3,
260        client_addr: Option<SocketAddr>,
261    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
262        let timer = Instant::now();
263        let tx_type = if request.transaction.is_consensus_tx() {
264            TxType::SharedObject
265        } else {
266            TxType::SingleWriter
267        };
268
269        let inner = self.inner.clone();
270        let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
271            inner,
272            request,
273            client_addr
274        ))
275        .await
276        .map_err(|e| QuorumDriverError::TransactionFailed {
277            category: ErrorCategory::Internal,
278            details: e.to_string(),
279        })??;
280
281        self.inner
282            .metrics
283            .request_latency
284            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
285            .observe(timer.elapsed().as_secs_f64());
286
287        let QuorumTransactionResponse {
288            effects,
289            events,
290            input_objects,
291            output_objects,
292            auxiliary_data,
293        } = response;
294
295        Ok(ExecuteTransactionResponseV3 {
296            effects,
297            events,
298            input_objects,
299            output_objects,
300            auxiliary_data,
301        })
302    }
303
304    pub fn authority_state(&self) -> &Arc<AuthorityState> {
305        &self.inner.validator_state
306    }
307
308    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
309        &self.inner.transaction_driver
310    }
311
312    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
313        self.inner
314            .transaction_driver
315            .authority_aggregator()
316            .load_full()
317    }
318
319    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
320        self.inner.pending_tx_log.load_all_pending_transactions()
321    }
322
323    pub fn empty_pending_tx_log_in_test(&self) -> bool {
324        self.inner.pending_tx_log.is_empty()
325    }
326}
327
328struct Inner<A: Clone> {
329    validator_state: Arc<AuthorityState>,
330    pending_tx_log: Arc<WritePathPendingTransactionLog>,
331    metrics: Arc<TransactionOrchestratorMetrics>,
332    transaction_driver: Arc<TransactionDriver<A>>,
333    td_allowed_submission_list: Vec<String>,
334    td_blocked_submission_list: Vec<String>,
335    enable_early_validation: bool,
336}
337
338impl<A> Inner<A>
339where
340    A: AuthorityAPI + Send + Sync + 'static + Clone,
341{
342    async fn execute_transaction_with_retry(
343        inner: Arc<Inner<A>>,
344        request: ExecuteTransactionRequestV3,
345        client_addr: Option<SocketAddr>,
346    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
347        let result = inner
348            .execute_transaction_with_effects_waiting(
349                request.clone(),
350                client_addr,
351                /* enforce_live_input_objects */ false,
352            )
353            .await;
354
355        // If the error is retriable, retry the transaction sufficiently long.
356        if let Err(e) = &result
357            && e.is_retriable()
358        {
359            spawn_monitored_task!(async move {
360                inner.metrics.background_retry_started.inc();
361                let backoff = backoff::ExponentialBackoff::new(
362                    Duration::from_secs(1),
363                    Duration::from_secs(300),
364                );
365                const MAX_RETRIES: usize = 10;
366                for (i, delay) in backoff.enumerate() {
367                    if i == MAX_RETRIES {
368                        break;
369                    }
370                    // Start to enforce live input after 3 retries,
371                    // to avoid excessively retrying transactions with non-existent input objects.
372                    let result = inner
373                        .execute_transaction_with_effects_waiting(
374                            request.clone(),
375                            client_addr,
376                            i > 3,
377                        )
378                        .await;
379                    match result {
380                        Ok(_) => break,
381                        Err(e) => {
382                            if !e.is_retriable() {
383                                break;
384                            }
385                            inner.metrics.background_retry_errors.inc();
386                            if i % 100 == 0 {
387                                debug!(
388                                    "Background retry {i} for transaction {}: {e:?}",
389                                    request.transaction.digest()
390                                );
391                            }
392                        }
393                    };
394                    sleep(delay).await;
395                }
396            });
397        }
398
399        result
400    }
401
402    /// Shared implementation for executing transactions with parallel local effects waiting
403    async fn execute_transaction_with_effects_waiting(
404        &self,
405        request: ExecuteTransactionRequestV3,
406        client_addr: Option<SocketAddr>,
407        enforce_live_input_objects: bool,
408    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
409        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
410        let verified_transaction = epoch_store
411            .verify_transaction_with_current_aliases(request.transaction.clone())
412            .map_err(QuorumDriverError::InvalidUserSignature)?
413            .into_tx();
414        let tx_digest = *verified_transaction.digest();
415
416        // Early validation check against local state before submission to catch non-retriable errors
417        // TODO: Consider moving this check to TransactionDriver for per-retry validation
418        if self.enable_early_validation
419            && let Err(e) = self.validator_state.check_transaction_validity(
420                &epoch_store,
421                &verified_transaction,
422                enforce_live_input_objects,
423            )
424        {
425            let error_category = e.categorize();
426            if !error_category.is_submission_retriable() {
427                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
428                if !self.validator_state.is_tx_already_executed(&tx_digest) {
429                    self.metrics
430                        .early_validation_rejections
431                        .with_label_values(&[e.to_variant_name()])
432                        .inc();
433                    debug!(
434                        error = ?e,
435                        "Transaction rejected during early validation"
436                    );
437
438                    return Err(QuorumDriverError::TransactionFailed {
439                        category: error_category,
440                        details: e.to_string(),
441                    });
442                }
443            }
444        }
445
446        // Add transaction to WAL log.
447        let guard =
448            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
449        let is_new_transaction = guard.is_new_transaction();
450
451        let include_events = request.include_events;
452        let include_input_objects = request.include_input_objects;
453        let include_output_objects = request.include_output_objects;
454        let include_auxiliary_data = request.include_auxiliary_data;
455
456        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
457            .ok()
458            .and_then(|v| v.parse().ok())
459            .map(Duration::from_secs)
460            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
461
462        let num_submissions = if !is_new_transaction {
463            // No need to submit when the transaction is already being processed.
464            0
465        } else if cfg!(msim) || in_antithesis() {
466            // Allow duplicated submissions in tests.
467            let r = rand::thread_rng().gen_range(1..=100);
468            let n = if r <= 10 {
469                3
470            } else if r <= 30 {
471                2
472            } else {
473                1
474            };
475            if n > 1 {
476                debug!("Making {n} execution calls");
477            }
478            n
479        } else {
480            1
481        };
482
483        // Wait for one of the execution futures to succeed, or all of them to fail.
484        let mut execution_futures = FuturesUnordered::new();
485        for i in 0..num_submissions {
486            // Generate jitter values outside the async block
487            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
488            let delay_ms = if should_delay {
489                rand::thread_rng().gen_range(100..=500)
490            } else {
491                0
492            };
493
494            let request = request.clone();
495            let verified_transaction = verified_transaction.clone();
496
497            let future = async move {
498                if delay_ms > 0 {
499                    // Add jitters to duplicated submissions.
500                    sleep(Duration::from_millis(delay_ms)).await;
501                }
502                self.execute_transaction_impl(
503                    request,
504                    verified_transaction,
505                    client_addr,
506                    Some(finality_timeout),
507                )
508                .await
509            }
510            .boxed();
511            execution_futures.push(future);
512        }
513
514        // Track the last execution error.
515        let mut last_execution_error: Option<QuorumDriverError> = None;
516
517        // Wait for execution result outside of this call to become available.
518        let digests = [tx_digest];
519        let mut local_effects_future = epoch_store
520            .within_alive_epoch(
521                self.validator_state
522                    .get_transaction_cache_reader()
523                    .notify_read_executed_effects(
524                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
525                    &digests,
526                ),
527            )
528            .boxed();
529
530        // Wait for execution timeout.
531        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
532
533        loop {
534            tokio::select! {
535                biased;
536
537                // Local effects might be available
538                local_effects_result = &mut local_effects_future => {
539                    match local_effects_result {
540                        Ok(effects) => {
541                            debug!(
542                                "Effects became available while execution was running"
543                            );
544                            if let Some(effects) = effects.into_iter().next() {
545                                self.metrics.concurrent_execution.inc();
546                                let epoch = effects.executed_epoch();
547                                let events = if include_events {
548                                    if effects.events_digest().is_some() {
549                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
550                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
551                                    } else {
552                                        None
553                                    }
554                                } else {
555                                    None
556                                };
557                                let input_objects = include_input_objects
558                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
559                                    .transpose()
560                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
561                                let output_objects = include_output_objects
562                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
563                                    .transpose()
564                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
565                                let response = QuorumTransactionResponse {
566                                    effects: FinalizedEffects {
567                                        effects,
568                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
569                                    },
570                                    events,
571                                    input_objects,
572                                    output_objects,
573                                    auxiliary_data: None,
574                                };
575                                break Ok((response, true));
576                            }
577                        }
578                        Err(_) => {
579                            warn!("Epoch terminated before effects were available");
580                        }
581                    };
582
583                    // Prevent this branch from being selected again
584                    local_effects_future = futures::future::pending().boxed();
585                }
586
587                // This branch is disabled if execution_futures is empty.
588                Some(result) = execution_futures.next() => {
589                    match result {
590                        Ok(resp) => {
591                            // First success gets returned.
592                            debug!("Execution succeeded, returning response");
593                            let QuorumTransactionResponse {
594                                effects,
595                                events,
596                                input_objects,
597                                output_objects,
598                                auxiliary_data,
599                            } = resp;
600                            // Filter fields based on request flags.
601                            let resp = QuorumTransactionResponse {
602                                effects,
603                                events: if include_events { events } else { None },
604                                input_objects: if include_input_objects { input_objects } else { None },
605                                output_objects: if include_output_objects { output_objects } else { None },
606                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
607                            };
608                            break Ok((resp, false));
609                        }
610                        Err(e) => {
611                            debug!(?e, "Execution attempt failed, wait for other attempts");
612                            last_execution_error = Some(e);
613                        }
614                    };
615
616                    // Last error must have been recorded.
617                    if execution_futures.is_empty() {
618                        break Err(last_execution_error.unwrap());
619                    }
620                }
621
622                // A timeout has occurred while waiting for finality
623                _ = &mut timeout_future => {
624                    if let Some(e) = last_execution_error {
625                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
626                    } else {
627                        debug!("Timeout waiting for transaction finality.");
628                    }
629                    self.metrics.wait_for_finality_timeout.inc();
630
631                    // TODO: Return the last execution error.
632                    break Err(QuorumDriverError::TimeoutBeforeFinality);
633                }
634            }
635        }
636    }
637
638    #[instrument(level = "error", skip_all)]
639    async fn execute_transaction_impl(
640        &self,
641        request: ExecuteTransactionRequestV3,
642        verified_transaction: VerifiedTransaction,
643        client_addr: Option<SocketAddr>,
644        finality_timeout: Option<Duration>,
645    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
646        debug!("TO Received transaction execution request.");
647
648        let timer = Instant::now();
649        let tx_type = if verified_transaction.is_consensus_tx() {
650            TxType::SharedObject
651        } else {
652            TxType::SingleWriter
653        };
654
655        let (_in_flight_metrics_guards, good_response_metrics) =
656            self.update_metrics(&request.transaction);
657
658        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
659        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
660        wait_for_finality_gauge.inc();
661        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
662            in_flight.dec();
663        });
664
665        let response = self
666            .submit_with_transaction_driver(
667                &self.transaction_driver,
668                &request,
669                client_addr,
670                &verified_transaction,
671                good_response_metrics,
672                finality_timeout,
673            )
674            .await?;
675        let driver_type = "transaction_driver";
676
677        add_server_timing("wait_for_finality done");
678
679        self.metrics.wait_for_finality_finished.inc();
680
681        let elapsed = timer.elapsed().as_secs_f64();
682        self.metrics
683            .settlement_finality_latency
684            .with_label_values(&[tx_type.as_str(), driver_type])
685            .observe(elapsed);
686        good_response_metrics.inc();
687
688        Ok(response)
689    }
690
691    #[instrument(level = "error", skip_all, err(level = "info"))]
692    async fn submit_with_transaction_driver(
693        &self,
694        td: &Arc<TransactionDriver<A>>,
695        request: &ExecuteTransactionRequestV3,
696        client_addr: Option<SocketAddr>,
697        verified_transaction: &VerifiedTransaction,
698        good_response_metrics: &GenericCounter<AtomicU64>,
699        timeout_duration: Option<Duration>,
700    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
701        let tx_digest = *verified_transaction.digest();
702        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
703
704        let td_response = td
705            .drive_transaction(
706                SubmitTxRequest::new_transaction(request.transaction.clone()),
707                SubmitTransactionOptions {
708                    forwarded_client_addr: client_addr,
709                    allowed_validators: self.td_allowed_submission_list.clone(),
710                    blocked_validators: self.td_blocked_submission_list.clone(),
711                },
712                timeout_duration,
713            )
714            .await
715            .map_err(|e| match e {
716                TransactionDriverError::TimeoutWithLastRetriableError {
717                    last_error,
718                    attempts,
719                    timeout,
720                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
721                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
722                    attempts,
723                    timeout,
724                },
725                other => QuorumDriverError::TransactionFailed {
726                    category: other.categorize(),
727                    details: other.to_string(),
728                },
729            });
730
731        match td_response {
732            Err(e) => {
733                warn!("TransactionDriver error: {e:?}");
734                Err(e)
735            }
736            Ok(quorum_transaction_response) => {
737                good_response_metrics.inc();
738                Ok(quorum_transaction_response)
739            }
740        }
741    }
742
743    #[instrument(
744        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
745        level = "debug",
746        skip_all,
747        err(level = "info")
748    )]
749    async fn wait_for_finalized_tx_executed_locally_with_timeout(
750        validator_state: &Arc<AuthorityState>,
751        tx_digest: TransactionDigest,
752        tx_type: TxType,
753        metrics: &TransactionOrchestratorMetrics,
754    ) -> SuiResult {
755        metrics.local_execution_in_flight.inc();
756        let _metrics_guard =
757            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
758                in_flight.dec();
759            });
760
761        let _latency_guard = metrics
762            .local_execution_latency
763            .with_label_values(&[tx_type.as_str()])
764            .start_timer();
765        debug!("Waiting for finalized tx to be executed locally.");
766        match timeout(
767            LOCAL_EXECUTION_TIMEOUT,
768            validator_state
769                .get_transaction_cache_reader()
770                .notify_read_executed_effects_digests(
771                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
772                    &[tx_digest],
773                ),
774        )
775        .instrument(error_span!(
776            "transaction_orchestrator::local_execution",
777            ?tx_digest
778        ))
779        .await
780        {
781            Err(_elapsed) => {
782                debug!(
783                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
784                    LOCAL_EXECUTION_TIMEOUT
785                );
786                metrics.local_execution_timeout.inc();
787                Err(SuiErrorKind::TimeoutError.into())
788            }
789            Ok(_) => {
790                metrics.local_execution_success.inc();
791                Ok(())
792            }
793        }
794    }
795
796    fn update_metrics<'a>(
797        &'a self,
798        transaction: &Transaction,
799    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
800        let (in_flight, good_response) = if transaction.is_consensus_tx() {
801            self.metrics.total_req_received_shared_object.inc();
802            (
803                self.metrics.req_in_flight_shared_object.clone(),
804                &self.metrics.good_response_shared_object,
805            )
806        } else {
807            self.metrics.total_req_received_single_writer.inc();
808            (
809                self.metrics.req_in_flight_single_writer.clone(),
810                &self.metrics.good_response_single_writer,
811            )
812        };
813        in_flight.inc();
814        (
815            scopeguard::guard(in_flight, |in_flight| {
816                in_flight.dec();
817            }),
818            good_response,
819        )
820    }
821
822    fn start_task_to_recover_txes_in_log(
823        pending_tx_log: Arc<WritePathPendingTransactionLog>,
824        transaction_driver: Arc<TransactionDriver<A>>,
825    ) {
826        spawn_logged_monitored_task!(async move {
827            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
828                info!("Skipping loading pending transactions from pending_tx_log.");
829                return;
830            }
831            let pending_txes = pending_tx_log
832                .load_all_pending_transactions()
833                .expect("failed to load all pending transactions");
834            let num_pending_txes = pending_txes.len();
835            info!(
836                "Recovering {} pending transactions from pending_tx_log.",
837                num_pending_txes
838            );
839            let mut recovery = pending_txes
840                .into_iter()
841                .map(|tx| {
842                    let pending_tx_log = pending_tx_log.clone();
843                    let transaction_driver = transaction_driver.clone();
844                    async move {
845                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
846                        // requires a migration.
847                        let tx = tx.into_inner();
848                        let tx_digest = *tx.digest();
849                        // It's not impossible we fail to enqueue a task but that's not the end of world.
850                        // TODO(william) correctly extract client_addr from logs
851                        if let Err(err) = transaction_driver
852                            .drive_transaction(
853                                SubmitTxRequest::new_transaction(tx),
854                                SubmitTransactionOptions::default(),
855                                Some(Duration::from_secs(60)),
856                            )
857                            .await
858                        {
859                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
860                        } else {
861                            debug!(?tx_digest, "Executed recovered transaction");
862                        }
863                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
864                            warn!(
865                                ?tx_digest,
866                                "Failed to clean up transaction in pending log: {err}"
867                            );
868                        } else {
869                            debug!(?tx_digest, "Cleaned up transaction in pending log");
870                        }
871                    }
872                })
873                .collect::<FuturesUnordered<_>>();
874
875            let mut num_recovered = 0;
876            while recovery.next().await.is_some() {
877                num_recovered += 1;
878                if num_recovered % 1000 == 0 {
879                    info!(
880                        "Recovered {} out of {} transactions from pending_tx_log.",
881                        num_recovered, num_pending_txes
882                    );
883                }
884            }
885            info!(
886                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
887                num_recovered, num_pending_txes
888            );
889        });
890    }
891}
892/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
893#[derive(Clone)]
894pub struct TransactionOrchestratorMetrics {
895    total_req_received_single_writer: GenericCounter<AtomicU64>,
896    total_req_received_shared_object: GenericCounter<AtomicU64>,
897
898    good_response_single_writer: GenericCounter<AtomicU64>,
899    good_response_shared_object: GenericCounter<AtomicU64>,
900
901    req_in_flight_single_writer: GenericGauge<AtomicI64>,
902    req_in_flight_shared_object: GenericGauge<AtomicI64>,
903
904    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
905    wait_for_finality_finished: GenericCounter<AtomicU64>,
906    wait_for_finality_timeout: GenericCounter<AtomicU64>,
907
908    local_execution_in_flight: GenericGauge<AtomicI64>,
909    local_execution_success: GenericCounter<AtomicU64>,
910    local_execution_timeout: GenericCounter<AtomicU64>,
911
912    concurrent_execution: IntCounter,
913
914    early_validation_rejections: IntCounterVec,
915
916    background_retry_started: IntGauge,
917    background_retry_errors: IntCounter,
918
919    request_latency: HistogramVec,
920    local_execution_latency: HistogramVec,
921    settlement_finality_latency: HistogramVec,
922}
923
924// Note that labeled-metrics are stored upfront individually
925// to mitigate the perf hit by MetricsVec.
926// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
927impl TransactionOrchestratorMetrics {
928    pub fn new(registry: &Registry) -> Self {
929        let total_req_received = register_int_counter_vec_with_registry!(
930            "tx_orchestrator_total_req_received",
931            "Total number of executions request Transaction Orchestrator receives, group by tx type",
932            &["tx_type"],
933            registry
934        )
935        .unwrap();
936
937        let total_req_received_single_writer =
938            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
939        let total_req_received_shared_object =
940            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
941
942        let good_response = register_int_counter_vec_with_registry!(
943            "tx_orchestrator_good_response",
944            "Total number of good responses Transaction Orchestrator generates, group by tx type",
945            &["tx_type"],
946            registry
947        )
948        .unwrap();
949
950        let good_response_single_writer =
951            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
952        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
953
954        let req_in_flight = register_int_gauge_vec_with_registry!(
955            "tx_orchestrator_req_in_flight",
956            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
957            &["tx_type"],
958            registry
959        )
960        .unwrap();
961
962        let req_in_flight_single_writer =
963            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
964        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
965
966        Self {
967            total_req_received_single_writer,
968            total_req_received_shared_object,
969            good_response_single_writer,
970            good_response_shared_object,
971            req_in_flight_single_writer,
972            req_in_flight_shared_object,
973            wait_for_finality_in_flight: register_int_gauge_with_registry!(
974                "tx_orchestrator_wait_for_finality_in_flight",
975                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
976                registry,
977            )
978            .unwrap(),
979            wait_for_finality_finished: register_int_counter_with_registry!(
980                "tx_orchestrator_wait_for_finality_fnished",
981                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
982                registry,
983            )
984            .unwrap(),
985            wait_for_finality_timeout: register_int_counter_with_registry!(
986                "tx_orchestrator_wait_for_finality_timeout",
987                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
988                registry,
989            )
990            .unwrap(),
991            local_execution_in_flight: register_int_gauge_with_registry!(
992                "tx_orchestrator_local_execution_in_flight",
993                "Number of local execution txns in flights Transaction Orchestrator handles",
994                registry,
995            )
996            .unwrap(),
997            local_execution_success: register_int_counter_with_registry!(
998                "tx_orchestrator_local_execution_success",
999                "Total number of successful local execution txns Transaction Orchestrator handles",
1000                registry,
1001            )
1002            .unwrap(),
1003            local_execution_timeout: register_int_counter_with_registry!(
1004                "tx_orchestrator_local_execution_timeout",
1005                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1006                registry,
1007            )
1008            .unwrap(),
1009            concurrent_execution: register_int_counter_with_registry!(
1010                "tx_orchestrator_concurrent_execution",
1011                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1012                registry,
1013            )
1014            .unwrap(),
1015            early_validation_rejections: register_int_counter_vec_with_registry!(
1016                "tx_orchestrator_early_validation_rejections",
1017                "Total number of transactions rejected during early validation before submission, by reason",
1018                &["reason"],
1019                registry,
1020            )
1021            .unwrap(),
1022            background_retry_started: register_int_gauge_with_registry!(
1023                "tx_orchestrator_background_retry_started",
1024                "Number of background retry tasks kicked off for transactions with retriable errors",
1025                registry,
1026            )
1027            .unwrap(),
1028            background_retry_errors: register_int_counter_with_registry!(
1029                "tx_orchestrator_background_retry_errors",
1030                "Total number of background retry errors, by error type",
1031                registry,
1032            )
1033            .unwrap(),
1034            request_latency: register_histogram_vec_with_registry!(
1035                "tx_orchestrator_request_latency",
1036                "Time spent in processing one Transaction Orchestrator request",
1037                &["tx_type", "route", "wait_for_local_execution"],
1038                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1039                registry,
1040            )
1041            .unwrap(),
1042            local_execution_latency: register_histogram_vec_with_registry!(
1043                "tx_orchestrator_local_execution_latency",
1044                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1045                &["tx_type"],
1046                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1047                registry,
1048            )
1049            .unwrap(),
1050            settlement_finality_latency: register_histogram_vec_with_registry!(
1051                "tx_orchestrator_settlement_finality_latency",
1052                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1053                &["tx_type", "driver_type"],
1054                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1055                registry,
1056            )
1057            .unwrap(),
1058        }
1059    }
1060
1061    pub fn new_for_tests() -> Self {
1062        let registry = Registry::new();
1063        Self::new(&registry)
1064    }
1065}
1066
1067#[async_trait::async_trait]
1068impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1069where
1070    A: AuthorityAPI + Send + Sync + 'static + Clone,
1071{
1072    async fn execute_transaction(
1073        &self,
1074        request: ExecuteTransactionRequestV3,
1075        client_addr: Option<std::net::SocketAddr>,
1076    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1077        self.execute_transaction_v3(request, client_addr).await
1078    }
1079
1080    fn simulate_transaction(
1081        &self,
1082        transaction: TransactionData,
1083        checks: TransactionChecks,
1084    ) -> Result<SimulateTransactionResult, SuiError> {
1085        self.inner
1086            .validator_state
1087            .simulate_transaction(transaction, checks)
1088    }
1089}
1090
1091/// Keeps track of inflight transactions being submitted, and helps recover transactions
1092/// on restart.
1093struct TransactionSubmissionGuard {
1094    pending_tx_log: Arc<WritePathPendingTransactionLog>,
1095    tx_digest: TransactionDigest,
1096    is_new_transaction: bool,
1097}
1098
1099impl TransactionSubmissionGuard {
1100    pub fn new(
1101        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1102        tx: &VerifiedTransaction,
1103    ) -> Self {
1104        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1105        let tx_digest = *tx.digest();
1106        if is_new_transaction {
1107            debug!(?tx_digest, "Added transaction to inflight set");
1108        } else {
1109            debug!(
1110                ?tx_digest,
1111                "Transaction already being processed, no new submission will be made"
1112            );
1113        };
1114        Self {
1115            pending_tx_log,
1116            tx_digest,
1117            is_new_transaction,
1118        }
1119    }
1120
1121    fn is_new_transaction(&self) -> bool {
1122        self.is_new_transaction
1123    }
1124}
1125
1126impl Drop for TransactionSubmissionGuard {
1127    fn drop(&mut self) {
1128        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1129            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1130        } else {
1131            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1132        }
1133    }
1134}