sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::{backoff, in_antithesis};
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17    HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
18    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
19    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
20    register_int_gauge_with_registry,
21};
22use rand::Rng;
23use sui_config::NodeConfig;
24use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
25use sui_types::base_types::TransactionDigest;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
28use sui_types::messages_grpc::{SubmitTxRequest, TxType};
29use sui_types::quorum_driver_types::{
30    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
31    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
32    QuorumDriverError,
33};
34use sui_types::sui_system_state::SuiSystemState;
35use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
36use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
37use tokio::sync::broadcast::Receiver;
38use tokio::time::{Instant, sleep, timeout};
39use tracing::{Instrument, debug, error_span, info, instrument, warn};
40
41use crate::authority::AuthorityState;
42use crate::authority_aggregator::AuthorityAggregator;
43use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
44use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
45use crate::transaction_driver::{
46    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
47    TransactionDriverMetrics,
48};
49
/// How long to wait for local execution (including parents) before a timeout
/// is returned to the client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default timeout for waiting for finality of each transaction. It is used
/// when the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable is unset or
/// cannot be parsed.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);

/// Result of driving a transaction: on success the transaction paired with its
/// [`QuorumTransactionResponse`], on failure the transaction digest paired with
/// the [`QuorumDriverError`].
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
59
/// Transaction Orchestrator is a node component that uses Transaction Driver to
/// submit transactions to validators for finality. On top of Transaction Driver it
/// adds in-flight deduplication, waiting for local execution, recovery, early
/// validation, and epoch change handling.
///
/// This is used by the node RPC service to support transaction submission and
/// finality waiting.
pub struct TransactionOrchestrator<A: Clone> {
    /// Shared state; a clone of this `Arc` is handed to the tasks spawned for
    /// each execution request.
    inner: Arc<Inner<A>>,
}
68
69impl TransactionOrchestrator<NetworkAuthorityClient> {
70    pub fn new_with_auth_aggregator(
71        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
72        validator_state: Arc<AuthorityState>,
73        reconfig_channel: Receiver<SuiSystemState>,
74        parent_path: &Path,
75        prometheus_registry: &Registry,
76        node_config: &NodeConfig,
77    ) -> Self {
78        let observer = OnsiteReconfigObserver::new(
79            reconfig_channel,
80            validator_state.get_object_cache_reader().clone(),
81            validator_state.clone_committee_store(),
82            validators.safe_client_metrics_base.clone(),
83            validators.metrics.deref().clone(),
84        );
85        TransactionOrchestrator::new(
86            validators,
87            validator_state,
88            parent_path,
89            prometheus_registry,
90            observer,
91            node_config,
92        )
93    }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98    A: AuthorityAPI + Send + Sync + 'static + Clone,
99    OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101    pub fn new(
102        validators: Arc<AuthorityAggregator<A>>,
103        validator_state: Arc<AuthorityState>,
104        parent_path: &Path,
105        prometheus_registry: &Registry,
106        reconfig_observer: OnsiteReconfigObserver,
107        node_config: &NodeConfig,
108    ) -> Self {
109        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111        let client_metrics = Arc::new(
112            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113        );
114        let reconfig_observer = Arc::new(reconfig_observer);
115
116        let transaction_driver = TransactionDriver::new(
117            validators.clone(),
118            reconfig_observer.clone(),
119            td_metrics,
120            Some(node_config),
121            client_metrics,
122        );
123
124        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125            parent_path.join("fullnode_pending_transactions"),
126        ));
127        Inner::start_task_to_recover_txes_in_log(
128            pending_tx_log.clone(),
129            transaction_driver.clone(),
130        );
131
132        let td_allowed_submission_list = node_config
133            .transaction_driver_config
134            .as_ref()
135            .map(|config| config.allowed_submission_validators.clone())
136            .unwrap_or_default();
137
138        let td_blocked_submission_list = node_config
139            .transaction_driver_config
140            .as_ref()
141            .map(|config| config.blocked_submission_validators.clone())
142            .unwrap_or_default();
143
144        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145            panic!(
146                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147                td_allowed_submission_list, td_blocked_submission_list
148            );
149        }
150
151        let enable_early_validation = node_config
152            .transaction_driver_config
153            .as_ref()
154            .map(|config| config.enable_early_validation)
155            .unwrap_or(true);
156
157        let inner = Arc::new(Inner {
158            validator_state,
159            pending_tx_log,
160            metrics,
161            transaction_driver,
162            td_allowed_submission_list,
163            td_blocked_submission_list,
164            enable_early_validation,
165        });
166        Self { inner }
167    }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172    A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175    fields(
176        tx_digest = ?request.transaction.digest(),
177        tx_type = ?request_type,
178    ))]
179    pub async fn execute_transaction_block(
180        &self,
181        request: ExecuteTransactionRequestV3,
182        request_type: ExecuteTransactionRequestType,
183        client_addr: Option<SocketAddr>,
184    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
185    {
186        let timer = Instant::now();
187        let tx_type = if request.transaction.is_consensus_tx() {
188            TxType::SharedObject
189        } else {
190            TxType::SingleWriter
191        };
192        let tx_digest = *request.transaction.digest();
193
194        let inner = self.inner.clone();
195        let (response, mut executed_locally) = spawn_monitored_task!(
196            Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
197        )
198        .await
199        .map_err(|e| QuorumDriverError::TransactionFailed {
200            category: ErrorCategory::Internal,
201            details: e.to_string(),
202        })??;
203
204        if !executed_locally {
205            executed_locally = if matches!(
206                request_type,
207                ExecuteTransactionRequestType::WaitForLocalExecution
208            ) {
209                let executed_locally =
210                    Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
211                        &self.inner.validator_state,
212                        tx_digest,
213                        tx_type,
214                        &self.inner.metrics,
215                    )
216                    .await
217                    .is_ok();
218                add_server_timing("local_execution done");
219                executed_locally
220            } else {
221                false
222            };
223        }
224
225        let QuorumTransactionResponse {
226            effects,
227            events,
228            input_objects,
229            output_objects,
230            auxiliary_data,
231        } = response;
232
233        let response = ExecuteTransactionResponseV3 {
234            effects,
235            events,
236            input_objects,
237            output_objects,
238            auxiliary_data,
239        };
240
241        self.inner
242            .metrics
243            .request_latency
244            .with_label_values(&[
245                tx_type.as_str(),
246                "execute_transaction_block",
247                executed_locally.to_string().as_str(),
248            ])
249            .observe(timer.elapsed().as_secs_f64());
250
251        Ok((response, executed_locally))
252    }
253
254    // Utilize the handle_certificate_v3 validator api to request input/output objects
255    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
256                 fields(tx_digest = ?request.transaction.digest()))]
257    pub async fn execute_transaction_v3(
258        &self,
259        request: ExecuteTransactionRequestV3,
260        client_addr: Option<SocketAddr>,
261    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
262        let timer = Instant::now();
263        let tx_type = if request.transaction.is_consensus_tx() {
264            TxType::SharedObject
265        } else {
266            TxType::SingleWriter
267        };
268
269        let inner = self.inner.clone();
270        let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
271            inner,
272            request,
273            client_addr
274        ))
275        .await
276        .map_err(|e| QuorumDriverError::TransactionFailed {
277            category: ErrorCategory::Internal,
278            details: e.to_string(),
279        })??;
280
281        self.inner
282            .metrics
283            .request_latency
284            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
285            .observe(timer.elapsed().as_secs_f64());
286
287        let QuorumTransactionResponse {
288            effects,
289            events,
290            input_objects,
291            output_objects,
292            auxiliary_data,
293        } = response;
294
295        Ok(ExecuteTransactionResponseV3 {
296            effects,
297            events,
298            input_objects,
299            output_objects,
300            auxiliary_data,
301        })
302    }
303
    /// Returns the authority state this orchestrator was constructed with.
    pub fn authority_state(&self) -> &Arc<AuthorityState> {
        &self.inner.validator_state
    }
307
    /// Returns the transaction driver this orchestrator submits transactions through.
    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
        &self.inner.transaction_driver
    }
311
312    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
313        self.inner
314            .transaction_driver
315            .authority_aggregator()
316            .load_full()
317    }
318
    /// Loads every transaction currently recorded in the pending transaction log.
    ///
    /// NOTE(review): the `_in_test` suffix suggests this is meant for tests only.
    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
        self.inner.pending_tx_log.load_all_pending_transactions()
    }
322
    /// Reports whether the pending transaction log is empty (delegates to its
    /// `is_empty()`).
    ///
    /// NOTE(review): the `_in_test` suffix suggests this is meant for tests only.
    pub fn empty_pending_tx_log_in_test(&self) -> bool {
        self.inner.pending_tx_log.is_empty()
    }
326}
327
/// Shared state behind [`TransactionOrchestrator`], held in an `Arc` so it can be
/// moved into spawned tasks.
struct Inner<A: Clone> {
    /// Local authority state: provides the epoch store, transaction validity checks,
    /// and the transaction cache reader used to wait for effects.
    validator_state: Arc<AuthorityState>,
    /// Log of pending transactions; a transaction is recorded here before submission.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Orchestrator metrics.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Driver through which transactions are submitted to validators.
    transaction_driver: Arc<TransactionDriver<A>>,
    /// Passed as `allowed_validators` in the submission options of every submission.
    td_allowed_submission_list: Vec<String>,
    /// Passed as `blocked_validators` in the submission options of every submission.
    td_blocked_submission_list: Vec<String>,
    /// When true, transactions are checked against local state before submission.
    enable_early_validation: bool,
}
337
338impl<A> Inner<A>
339where
340    A: AuthorityAPI + Send + Sync + 'static + Clone,
341{
342    async fn execute_transaction_with_retry(
343        inner: Arc<Inner<A>>,
344        request: ExecuteTransactionRequestV3,
345        client_addr: Option<SocketAddr>,
346    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
347        let result = inner
348            .execute_transaction_with_effects_waiting(
349                request.clone(),
350                client_addr,
351                /* enforce_live_input_objects */ false,
352            )
353            .await;
354
355        // If the error is retriable, retry the transaction sufficiently long.
356        if let Err(e) = &result
357            && e.is_retriable()
358        {
359            spawn_monitored_task!(async move {
360                inner.metrics.background_retry_started.inc();
361                let backoff = backoff::ExponentialBackoff::new(
362                    Duration::from_secs(1),
363                    Duration::from_secs(300),
364                );
365                const MAX_RETRIES: usize = 10;
366                for (i, delay) in backoff.enumerate() {
367                    if i == MAX_RETRIES {
368                        break;
369                    }
370                    // Start to enforce live input after 3 retries,
371                    // to avoid excessively retrying transactions with non-existent input objects.
372                    let result = inner
373                        .execute_transaction_with_effects_waiting(
374                            request.clone(),
375                            client_addr,
376                            i > 3,
377                        )
378                        .await;
379                    match result {
380                        Ok(_) => {
381                            inner
382                                .metrics
383                                .background_retry_attempts
384                                .with_label_values(&["success"])
385                                .inc();
386                            debug!(
387                                "Background retry {i} for transaction {} succeeded",
388                                request.transaction.digest()
389                            );
390                            break;
391                        }
392                        Err(e) => {
393                            if !e.is_retriable() {
394                                inner
395                                    .metrics
396                                    .background_retry_attempts
397                                    .with_label_values(&["non-retriable"])
398                                    .inc();
399                                debug!(
400                                    "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
401                                    request.transaction.digest()
402                                );
403                                break;
404                            }
405                            inner
406                                .metrics
407                                .background_retry_attempts
408                                .with_label_values(&["retriable"])
409                                .inc();
410                            debug!(
411                                "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
412                                request.transaction.digest()
413                            );
414                        }
415                    };
416                    sleep(delay).await;
417                }
418            });
419        }
420
421        result
422    }
423
424    /// Shared implementation for executing transactions with parallel local effects waiting
425    async fn execute_transaction_with_effects_waiting(
426        &self,
427        request: ExecuteTransactionRequestV3,
428        client_addr: Option<SocketAddr>,
429        enforce_live_input_objects: bool,
430    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
431        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
432        let verified_transaction = epoch_store
433            .verify_transaction_with_current_aliases(request.transaction.clone())
434            .map_err(QuorumDriverError::InvalidUserSignature)?
435            .into_tx();
436        let tx_digest = *verified_transaction.digest();
437
438        // Early validation check against local state before submission to catch non-retriable errors
439        // TODO: Consider moving this check to TransactionDriver for per-retry validation
440        if self.enable_early_validation
441            && let Err(e) = self.validator_state.check_transaction_validity(
442                &epoch_store,
443                &verified_transaction,
444                enforce_live_input_objects,
445            )
446        {
447            let error_category = e.categorize();
448            if !error_category.is_submission_retriable() {
449                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
450                if !self.validator_state.is_tx_already_executed(&tx_digest) {
451                    self.metrics
452                        .early_validation_rejections
453                        .with_label_values(&[e.to_variant_name()])
454                        .inc();
455                    debug!(
456                        error = ?e,
457                        "Transaction rejected during early validation"
458                    );
459
460                    return Err(QuorumDriverError::TransactionFailed {
461                        category: error_category,
462                        details: e.to_string(),
463                    });
464                }
465            }
466        }
467
468        // Add transaction to WAL log.
469        let guard =
470            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
471        let is_new_transaction = guard.is_new_transaction();
472
473        let include_events = request.include_events;
474        let include_input_objects = request.include_input_objects;
475        let include_output_objects = request.include_output_objects;
476        let include_auxiliary_data = request.include_auxiliary_data;
477
478        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
479            .ok()
480            .and_then(|v| v.parse().ok())
481            .map(Duration::from_secs)
482            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
483
484        let num_submissions = if !is_new_transaction {
485            // No need to submit when the transaction is already being processed.
486            0
487        } else if cfg!(msim) || in_antithesis() {
488            // Allow duplicated submissions in tests.
489            let r = rand::thread_rng().gen_range(1..=100);
490            let n = if r <= 10 {
491                3
492            } else if r <= 30 {
493                2
494            } else {
495                1
496            };
497            if n > 1 {
498                debug!("Making {n} execution calls");
499            }
500            n
501        } else {
502            1
503        };
504
505        // Wait for one of the execution futures to succeed, or all of them to fail.
506        let mut execution_futures = FuturesUnordered::new();
507        for i in 0..num_submissions {
508            // Generate jitter values outside the async block
509            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
510            let delay_ms = if should_delay {
511                rand::thread_rng().gen_range(100..=500)
512            } else {
513                0
514            };
515
516            let request = request.clone();
517            let verified_transaction = verified_transaction.clone();
518
519            let future = async move {
520                if delay_ms > 0 {
521                    // Add jitters to duplicated submissions.
522                    sleep(Duration::from_millis(delay_ms)).await;
523                }
524                self.execute_transaction_impl(
525                    request,
526                    verified_transaction,
527                    client_addr,
528                    Some(finality_timeout),
529                )
530                .await
531            }
532            .boxed();
533            execution_futures.push(future);
534        }
535
536        // Track the last execution error.
537        let mut last_execution_error: Option<QuorumDriverError> = None;
538
539        // Wait for execution result outside of this call to become available.
540        let digests = [tx_digest];
541        let mut local_effects_future = epoch_store
542            .within_alive_epoch(
543                self.validator_state
544                    .get_transaction_cache_reader()
545                    .notify_read_executed_effects(
546                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
547                    &digests,
548                ),
549            )
550            .boxed();
551
552        // Wait for execution timeout.
553        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
554
555        loop {
556            tokio::select! {
557                biased;
558
559                // Local effects might be available
560                local_effects_result = &mut local_effects_future => {
561                    match local_effects_result {
562                        Ok(effects) => {
563                            debug!(
564                                "Effects became available while execution was running"
565                            );
566                            if let Some(effects) = effects.into_iter().next() {
567                                self.metrics.concurrent_execution.inc();
568                                let epoch = effects.executed_epoch();
569                                let events = if include_events {
570                                    if effects.events_digest().is_some() {
571                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
572                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
573                                    } else {
574                                        None
575                                    }
576                                } else {
577                                    None
578                                };
579                                let input_objects = include_input_objects
580                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
581                                    .transpose()
582                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
583                                let output_objects = include_output_objects
584                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
585                                    .transpose()
586                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
587                                let response = QuorumTransactionResponse {
588                                    effects: FinalizedEffects {
589                                        effects,
590                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
591                                    },
592                                    events,
593                                    input_objects,
594                                    output_objects,
595                                    auxiliary_data: None,
596                                };
597                                break Ok((response, true));
598                            }
599                        }
600                        Err(_) => {
601                            warn!("Epoch terminated before effects were available");
602                        }
603                    };
604
605                    // Prevent this branch from being selected again
606                    local_effects_future = futures::future::pending().boxed();
607                }
608
609                // This branch is disabled if execution_futures is empty.
610                Some(result) = execution_futures.next() => {
611                    match result {
612                        Ok(resp) => {
613                            // First success gets returned.
614                            debug!("Execution succeeded, returning response");
615                            let QuorumTransactionResponse {
616                                effects,
617                                events,
618                                input_objects,
619                                output_objects,
620                                auxiliary_data,
621                            } = resp;
622                            // Filter fields based on request flags.
623                            let resp = QuorumTransactionResponse {
624                                effects,
625                                events: if include_events { events } else { None },
626                                input_objects: if include_input_objects { input_objects } else { None },
627                                output_objects: if include_output_objects { output_objects } else { None },
628                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
629                            };
630                            break Ok((resp, false));
631                        }
632                        Err(e) => {
633                            debug!(?e, "Execution attempt failed, wait for other attempts");
634                            last_execution_error = Some(e);
635                        }
636                    };
637
638                    // Last error must have been recorded.
639                    if execution_futures.is_empty() {
640                        break Err(last_execution_error.unwrap());
641                    }
642                }
643
644                // A timeout has occurred while waiting for finality
645                _ = &mut timeout_future => {
646                    if let Some(e) = last_execution_error {
647                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
648                    } else {
649                        debug!("Timeout waiting for transaction finality.");
650                    }
651                    self.metrics.wait_for_finality_timeout.inc();
652
653                    // TODO: Return the last execution error.
654                    break Err(QuorumDriverError::TimeoutBeforeFinality);
655                }
656            }
657        }
658    }
659
660    #[instrument(level = "error", skip_all)]
661    async fn execute_transaction_impl(
662        &self,
663        request: ExecuteTransactionRequestV3,
664        verified_transaction: VerifiedTransaction,
665        client_addr: Option<SocketAddr>,
666        finality_timeout: Option<Duration>,
667    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
668        debug!("TO Received transaction execution request.");
669
670        let timer = Instant::now();
671        let tx_type = if verified_transaction.is_consensus_tx() {
672            TxType::SharedObject
673        } else {
674            TxType::SingleWriter
675        };
676
677        let (_in_flight_metrics_guards, good_response_metrics) =
678            self.update_metrics(&request.transaction);
679
680        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
681        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
682        wait_for_finality_gauge.inc();
683        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
684            in_flight.dec();
685        });
686
687        let response = self
688            .submit_with_transaction_driver(
689                &self.transaction_driver,
690                &request,
691                client_addr,
692                &verified_transaction,
693                good_response_metrics,
694                finality_timeout,
695            )
696            .await?;
697        let driver_type = "transaction_driver";
698
699        add_server_timing("wait_for_finality done");
700
701        self.metrics.wait_for_finality_finished.inc();
702
703        let elapsed = timer.elapsed().as_secs_f64();
704        self.metrics
705            .settlement_finality_latency
706            .with_label_values(&[tx_type.as_str(), driver_type])
707            .observe(elapsed);
708        good_response_metrics.inc();
709
710        Ok(response)
711    }
712
713    #[instrument(level = "error", skip_all, err(level = "info"))]
714    async fn submit_with_transaction_driver(
715        &self,
716        td: &Arc<TransactionDriver<A>>,
717        request: &ExecuteTransactionRequestV3,
718        client_addr: Option<SocketAddr>,
719        verified_transaction: &VerifiedTransaction,
720        good_response_metrics: &GenericCounter<AtomicU64>,
721        timeout_duration: Option<Duration>,
722    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
723        let tx_digest = *verified_transaction.digest();
724        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
725
726        let td_response = td
727            .drive_transaction(
728                SubmitTxRequest::new_transaction(request.transaction.clone()),
729                SubmitTransactionOptions {
730                    forwarded_client_addr: client_addr,
731                    allowed_validators: self.td_allowed_submission_list.clone(),
732                    blocked_validators: self.td_blocked_submission_list.clone(),
733                },
734                timeout_duration,
735            )
736            .await
737            .map_err(|e| match e {
738                TransactionDriverError::TimeoutWithLastRetriableError {
739                    last_error,
740                    attempts,
741                    timeout,
742                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
743                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
744                    attempts,
745                    timeout,
746                },
747                other => QuorumDriverError::TransactionFailed {
748                    category: other.categorize(),
749                    details: other.to_string(),
750                },
751            });
752
753        match td_response {
754            Err(e) => {
755                warn!("TransactionDriver error: {e:?}");
756                Err(e)
757            }
758            Ok(quorum_transaction_response) => {
759                good_response_metrics.inc();
760                Ok(quorum_transaction_response)
761            }
762        }
763    }
764
765    #[instrument(
766        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
767        level = "debug",
768        skip_all,
769        err(level = "info")
770    )]
771    async fn wait_for_finalized_tx_executed_locally_with_timeout(
772        validator_state: &Arc<AuthorityState>,
773        tx_digest: TransactionDigest,
774        tx_type: TxType,
775        metrics: &TransactionOrchestratorMetrics,
776    ) -> SuiResult {
777        metrics.local_execution_in_flight.inc();
778        let _metrics_guard =
779            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
780                in_flight.dec();
781            });
782
783        let _latency_guard = metrics
784            .local_execution_latency
785            .with_label_values(&[tx_type.as_str()])
786            .start_timer();
787        debug!("Waiting for finalized tx to be executed locally.");
788        match timeout(
789            LOCAL_EXECUTION_TIMEOUT,
790            validator_state
791                .get_transaction_cache_reader()
792                .notify_read_executed_effects_digests(
793                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
794                    &[tx_digest],
795                ),
796        )
797        .instrument(error_span!(
798            "transaction_orchestrator::local_execution",
799            ?tx_digest
800        ))
801        .await
802        {
803            Err(_elapsed) => {
804                debug!(
805                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
806                    LOCAL_EXECUTION_TIMEOUT
807                );
808                metrics.local_execution_timeout.inc();
809                Err(SuiErrorKind::TimeoutError.into())
810            }
811            Ok(_) => {
812                metrics.local_execution_success.inc();
813                Ok(())
814            }
815        }
816    }
817
818    fn update_metrics<'a>(
819        &'a self,
820        transaction: &Transaction,
821    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
822        let (in_flight, good_response) = if transaction.is_consensus_tx() {
823            self.metrics.total_req_received_shared_object.inc();
824            (
825                self.metrics.req_in_flight_shared_object.clone(),
826                &self.metrics.good_response_shared_object,
827            )
828        } else {
829            self.metrics.total_req_received_single_writer.inc();
830            (
831                self.metrics.req_in_flight_single_writer.clone(),
832                &self.metrics.good_response_single_writer,
833            )
834        };
835        in_flight.inc();
836        (
837            scopeguard::guard(in_flight, |in_flight| {
838                in_flight.dec();
839            }),
840            good_response,
841        )
842    }
843
844    fn start_task_to_recover_txes_in_log(
845        pending_tx_log: Arc<WritePathPendingTransactionLog>,
846        transaction_driver: Arc<TransactionDriver<A>>,
847    ) {
848        spawn_logged_monitored_task!(async move {
849            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
850                info!("Skipping loading pending transactions from pending_tx_log.");
851                return;
852            }
853            let pending_txes = pending_tx_log
854                .load_all_pending_transactions()
855                .expect("failed to load all pending transactions");
856            let num_pending_txes = pending_txes.len();
857            info!(
858                "Recovering {} pending transactions from pending_tx_log.",
859                num_pending_txes
860            );
861            let mut recovery = pending_txes
862                .into_iter()
863                .map(|tx| {
864                    let pending_tx_log = pending_tx_log.clone();
865                    let transaction_driver = transaction_driver.clone();
866                    async move {
867                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
868                        // requires a migration.
869                        let tx = tx.into_inner();
870                        let tx_digest = *tx.digest();
871                        // It's not impossible we fail to enqueue a task but that's not the end of world.
872                        // TODO(william) correctly extract client_addr from logs
873                        if let Err(err) = transaction_driver
874                            .drive_transaction(
875                                SubmitTxRequest::new_transaction(tx),
876                                SubmitTransactionOptions::default(),
877                                Some(Duration::from_secs(60)),
878                            )
879                            .await
880                        {
881                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
882                        } else {
883                            debug!(?tx_digest, "Executed recovered transaction");
884                        }
885                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
886                            warn!(
887                                ?tx_digest,
888                                "Failed to clean up transaction in pending log: {err}"
889                            );
890                        } else {
891                            debug!(?tx_digest, "Cleaned up transaction in pending log");
892                        }
893                    }
894                })
895                .collect::<FuturesUnordered<_>>();
896
897            let mut num_recovered = 0;
898            while recovery.next().await.is_some() {
899                num_recovered += 1;
900                if num_recovered % 1000 == 0 {
901                    info!(
902                        "Recovered {} out of {} transactions from pending_tx_log.",
903                        num_recovered, num_pending_txes
904                    );
905                }
906            }
907            info!(
908                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
909                num_recovered, num_pending_txes
910            );
911        });
912    }
913}
914/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
915#[derive(Clone)]
916pub struct TransactionOrchestratorMetrics {
917    total_req_received_single_writer: GenericCounter<AtomicU64>,
918    total_req_received_shared_object: GenericCounter<AtomicU64>,
919
920    good_response_single_writer: GenericCounter<AtomicU64>,
921    good_response_shared_object: GenericCounter<AtomicU64>,
922
923    req_in_flight_single_writer: GenericGauge<AtomicI64>,
924    req_in_flight_shared_object: GenericGauge<AtomicI64>,
925
926    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
927    wait_for_finality_finished: GenericCounter<AtomicU64>,
928    wait_for_finality_timeout: GenericCounter<AtomicU64>,
929
930    local_execution_in_flight: GenericGauge<AtomicI64>,
931    local_execution_success: GenericCounter<AtomicU64>,
932    local_execution_timeout: GenericCounter<AtomicU64>,
933
934    concurrent_execution: IntCounter,
935
936    early_validation_rejections: IntCounterVec,
937
938    background_retry_started: IntGauge,
939    background_retry_attempts: IntCounterVec,
940
941    request_latency: HistogramVec,
942    local_execution_latency: HistogramVec,
943    settlement_finality_latency: HistogramVec,
944}
945
946// Note that labeled-metrics are stored upfront individually
947// to mitigate the perf hit by MetricsVec.
948// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
949impl TransactionOrchestratorMetrics {
950    pub fn new(registry: &Registry) -> Self {
951        let total_req_received = register_int_counter_vec_with_registry!(
952            "tx_orchestrator_total_req_received",
953            "Total number of executions request Transaction Orchestrator receives, group by tx type",
954            &["tx_type"],
955            registry
956        )
957        .unwrap();
958
959        let total_req_received_single_writer =
960            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
961        let total_req_received_shared_object =
962            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
963
964        let good_response = register_int_counter_vec_with_registry!(
965            "tx_orchestrator_good_response",
966            "Total number of good responses Transaction Orchestrator generates, group by tx type",
967            &["tx_type"],
968            registry
969        )
970        .unwrap();
971
972        let good_response_single_writer =
973            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
974        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
975
976        let req_in_flight = register_int_gauge_vec_with_registry!(
977            "tx_orchestrator_req_in_flight",
978            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
979            &["tx_type"],
980            registry
981        )
982        .unwrap();
983
984        let req_in_flight_single_writer =
985            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
986        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
987
988        Self {
989            total_req_received_single_writer,
990            total_req_received_shared_object,
991            good_response_single_writer,
992            good_response_shared_object,
993            req_in_flight_single_writer,
994            req_in_flight_shared_object,
995            wait_for_finality_in_flight: register_int_gauge_with_registry!(
996                "tx_orchestrator_wait_for_finality_in_flight",
997                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
998                registry,
999            )
1000            .unwrap(),
1001            wait_for_finality_finished: register_int_counter_with_registry!(
1002                "tx_orchestrator_wait_for_finality_fnished",
1003                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1004                registry,
1005            )
1006            .unwrap(),
1007            wait_for_finality_timeout: register_int_counter_with_registry!(
1008                "tx_orchestrator_wait_for_finality_timeout",
1009                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1010                registry,
1011            )
1012            .unwrap(),
1013            local_execution_in_flight: register_int_gauge_with_registry!(
1014                "tx_orchestrator_local_execution_in_flight",
1015                "Number of local execution txns in flights Transaction Orchestrator handles",
1016                registry,
1017            )
1018            .unwrap(),
1019            local_execution_success: register_int_counter_with_registry!(
1020                "tx_orchestrator_local_execution_success",
1021                "Total number of successful local execution txns Transaction Orchestrator handles",
1022                registry,
1023            )
1024            .unwrap(),
1025            local_execution_timeout: register_int_counter_with_registry!(
1026                "tx_orchestrator_local_execution_timeout",
1027                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1028                registry,
1029            )
1030            .unwrap(),
1031            concurrent_execution: register_int_counter_with_registry!(
1032                "tx_orchestrator_concurrent_execution",
1033                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1034                registry,
1035            )
1036            .unwrap(),
1037            early_validation_rejections: register_int_counter_vec_with_registry!(
1038                "tx_orchestrator_early_validation_rejections",
1039                "Total number of transactions rejected during early validation before submission, by reason",
1040                &["reason"],
1041                registry,
1042            )
1043            .unwrap(),
1044            background_retry_started: register_int_gauge_with_registry!(
1045                "tx_orchestrator_background_retry_started",
1046                "Number of background retry tasks kicked off for transactions with retriable errors",
1047                registry,
1048            )
1049            .unwrap(),
1050            background_retry_attempts: register_int_counter_vec_with_registry!(
1051                "tx_orchestrator_background_retry_attempts",
1052                "Total number of background retry attempts, by status",
1053                &["status"],
1054                registry,
1055            )
1056            .unwrap(),
1057            request_latency: register_histogram_vec_with_registry!(
1058                "tx_orchestrator_request_latency",
1059                "Time spent in processing one Transaction Orchestrator request",
1060                &["tx_type", "route", "wait_for_local_execution"],
1061                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1062                registry,
1063            )
1064            .unwrap(),
1065            local_execution_latency: register_histogram_vec_with_registry!(
1066                "tx_orchestrator_local_execution_latency",
1067                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1068                &["tx_type"],
1069                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1070                registry,
1071            )
1072            .unwrap(),
1073            settlement_finality_latency: register_histogram_vec_with_registry!(
1074                "tx_orchestrator_settlement_finality_latency",
1075                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1076                &["tx_type", "driver_type"],
1077                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1078                registry,
1079            )
1080            .unwrap(),
1081        }
1082    }
1083
1084    pub fn new_for_tests() -> Self {
1085        let registry = Registry::new();
1086        Self::new(&registry)
1087    }
1088}
1089
1090#[async_trait::async_trait]
1091impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1092where
1093    A: AuthorityAPI + Send + Sync + 'static + Clone,
1094{
1095    async fn execute_transaction(
1096        &self,
1097        request: ExecuteTransactionRequestV3,
1098        client_addr: Option<std::net::SocketAddr>,
1099    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1100        self.execute_transaction_v3(request, client_addr).await
1101    }
1102
1103    fn simulate_transaction(
1104        &self,
1105        transaction: TransactionData,
1106        checks: TransactionChecks,
1107    ) -> Result<SimulateTransactionResult, SuiError> {
1108        self.inner
1109            .validator_state
1110            .simulate_transaction(transaction, checks)
1111    }
1112}
1113
1114/// Keeps track of inflight transactions being submitted, and helps recover transactions
1115/// on restart.
1116struct TransactionSubmissionGuard {
1117    pending_tx_log: Arc<WritePathPendingTransactionLog>,
1118    tx_digest: TransactionDigest,
1119    is_new_transaction: bool,
1120}
1121
1122impl TransactionSubmissionGuard {
1123    pub fn new(
1124        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1125        tx: &VerifiedTransaction,
1126    ) -> Self {
1127        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1128        let tx_digest = *tx.digest();
1129        if is_new_transaction {
1130            debug!(?tx_digest, "Added transaction to inflight set");
1131        } else {
1132            debug!(
1133                ?tx_digest,
1134                "Transaction already being processed, no new submission will be made"
1135            );
1136        };
1137        Self {
1138            pending_tx_log,
1139            tx_digest,
1140            is_new_transaction,
1141        }
1142    }
1143
1144    fn is_new_transaction(&self) -> bool {
1145        self.is_new_transaction
1146    }
1147}
1148
1149impl Drop for TransactionSubmissionGuard {
1150    fn drop(&mut self) {
1151        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1152            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1153        } else {
1154            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1155        }
1156    }
1157}