sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_integration_test};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16    HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19    register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33    TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46    TransactionDriverMetrics,
47};
48
/// How long to wait for local execution (including parents) before a timeout
/// is returned to the client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default per-transaction timeout when waiting for finality.
///
/// `execute_transaction_with_effects_waiting` falls back to this value when the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable is unset or cannot be
/// parsed.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
55
56pub type QuorumTransactionEffectsResult = Result<
57    (Transaction, QuorumTransactionResponse),
58    (TransactionDigest, TransactionSubmissionError),
59>;
60
61/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
62/// submit transactions to validators for finality. It adds inflight deduplication,
63/// waiting for local execution, recovery, early validation, and epoch change handling
64/// on top of Transaction Driver.
65/// This is used by node RPC service to support transaction submission and finality waiting.
66pub struct TransactionOrchestrator<A: Clone> {
67    inner: Arc<Inner<A>>,
68}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71    pub fn new_with_auth_aggregator(
72        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73        validator_state: Arc<AuthorityState>,
74        reconfig_channel: Receiver<SuiSystemState>,
75        parent_path: &Path,
76        prometheus_registry: &Registry,
77        node_config: &NodeConfig,
78    ) -> Self {
79        let observer = OnsiteReconfigObserver::new(
80            reconfig_channel,
81            validator_state.get_object_cache_reader().clone(),
82            validator_state.clone_committee_store(),
83            validators.safe_client_metrics_base.clone(),
84        );
85        TransactionOrchestrator::new(
86            validators,
87            validator_state,
88            parent_path,
89            prometheus_registry,
90            observer,
91            node_config,
92        )
93    }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98    A: AuthorityAPI + Send + Sync + 'static + Clone,
99    OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101    pub fn new(
102        validators: Arc<AuthorityAggregator<A>>,
103        validator_state: Arc<AuthorityState>,
104        parent_path: &Path,
105        prometheus_registry: &Registry,
106        reconfig_observer: OnsiteReconfigObserver,
107        node_config: &NodeConfig,
108    ) -> Self {
109        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111        let client_metrics = Arc::new(
112            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113        );
114        let reconfig_observer = Arc::new(reconfig_observer);
115
116        let transaction_driver = TransactionDriver::new(
117            validators.clone(),
118            reconfig_observer.clone(),
119            td_metrics,
120            Some(node_config),
121            client_metrics,
122        );
123
124        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125            parent_path.join("fullnode_pending_transactions"),
126        ));
127        Inner::start_task_to_recover_txes_in_log(
128            pending_tx_log.clone(),
129            transaction_driver.clone(),
130        );
131
132        let td_allowed_submission_list = node_config
133            .transaction_driver_config
134            .as_ref()
135            .map(|config| config.allowed_submission_validators.clone())
136            .unwrap_or_default();
137
138        let td_blocked_submission_list = node_config
139            .transaction_driver_config
140            .as_ref()
141            .map(|config| config.blocked_submission_validators.clone())
142            .unwrap_or_default();
143
144        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145            panic!(
146                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147                td_allowed_submission_list, td_blocked_submission_list
148            );
149        }
150
151        let enable_early_validation = node_config
152            .transaction_driver_config
153            .as_ref()
154            .map(|config| config.enable_early_validation)
155            .unwrap_or(true);
156
157        let inner = Arc::new(Inner {
158            validator_state,
159            pending_tx_log,
160            metrics,
161            transaction_driver,
162            td_allowed_submission_list,
163            td_blocked_submission_list,
164            enable_early_validation,
165        });
166        Self { inner }
167    }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172    A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175    fields(
176        tx_digest = ?request.transaction.digest(),
177        tx_type = ?request_type,
178    ))]
179    pub async fn execute_transaction_block(
180        &self,
181        request: ExecuteTransactionRequestV3,
182        request_type: ExecuteTransactionRequestType,
183        client_addr: Option<SocketAddr>,
184    ) -> Result<
185        (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186        TransactionSubmissionError,
187    > {
188        let timer = Instant::now();
189        let tx_type = if request.transaction.is_consensus_tx() {
190            TxType::SharedObject
191        } else {
192            TxType::SingleWriter
193        };
194        let tx_digest = *request.transaction.digest();
195
196        let inner = self.inner.clone();
197        let (response, mut executed_locally) = spawn_monitored_task!(
198            Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199        )
200        .await
201        .map_err(|e| TransactionSubmissionError::TransactionFailed {
202            category: ErrorCategory::Internal,
203            details: e.to_string(),
204        })??;
205
206        if matches!(
207            request_type,
208            ExecuteTransactionRequestType::WaitForLocalExecution
209        ) {
210            // Always wait for the checkpoint containing this tx to be finalized,
211            // even when effects are already available locally. With batched index
212            // writes, index data is only committed at checkpoint boundaries, so
213            // callers relying on up-to-date index data after WaitForLocalExecution
214            // need the checkpoint to be processed.
215            executed_locally = Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
216                &self.inner.validator_state,
217                tx_digest,
218                tx_type,
219                &self.inner.metrics,
220            )
221            .await
222            .is_ok();
223            add_server_timing("local_execution done");
224        }
225
226        let QuorumTransactionResponse {
227            effects,
228            events,
229            input_objects,
230            output_objects,
231            auxiliary_data,
232        } = response;
233
234        let response = ExecuteTransactionResponseV3 {
235            effects,
236            events,
237            input_objects,
238            output_objects,
239            auxiliary_data,
240        };
241
242        let request_latency = timer.elapsed().as_secs_f64();
243        if request_latency > 10.0 {
244            warn!(
245                ?tx_digest,
246                "Request latency {} is too high", request_latency,
247            );
248        }
249        self.inner
250            .metrics
251            .request_latency
252            .with_label_values(&[
253                tx_type.as_str(),
254                "execute_transaction_block",
255                executed_locally.to_string().as_str(),
256            ])
257            .observe(request_latency);
258
259        Ok((response, executed_locally))
260    }
261
262    // Utilize the handle_certificate_v3 validator api to request input/output objects
263    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
264                 fields(tx_digest = ?request.transaction.digest()))]
265    pub async fn execute_transaction_v3(
266        &self,
267        request: ExecuteTransactionRequestV3,
268        client_addr: Option<SocketAddr>,
269    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
270        let timer = Instant::now();
271        let tx_type = if request.transaction.is_consensus_tx() {
272            TxType::SharedObject
273        } else {
274            TxType::SingleWriter
275        };
276
277        let inner = self.inner.clone();
278        let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
279            inner,
280            request,
281            client_addr
282        ))
283        .await
284        .map_err(|e| TransactionSubmissionError::TransactionFailed {
285            category: ErrorCategory::Internal,
286            details: e.to_string(),
287        })??;
288
289        self.inner
290            .metrics
291            .request_latency
292            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
293            .observe(timer.elapsed().as_secs_f64());
294
295        let QuorumTransactionResponse {
296            effects,
297            events,
298            input_objects,
299            output_objects,
300            auxiliary_data,
301        } = response;
302
303        Ok(ExecuteTransactionResponseV3 {
304            effects,
305            events,
306            input_objects,
307            output_objects,
308            auxiliary_data,
309        })
310    }
311
312    pub fn authority_state(&self) -> &Arc<AuthorityState> {
313        &self.inner.validator_state
314    }
315
316    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
317        &self.inner.transaction_driver
318    }
319
320    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
321        self.inner
322            .transaction_driver
323            .authority_aggregator()
324            .load_full()
325    }
326
327    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
328        self.inner.pending_tx_log.load_all_pending_transactions()
329    }
330
331    pub fn empty_pending_tx_log_in_test(&self) -> bool {
332        self.inner.pending_tx_log.is_empty()
333    }
334}
335
336struct Inner<A: Clone> {
337    validator_state: Arc<AuthorityState>,
338    pending_tx_log: Arc<WritePathPendingTransactionLog>,
339    metrics: Arc<TransactionOrchestratorMetrics>,
340    transaction_driver: Arc<TransactionDriver<A>>,
341    td_allowed_submission_list: Vec<String>,
342    td_blocked_submission_list: Vec<String>,
343    enable_early_validation: bool,
344}
345
346impl<A> Inner<A>
347where
348    A: AuthorityAPI + Send + Sync + 'static + Clone,
349{
350    async fn execute_transaction_with_retry(
351        inner: Arc<Inner<A>>,
352        request: ExecuteTransactionRequestV3,
353        client_addr: Option<SocketAddr>,
354    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
355    {
356        let result = inner
357            .execute_transaction_with_effects_waiting(
358                request.clone(),
359                client_addr,
360                /* enforce_live_input_objects */ false,
361            )
362            .await;
363
364        // If the error is retriable, retry the transaction sufficiently long.
365        if let Err(e) = &result
366            && e.is_retriable()
367        {
368            spawn_monitored_task!(async move {
369                inner.metrics.background_retry_started.inc();
370                let backoff = backoff::ExponentialBackoff::new(
371                    Duration::from_secs(1),
372                    Duration::from_secs(300),
373                );
374                const MAX_RETRIES: usize = 10;
375                for (i, delay) in backoff.enumerate() {
376                    if i == MAX_RETRIES {
377                        break;
378                    }
379                    // Start to enforce live input after 3 retries,
380                    // to avoid excessively retrying transactions with non-existent input objects.
381                    let result = inner
382                        .execute_transaction_with_effects_waiting(
383                            request.clone(),
384                            client_addr,
385                            i > 3,
386                        )
387                        .await;
388                    match result {
389                        Ok(_) => {
390                            inner
391                                .metrics
392                                .background_retry_attempts
393                                .with_label_values(&["success"])
394                                .inc();
395                            debug!(
396                                "Background retry {i} for transaction {} succeeded",
397                                request.transaction.digest()
398                            );
399                            break;
400                        }
401                        Err(e) => {
402                            if !e.is_retriable() {
403                                inner
404                                    .metrics
405                                    .background_retry_attempts
406                                    .with_label_values(&["non-retriable"])
407                                    .inc();
408                                debug!(
409                                    "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
410                                    request.transaction.digest()
411                                );
412                                break;
413                            }
414                            inner
415                                .metrics
416                                .background_retry_attempts
417                                .with_label_values(&["retriable"])
418                                .inc();
419                            debug!(
420                                "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
421                                request.transaction.digest()
422                            );
423                        }
424                    };
425                    tracing::debug!("Wait for {:.3}s before next retry", delay.as_secs_f32());
426                    sleep(delay).await;
427                }
428            });
429        }
430
431        result
432    }
433
434    fn build_response_from_local_effects(
435        &self,
436        effects: sui_types::effects::TransactionEffects,
437        include_events: bool,
438        include_input_objects: bool,
439        include_output_objects: bool,
440    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
441        let epoch = effects.executed_epoch();
442        let events = if include_events {
443            if effects.events_digest().is_some() {
444                Some(
445                    self.validator_state
446                        .get_transaction_events(effects.transaction_digest())
447                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?,
448                )
449            } else {
450                None
451            }
452        } else {
453            None
454        };
455        let input_objects = include_input_objects
456            .then(|| self.validator_state.get_transaction_input_objects(&effects))
457            .transpose()
458            .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
459        let output_objects = include_output_objects
460            .then(|| {
461                self.validator_state
462                    .get_transaction_output_objects(&effects)
463            })
464            .transpose()
465            .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
466
467        Ok(QuorumTransactionResponse {
468            effects: FinalizedEffects {
469                effects,
470                finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
471            },
472            events,
473            input_objects,
474            output_objects,
475            auxiliary_data: None,
476        })
477    }
478
479    /// Shared implementation for executing transactions with parallel local effects waiting
480    async fn execute_transaction_with_effects_waiting(
481        &self,
482        request: ExecuteTransactionRequestV3,
483        client_addr: Option<SocketAddr>,
484        enforce_live_input_objects: bool,
485    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
486    {
487        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
488        let verified_transaction = epoch_store
489            .verify_transaction_with_current_aliases(request.transaction.clone())
490            .map_err(TransactionSubmissionError::InvalidUserSignature)?
491            .into_tx();
492        let tx_digest = *verified_transaction.digest();
493
494        // Early validation check against local state before submission to catch non-retriable errors
495        // TODO: Consider moving this check to TransactionDriver for per-retry validation
496        if self.enable_early_validation
497            && let Err(e) = self.validator_state.check_transaction_validity(
498                &epoch_store,
499                &verified_transaction,
500                enforce_live_input_objects,
501            )
502        {
503            let error_category = e.categorize();
504            if !error_category.is_submission_retriable() {
505                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
506                if !self.validator_state.is_tx_already_executed(&tx_digest) {
507                    self.metrics
508                        .early_validation_rejections
509                        .with_label_values(&[e.to_variant_name()])
510                        .inc();
511                    debug!(
512                        error = ?e,
513                        "Transaction rejected during early validation"
514                    );
515
516                    return Err(TransactionSubmissionError::TransactionFailed {
517                        category: error_category,
518                        details: e.to_string(),
519                    });
520                }
521            }
522        }
523
524        // Add transaction to WAL log.
525        let guard =
526            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
527        let is_new_transaction = guard.is_new_transaction();
528
529        let include_events = request.include_events;
530        let include_input_objects = request.include_input_objects;
531        let include_output_objects = request.include_output_objects;
532        let include_auxiliary_data = request.include_auxiliary_data;
533
534        // Check if transaction has already been executed locally and return cached results
535        if let Some(effects) = self
536            .validator_state
537            .get_transaction_cache_reader()
538            .get_executed_effects(&tx_digest)
539        {
540            self.metrics.early_cached_response.inc();
541            debug!(
542                ?tx_digest,
543                "Returning cached results for already-executed transaction"
544            );
545            let response = self.build_response_from_local_effects(
546                effects,
547                include_events,
548                include_input_objects,
549                include_output_objects,
550            )?;
551            return Ok((response, true));
552        }
553
554        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
555            .ok()
556            .and_then(|v| v.parse().ok())
557            .map(Duration::from_secs)
558            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
559
560        let num_submissions = if !is_new_transaction {
561            // No need to submit when the transaction is already being processed.
562            0
563        } else if in_integration_test() {
564            // Allow duplicated submissions in tests.
565            let r = rand::thread_rng().gen_range(1..=100);
566            let n = if r <= 10 {
567                3
568            } else if r <= 30 {
569                2
570            } else {
571                1
572            };
573            if n > 1 {
574                debug!("Making {n} execution calls");
575            }
576            n
577        } else {
578            1
579        };
580
581        // Wait for one of the execution futures to succeed, or all of them to fail.
582        let mut execution_futures = FuturesUnordered::new();
583        for i in 0..num_submissions {
584            // Generate jitter values outside the async block
585            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
586            let delay_ms = if should_delay {
587                rand::thread_rng().gen_range(100..=500)
588            } else {
589                0
590            };
591
592            let request = request.clone();
593            let verified_transaction = verified_transaction.clone();
594
595            let future = async move {
596                if delay_ms > 0 {
597                    // Add jitters to duplicated submissions.
598                    sleep(Duration::from_millis(delay_ms)).await;
599                }
600                self.execute_transaction_impl(
601                    request,
602                    verified_transaction,
603                    client_addr,
604                    Some(finality_timeout),
605                )
606                .await
607            }
608            .boxed();
609            execution_futures.push(future);
610        }
611
612        // Track the last execution error.
613        let mut last_execution_error: Option<TransactionSubmissionError> = None;
614
615        // Wait for execution result outside of this call to become available.
616        let digests = [tx_digest];
617        let mut local_effects_future = self
618            .validator_state
619            .get_transaction_cache_reader()
620            .notify_read_executed_effects_may_fail(
621                "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
622                &digests,
623            )
624            .boxed();
625
626        // Wait for execution timeout.
627        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
628
629        loop {
630            tokio::select! {
631                // Local effects might be available
632                all_effects_result = &mut local_effects_future => {
633                    let all_effects = all_effects_result
634                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
635                    if all_effects.len() != 1 {
636                        break Err(TransactionSubmissionError::TransactionDriverInternalError(
637                            SuiErrorKind::Unknown(format!("Unexpected number of effects found: {}", all_effects.len())).into()
638                        ));
639                    }
640                    debug!(
641                        "Effects became available while execution was running"
642                    );
643                    self.metrics.concurrent_execution.inc();
644
645                    let effects = all_effects.into_iter().next().unwrap();
646                    let response = self.build_response_from_local_effects(
647                        effects,
648                        include_events,
649                        include_input_objects,
650                        include_output_objects,
651                    )?;
652                    break Ok((response, true));
653                }
654
655                // This branch is disabled if execution_futures is empty.
656                Some(result) = execution_futures.next() => {
657                    match result {
658                        Ok(resp) => {
659                            // First success gets returned.
660                            debug!("Execution succeeded, returning response");
661                            let QuorumTransactionResponse {
662                                effects,
663                                events,
664                                input_objects,
665                                output_objects,
666                                auxiliary_data,
667                            } = resp;
668                            // Filter fields based on request flags.
669                            let resp = QuorumTransactionResponse {
670                                effects,
671                                events: if include_events { events } else { None },
672                                input_objects: if include_input_objects { input_objects } else { None },
673                                output_objects: if include_output_objects { output_objects } else { None },
674                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
675                            };
676                            break Ok((resp, false));
677                        }
678                        Err(e) => {
679                            debug!(?e, "Execution attempt failed, wait for other attempts");
680                            last_execution_error = Some(e);
681                        }
682                    };
683
684                    // Last error must have been recorded.
685                    if execution_futures.is_empty() {
686                        break Err(last_execution_error.unwrap());
687                    }
688                }
689
690                // A timeout has occurred while waiting for finality
691                _ = &mut timeout_future => {
692                    if let Some(e) = last_execution_error {
693                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
694                    } else {
695                        debug!("Timeout waiting for transaction finality.");
696                    }
697                    self.metrics.wait_for_finality_timeout.inc();
698
699                    // TODO: Return the last execution error.
700                    break Err(TransactionSubmissionError::TimeoutBeforeFinality);
701                }
702            }
703        }
704    }
705
706    #[instrument(level = "error", skip_all)]
707    async fn execute_transaction_impl(
708        &self,
709        request: ExecuteTransactionRequestV3,
710        verified_transaction: VerifiedTransaction,
711        client_addr: Option<SocketAddr>,
712        finality_timeout: Option<Duration>,
713    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
714        let timer = Instant::now();
715        let tx_digest = *verified_transaction.digest();
716        let tx_type = if verified_transaction.is_consensus_tx() {
717            TxType::SharedObject
718        } else {
719            TxType::SingleWriter
720        };
721
722        let (_in_flight_metrics_guards, good_response_metrics) =
723            self.update_metrics(&request.transaction);
724
725        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
726        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
727        wait_for_finality_gauge.inc();
728        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
729            in_flight.dec();
730        });
731
732        let response = self
733            .submit_with_transaction_driver(
734                &self.transaction_driver,
735                &request,
736                client_addr,
737                &verified_transaction,
738                good_response_metrics,
739                finality_timeout,
740            )
741            .await?;
742        let driver_type = "transaction_driver";
743
744        add_server_timing("wait_for_finality done");
745
746        self.metrics.wait_for_finality_finished.inc();
747
748        let elapsed = timer.elapsed().as_secs_f64();
749        if elapsed > 10.0 {
750            warn!(
751                ?tx_digest,
752                "Settlement finality latency {} is too high", elapsed,
753            );
754        }
755        self.metrics
756            .settlement_finality_latency
757            .with_label_values(&[tx_type.as_str(), driver_type])
758            .observe(elapsed);
759        good_response_metrics.inc();
760
761        Ok(response)
762    }
763
764    #[instrument(level = "error", skip_all, err(level = "info"))]
765    async fn submit_with_transaction_driver(
766        &self,
767        td: &Arc<TransactionDriver<A>>,
768        request: &ExecuteTransactionRequestV3,
769        client_addr: Option<SocketAddr>,
770        verified_transaction: &VerifiedTransaction,
771        good_response_metrics: &GenericCounter<AtomicU64>,
772        timeout_duration: Option<Duration>,
773    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
774        let tx_digest = *verified_transaction.digest();
775        let td_response = td
776            .drive_transaction(
777                SubmitTxRequest::new_transaction(request.transaction.clone()),
778                SubmitTransactionOptions {
779                    forwarded_client_addr: client_addr,
780                    allowed_validators: self.td_allowed_submission_list.clone(),
781                    blocked_validators: self.td_blocked_submission_list.clone(),
782                },
783                timeout_duration,
784            )
785            .await
786            .map_err(|e| match e {
787                TransactionDriverError::TimeoutWithLastRetriableError {
788                    last_error,
789                    attempts,
790                    timeout,
791                } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
792                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
793                    attempts,
794                    timeout,
795                },
796                other => TransactionSubmissionError::TransactionFailed {
797                    category: other.categorize(),
798                    details: other.to_string(),
799                },
800            });
801
802        match td_response {
803            Err(e) => {
804                warn!(?tx_digest, "TransactionDriver error: {e:?}");
805                Err(e)
806            }
807            Ok(quorum_transaction_response) => {
808                good_response_metrics.inc();
809                Ok(quorum_transaction_response)
810            }
811        }
812    }
813
814    #[instrument(
815        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
816        level = "debug",
817        skip_all,
818        err(level = "info")
819    )]
820    async fn wait_for_finalized_tx_executed_locally_with_timeout(
821        validator_state: &Arc<AuthorityState>,
822        tx_digest: TransactionDigest,
823        tx_type: TxType,
824        metrics: &TransactionOrchestratorMetrics,
825    ) -> SuiResult {
826        metrics.local_execution_in_flight.inc();
827        let _metrics_guard =
828            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
829                in_flight.dec();
830            });
831
832        let _latency_guard = metrics
833            .local_execution_latency
834            .with_label_values(&[tx_type.as_str()])
835            .start_timer();
836        debug!("Waiting for finalized tx to be executed locally.");
837        match timeout(LOCAL_EXECUTION_TIMEOUT, async move {
838            validator_state
839                .get_transaction_cache_reader()
840                .notify_read_executed_effects_digests(
841                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
842                    &[tx_digest],
843                )
844                .await;
845            // Wait for the checkpoint containing this tx to be finalized.
846            // Index data is committed before the checkpoint notification fires,
847            // so it is guaranteed to be available when this resolves.
848            let epoch_store = validator_state.load_epoch_store_one_call_per_task();
849            epoch_store
850                .transactions_executed_in_checkpoint_notify(vec![tx_digest])
851                .await
852                .expect("db error waiting for transaction checkpointing");
853        })
854        .instrument(error_span!(
855            "transaction_orchestrator::local_execution",
856            ?tx_digest
857        ))
858        .await
859        {
860            Err(_elapsed) => {
861                debug!(
862                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
863                    LOCAL_EXECUTION_TIMEOUT
864                );
865                metrics.local_execution_timeout.inc();
866                Err(SuiErrorKind::TimeoutError.into())
867            }
868            Ok(_) => {
869                metrics.local_execution_success.inc();
870                Ok(())
871            }
872        }
873    }
874
875    fn update_metrics<'a>(
876        &'a self,
877        transaction: &Transaction,
878    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
879        let (in_flight, good_response) = if transaction.is_consensus_tx() {
880            self.metrics.total_req_received_shared_object.inc();
881            (
882                self.metrics.req_in_flight_shared_object.clone(),
883                &self.metrics.good_response_shared_object,
884            )
885        } else {
886            self.metrics.total_req_received_single_writer.inc();
887            (
888                self.metrics.req_in_flight_single_writer.clone(),
889                &self.metrics.good_response_single_writer,
890            )
891        };
892        in_flight.inc();
893        (
894            scopeguard::guard(in_flight, |in_flight| {
895                in_flight.dec();
896            }),
897            good_response,
898        )
899    }
900
901    fn start_task_to_recover_txes_in_log(
902        pending_tx_log: Arc<WritePathPendingTransactionLog>,
903        transaction_driver: Arc<TransactionDriver<A>>,
904    ) {
905        spawn_logged_monitored_task!(async move {
906            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
907                info!("Skipping loading pending transactions from pending_tx_log.");
908                return;
909            }
910            let pending_txes = pending_tx_log
911                .load_all_pending_transactions()
912                .expect("failed to load all pending transactions");
913            let num_pending_txes = pending_txes.len();
914            info!(
915                "Recovering {} pending transactions from pending_tx_log.",
916                num_pending_txes
917            );
918            let mut recovery = pending_txes
919                .into_iter()
920                .map(|tx| {
921                    let pending_tx_log = pending_tx_log.clone();
922                    let transaction_driver = transaction_driver.clone();
923                    async move {
924                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
925                        // requires a migration.
926                        let tx = tx.into_inner();
927                        let tx_digest = *tx.digest();
928                        // It's not impossible we fail to enqueue a task but that's not the end of world.
929                        // TODO(william) correctly extract client_addr from logs
930                        if let Err(err) = transaction_driver
931                            .drive_transaction(
932                                SubmitTxRequest::new_transaction(tx),
933                                SubmitTransactionOptions::default(),
934                                Some(Duration::from_secs(60)),
935                            )
936                            .await
937                        {
938                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
939                        } else {
940                            debug!(?tx_digest, "Executed recovered transaction");
941                        }
942                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
943                            warn!(
944                                ?tx_digest,
945                                "Failed to clean up transaction in pending log: {err}"
946                            );
947                        } else {
948                            debug!(?tx_digest, "Cleaned up transaction in pending log");
949                        }
950                    }
951                })
952                .collect::<FuturesUnordered<_>>();
953
954            let mut num_recovered = 0;
955            while recovery.next().await.is_some() {
956                num_recovered += 1;
957                if num_recovered % 1000 == 0 {
958                    info!(
959                        "Recovered {} out of {} transactions from pending_tx_log.",
960                        num_recovered, num_pending_txes
961                    );
962                }
963            }
964            info!(
965                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
966                num_recovered, num_pending_txes
967            );
968        });
969    }
970}
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Execution requests received, one pre-resolved child counter per `tx_type` label value.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Good responses generated, one pre-resolved child counter per `tx_type` label value.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, one pre-resolved child gauge per `tx_type` label value.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Finality wait: transactions currently waiting, waits that got a response, and waits that
    // timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution wait: currently waiting, completed successfully, and timed out.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Requests answered from cached results for already-executed transactions.
    early_cached_response: IntCounter,
    // Concurrent executions where effects became available locally.
    concurrent_execution: IntCounter,

    // Transactions rejected during early validation before submission, labelled by `reason`.
    early_validation_rejections: IntCounterVec,

    // Background retry tasks kicked off for transactions with retriable errors.
    background_retry_started: IntGauge,
    // Background retry attempts, labelled by `status`.
    background_retry_attempts: IntCounterVec,

    // Latency histograms: whole request (labels `tx_type`, `route`,
    // `wait_for_local_execution`), local execution wait (label `tx_type`), and settlement
    // finality (labels `tx_type`, `driver_type`).
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
1003
1004// Note that labeled-metrics are stored upfront individually
1005// to mitigate the perf hit by MetricsVec.
1006// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
1007impl TransactionOrchestratorMetrics {
1008    pub fn new(registry: &Registry) -> Self {
1009        let total_req_received = register_int_counter_vec_with_registry!(
1010            "tx_orchestrator_total_req_received",
1011            "Total number of executions request Transaction Orchestrator receives, group by tx type",
1012            &["tx_type"],
1013            registry
1014        )
1015        .unwrap();
1016
1017        let total_req_received_single_writer =
1018            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1019        let total_req_received_shared_object =
1020            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1021
1022        let good_response = register_int_counter_vec_with_registry!(
1023            "tx_orchestrator_good_response",
1024            "Total number of good responses Transaction Orchestrator generates, group by tx type",
1025            &["tx_type"],
1026            registry
1027        )
1028        .unwrap();
1029
1030        let good_response_single_writer =
1031            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1032        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1033
1034        let req_in_flight = register_int_gauge_vec_with_registry!(
1035            "tx_orchestrator_req_in_flight",
1036            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1037            &["tx_type"],
1038            registry
1039        )
1040        .unwrap();
1041
1042        let req_in_flight_single_writer =
1043            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1044        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1045
1046        Self {
1047            total_req_received_single_writer,
1048            total_req_received_shared_object,
1049            good_response_single_writer,
1050            good_response_shared_object,
1051            req_in_flight_single_writer,
1052            req_in_flight_shared_object,
1053            wait_for_finality_in_flight: register_int_gauge_with_registry!(
1054                "tx_orchestrator_wait_for_finality_in_flight",
1055                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1056                registry,
1057            )
1058            .unwrap(),
1059            wait_for_finality_finished: register_int_counter_with_registry!(
1060                "tx_orchestrator_wait_for_finality_fnished",
1061                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1062                registry,
1063            )
1064            .unwrap(),
1065            wait_for_finality_timeout: register_int_counter_with_registry!(
1066                "tx_orchestrator_wait_for_finality_timeout",
1067                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1068                registry,
1069            )
1070            .unwrap(),
1071            local_execution_in_flight: register_int_gauge_with_registry!(
1072                "tx_orchestrator_local_execution_in_flight",
1073                "Number of local execution txns in flights Transaction Orchestrator handles",
1074                registry,
1075            )
1076            .unwrap(),
1077            local_execution_success: register_int_counter_with_registry!(
1078                "tx_orchestrator_local_execution_success",
1079                "Total number of successful local execution txns Transaction Orchestrator handles",
1080                registry,
1081            )
1082            .unwrap(),
1083            local_execution_timeout: register_int_counter_with_registry!(
1084                "tx_orchestrator_local_execution_timeout",
1085                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1086                registry,
1087            )
1088            .unwrap(),
1089            early_cached_response: register_int_counter_with_registry!(
1090                "tx_orchestrator_early_cached_response",
1091                "Total number of requests returning cached results for already-executed transactions",
1092                registry,
1093            )
1094            .unwrap(),
1095            concurrent_execution: register_int_counter_with_registry!(
1096                "tx_orchestrator_concurrent_execution",
1097                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1098                registry,
1099            )
1100            .unwrap(),
1101            early_validation_rejections: register_int_counter_vec_with_registry!(
1102                "tx_orchestrator_early_validation_rejections",
1103                "Total number of transactions rejected during early validation before submission, by reason",
1104                &["reason"],
1105                registry,
1106            )
1107            .unwrap(),
1108            background_retry_started: register_int_gauge_with_registry!(
1109                "tx_orchestrator_background_retry_started",
1110                "Number of background retry tasks kicked off for transactions with retriable errors",
1111                registry,
1112            )
1113            .unwrap(),
1114            background_retry_attempts: register_int_counter_vec_with_registry!(
1115                "tx_orchestrator_background_retry_attempts",
1116                "Total number of background retry attempts, by status",
1117                &["status"],
1118                registry,
1119            )
1120            .unwrap(),
1121            request_latency: register_histogram_vec_with_registry!(
1122                "tx_orchestrator_request_latency",
1123                "Time spent in processing one Transaction Orchestrator request",
1124                &["tx_type", "route", "wait_for_local_execution"],
1125                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1126                registry,
1127            )
1128            .unwrap(),
1129            local_execution_latency: register_histogram_vec_with_registry!(
1130                "tx_orchestrator_local_execution_latency",
1131                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1132                &["tx_type"],
1133                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1134                registry,
1135            )
1136            .unwrap(),
1137            settlement_finality_latency: register_histogram_vec_with_registry!(
1138                "tx_orchestrator_settlement_finality_latency",
1139                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1140                &["tx_type", "driver_type"],
1141                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1142                registry,
1143            )
1144            .unwrap(),
1145        }
1146    }
1147
1148    pub fn new_for_tests() -> Self {
1149        let registry = Registry::new();
1150        Self::new(&registry)
1151    }
1152}
1153
1154#[async_trait::async_trait]
1155impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1156where
1157    A: AuthorityAPI + Send + Sync + 'static + Clone,
1158{
1159    async fn execute_transaction(
1160        &self,
1161        request: ExecuteTransactionRequestV3,
1162        client_addr: Option<std::net::SocketAddr>,
1163    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1164        self.execute_transaction_v3(request, client_addr).await
1165    }
1166
1167    fn simulate_transaction(
1168        &self,
1169        transaction: TransactionData,
1170        checks: TransactionChecks,
1171        allow_mock_gas_coin: bool,
1172    ) -> Result<SimulateTransactionResult, SuiError> {
1173        self.inner
1174            .validator_state
1175            .simulate_transaction(transaction, checks, allow_mock_gas_coin)
1176    }
1177}
1178
/// Keeps track of inflight transactions being submitted, and helps recover transactions
/// on restart.
///
/// Creating the guard attempts to write the transaction to the pending-transaction log;
/// dropping it asks the log to finish (remove) that transaction again.
struct TransactionSubmissionGuard {
    // Log in which the transaction is recorded while this guard is alive.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    // Digest of the guarded transaction; used to finish the log entry on drop.
    tx_digest: TransactionDigest,
    // Result of `write_pending_transaction_maybe` at construction: `false` is logged as the
    // transaction already being processed.
    is_new_transaction: bool,
}
1186
1187impl TransactionSubmissionGuard {
1188    pub fn new(
1189        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1190        tx: &VerifiedTransaction,
1191    ) -> Self {
1192        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1193        let tx_digest = *tx.digest();
1194        if is_new_transaction {
1195            debug!(?tx_digest, "Added transaction to inflight set");
1196        } else {
1197            debug!(
1198                ?tx_digest,
1199                "Transaction already being processed, no new submission will be made"
1200            );
1201        };
1202        Self {
1203            pending_tx_log,
1204            tx_digest,
1205            is_new_transaction,
1206        }
1207    }
1208
1209    fn is_new_transaction(&self) -> bool {
1210        self.is_new_transaction
1211    }
1212}
1213
1214impl Drop for TransactionSubmissionGuard {
1215    fn drop(&mut self) {
1216        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1217            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1218        } else {
1219            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1220        }
1221    }
1222}