sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_integration_test};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16    HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18    register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19    register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33    TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46    TransactionDriverMetrics,
47};
48
/// Upper bound on how long to wait for this node to execute a finalized transaction locally
/// (parents included) before a timeout is reported back to the client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
52
/// Default per-transaction deadline for reaching finality. The orchestrator lets the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable override it at runtime.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
55
56pub type QuorumTransactionEffectsResult = Result<
57    (Transaction, QuorumTransactionResponse),
58    (TransactionDigest, TransactionSubmissionError),
59>;
60
61/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
62/// submit transactions to validators for finality. It adds inflight deduplication,
63/// waiting for local execution, recovery, early validation, and epoch change handling
64/// on top of Transaction Driver.
65/// This is used by node RPC service to support transaction submission and finality waiting.
66pub struct TransactionOrchestrator<A: Clone> {
67    inner: Arc<Inner<A>>,
68}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71    pub fn new_with_auth_aggregator(
72        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73        validator_state: Arc<AuthorityState>,
74        reconfig_channel: Receiver<SuiSystemState>,
75        parent_path: &Path,
76        prometheus_registry: &Registry,
77        node_config: &NodeConfig,
78    ) -> Self {
79        let observer = OnsiteReconfigObserver::new(
80            reconfig_channel,
81            validator_state.get_object_cache_reader().clone(),
82            validator_state.clone_committee_store(),
83            validators.safe_client_metrics_base.clone(),
84        );
85        TransactionOrchestrator::new(
86            validators,
87            validator_state,
88            parent_path,
89            prometheus_registry,
90            observer,
91            node_config,
92        )
93    }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98    A: AuthorityAPI + Send + Sync + 'static + Clone,
99    OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101    pub fn new(
102        validators: Arc<AuthorityAggregator<A>>,
103        validator_state: Arc<AuthorityState>,
104        parent_path: &Path,
105        prometheus_registry: &Registry,
106        reconfig_observer: OnsiteReconfigObserver,
107        node_config: &NodeConfig,
108    ) -> Self {
109        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111        let client_metrics = Arc::new(
112            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113        );
114        let reconfig_observer = Arc::new(reconfig_observer);
115
116        let transaction_driver = TransactionDriver::new(
117            validators.clone(),
118            reconfig_observer.clone(),
119            td_metrics,
120            Some(node_config),
121            client_metrics,
122        );
123
124        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125            parent_path.join("fullnode_pending_transactions"),
126        ));
127        Inner::start_task_to_recover_txes_in_log(
128            pending_tx_log.clone(),
129            transaction_driver.clone(),
130        );
131
132        let td_allowed_submission_list = node_config
133            .transaction_driver_config
134            .as_ref()
135            .map(|config| config.allowed_submission_validators.clone())
136            .unwrap_or_default();
137
138        let td_blocked_submission_list = node_config
139            .transaction_driver_config
140            .as_ref()
141            .map(|config| config.blocked_submission_validators.clone())
142            .unwrap_or_default();
143
144        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145            panic!(
146                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147                td_allowed_submission_list, td_blocked_submission_list
148            );
149        }
150
151        let enable_early_validation = node_config
152            .transaction_driver_config
153            .as_ref()
154            .map(|config| config.enable_early_validation)
155            .unwrap_or(true);
156
157        let inner = Arc::new(Inner {
158            validator_state,
159            pending_tx_log,
160            metrics,
161            transaction_driver,
162            td_allowed_submission_list,
163            td_blocked_submission_list,
164            enable_early_validation,
165        });
166        Self { inner }
167    }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172    A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175    fields(
176        tx_digest = ?request.transaction.digest(),
177        tx_type = ?request_type,
178    ))]
179    pub async fn execute_transaction_block(
180        &self,
181        request: ExecuteTransactionRequestV3,
182        request_type: ExecuteTransactionRequestType,
183        client_addr: Option<SocketAddr>,
184    ) -> Result<
185        (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186        TransactionSubmissionError,
187    > {
188        let timer = Instant::now();
189        let tx_type = if request.transaction.is_consensus_tx() {
190            TxType::SharedObject
191        } else {
192            TxType::SingleWriter
193        };
194        let tx_digest = *request.transaction.digest();
195
196        let inner = self.inner.clone();
197        let (response, mut executed_locally) = spawn_monitored_task!(
198            Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199        )
200        .await
201        .map_err(|e| TransactionSubmissionError::TransactionFailed {
202            category: ErrorCategory::Internal,
203            details: e.to_string(),
204        })??;
205
206        if !executed_locally {
207            executed_locally = if matches!(
208                request_type,
209                ExecuteTransactionRequestType::WaitForLocalExecution
210            ) {
211                let executed_locally =
212                    Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
213                        &self.inner.validator_state,
214                        tx_digest,
215                        tx_type,
216                        &self.inner.metrics,
217                    )
218                    .await
219                    .is_ok();
220                add_server_timing("local_execution done");
221                executed_locally
222            } else {
223                false
224            };
225        }
226
227        let QuorumTransactionResponse {
228            effects,
229            events,
230            input_objects,
231            output_objects,
232            auxiliary_data,
233        } = response;
234
235        let response = ExecuteTransactionResponseV3 {
236            effects,
237            events,
238            input_objects,
239            output_objects,
240            auxiliary_data,
241        };
242
243        let request_latency = timer.elapsed().as_secs_f64();
244        if request_latency > 10.0 {
245            warn!(
246                ?tx_digest,
247                "Request latency {} is too high", request_latency,
248            );
249        }
250        self.inner
251            .metrics
252            .request_latency
253            .with_label_values(&[
254                tx_type.as_str(),
255                "execute_transaction_block",
256                executed_locally.to_string().as_str(),
257            ])
258            .observe(request_latency);
259
260        Ok((response, executed_locally))
261    }
262
263    // Utilize the handle_certificate_v3 validator api to request input/output objects
264    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
265                 fields(tx_digest = ?request.transaction.digest()))]
266    pub async fn execute_transaction_v3(
267        &self,
268        request: ExecuteTransactionRequestV3,
269        client_addr: Option<SocketAddr>,
270    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
271        let timer = Instant::now();
272        let tx_type = if request.transaction.is_consensus_tx() {
273            TxType::SharedObject
274        } else {
275            TxType::SingleWriter
276        };
277
278        let inner = self.inner.clone();
279        let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
280            inner,
281            request,
282            client_addr
283        ))
284        .await
285        .map_err(|e| TransactionSubmissionError::TransactionFailed {
286            category: ErrorCategory::Internal,
287            details: e.to_string(),
288        })??;
289
290        self.inner
291            .metrics
292            .request_latency
293            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
294            .observe(timer.elapsed().as_secs_f64());
295
296        let QuorumTransactionResponse {
297            effects,
298            events,
299            input_objects,
300            output_objects,
301            auxiliary_data,
302        } = response;
303
304        Ok(ExecuteTransactionResponseV3 {
305            effects,
306            events,
307            input_objects,
308            output_objects,
309            auxiliary_data,
310        })
311    }
312
313    pub fn authority_state(&self) -> &Arc<AuthorityState> {
314        &self.inner.validator_state
315    }
316
317    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
318        &self.inner.transaction_driver
319    }
320
321    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
322        self.inner
323            .transaction_driver
324            .authority_aggregator()
325            .load_full()
326    }
327
328    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
329        self.inner.pending_tx_log.load_all_pending_transactions()
330    }
331
332    pub fn empty_pending_tx_log_in_test(&self) -> bool {
333        self.inner.pending_tx_log.is_empty()
334    }
335}
336
337struct Inner<A: Clone> {
338    validator_state: Arc<AuthorityState>,
339    pending_tx_log: Arc<WritePathPendingTransactionLog>,
340    metrics: Arc<TransactionOrchestratorMetrics>,
341    transaction_driver: Arc<TransactionDriver<A>>,
342    td_allowed_submission_list: Vec<String>,
343    td_blocked_submission_list: Vec<String>,
344    enable_early_validation: bool,
345}
346
347impl<A> Inner<A>
348where
349    A: AuthorityAPI + Send + Sync + 'static + Clone,
350{
351    async fn execute_transaction_with_retry(
352        inner: Arc<Inner<A>>,
353        request: ExecuteTransactionRequestV3,
354        client_addr: Option<SocketAddr>,
355    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
356    {
357        let result = inner
358            .execute_transaction_with_effects_waiting(
359                request.clone(),
360                client_addr,
361                /* enforce_live_input_objects */ false,
362            )
363            .await;
364
365        // If the error is retriable, retry the transaction sufficiently long.
366        if let Err(e) = &result
367            && e.is_retriable()
368        {
369            spawn_monitored_task!(async move {
370                inner.metrics.background_retry_started.inc();
371                let backoff = backoff::ExponentialBackoff::new(
372                    Duration::from_secs(1),
373                    Duration::from_secs(300),
374                );
375                const MAX_RETRIES: usize = 10;
376                for (i, delay) in backoff.enumerate() {
377                    if i == MAX_RETRIES {
378                        break;
379                    }
380                    // Start to enforce live input after 3 retries,
381                    // to avoid excessively retrying transactions with non-existent input objects.
382                    let result = inner
383                        .execute_transaction_with_effects_waiting(
384                            request.clone(),
385                            client_addr,
386                            i > 3,
387                        )
388                        .await;
389                    match result {
390                        Ok(_) => {
391                            inner
392                                .metrics
393                                .background_retry_attempts
394                                .with_label_values(&["success"])
395                                .inc();
396                            debug!(
397                                "Background retry {i} for transaction {} succeeded",
398                                request.transaction.digest()
399                            );
400                            break;
401                        }
402                        Err(e) => {
403                            if !e.is_retriable() {
404                                inner
405                                    .metrics
406                                    .background_retry_attempts
407                                    .with_label_values(&["non-retriable"])
408                                    .inc();
409                                debug!(
410                                    "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
411                                    request.transaction.digest()
412                                );
413                                break;
414                            }
415                            inner
416                                .metrics
417                                .background_retry_attempts
418                                .with_label_values(&["retriable"])
419                                .inc();
420                            debug!(
421                                "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
422                                request.transaction.digest()
423                            );
424                        }
425                    };
426                    tracing::debug!("Wait for {:.3}s before next retry", delay.as_secs_f32());
427                    sleep(delay).await;
428                }
429            });
430        }
431
432        result
433    }
434
435    fn build_response_from_local_effects(
436        &self,
437        effects: sui_types::effects::TransactionEffects,
438        include_events: bool,
439        include_input_objects: bool,
440        include_output_objects: bool,
441    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
442        let epoch = effects.executed_epoch();
443        let events = if include_events {
444            if effects.events_digest().is_some() {
445                Some(
446                    self.validator_state
447                        .get_transaction_events(effects.transaction_digest())
448                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?,
449                )
450            } else {
451                None
452            }
453        } else {
454            None
455        };
456        let input_objects = include_input_objects
457            .then(|| self.validator_state.get_transaction_input_objects(&effects))
458            .transpose()
459            .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
460        let output_objects = include_output_objects
461            .then(|| {
462                self.validator_state
463                    .get_transaction_output_objects(&effects)
464            })
465            .transpose()
466            .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
467
468        Ok(QuorumTransactionResponse {
469            effects: FinalizedEffects {
470                effects,
471                finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
472            },
473            events,
474            input_objects,
475            output_objects,
476            auxiliary_data: None,
477        })
478    }
479
480    /// Shared implementation for executing transactions with parallel local effects waiting
481    async fn execute_transaction_with_effects_waiting(
482        &self,
483        request: ExecuteTransactionRequestV3,
484        client_addr: Option<SocketAddr>,
485        enforce_live_input_objects: bool,
486    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
487    {
488        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
489        let verified_transaction = epoch_store
490            .verify_transaction_with_current_aliases(request.transaction.clone())
491            .map_err(TransactionSubmissionError::InvalidUserSignature)?
492            .into_tx();
493        let tx_digest = *verified_transaction.digest();
494
495        // Early validation check against local state before submission to catch non-retriable errors
496        // TODO: Consider moving this check to TransactionDriver for per-retry validation
497        if self.enable_early_validation
498            && let Err(e) = self.validator_state.check_transaction_validity(
499                &epoch_store,
500                &verified_transaction,
501                enforce_live_input_objects,
502            )
503        {
504            let error_category = e.categorize();
505            if !error_category.is_submission_retriable() {
506                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
507                if !self.validator_state.is_tx_already_executed(&tx_digest) {
508                    self.metrics
509                        .early_validation_rejections
510                        .with_label_values(&[e.to_variant_name()])
511                        .inc();
512                    debug!(
513                        error = ?e,
514                        "Transaction rejected during early validation"
515                    );
516
517                    return Err(TransactionSubmissionError::TransactionFailed {
518                        category: error_category,
519                        details: e.to_string(),
520                    });
521                }
522            }
523        }
524
525        // Add transaction to WAL log.
526        let guard =
527            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
528        let is_new_transaction = guard.is_new_transaction();
529
530        let include_events = request.include_events;
531        let include_input_objects = request.include_input_objects;
532        let include_output_objects = request.include_output_objects;
533        let include_auxiliary_data = request.include_auxiliary_data;
534
535        // Check if transaction has already been executed locally and return cached results
536        if let Some(effects) = self
537            .validator_state
538            .get_transaction_cache_reader()
539            .get_executed_effects(&tx_digest)
540        {
541            self.metrics.early_cached_response.inc();
542            debug!(
543                ?tx_digest,
544                "Returning cached results for already-executed transaction"
545            );
546            let response = self.build_response_from_local_effects(
547                effects,
548                include_events,
549                include_input_objects,
550                include_output_objects,
551            )?;
552            return Ok((response, true));
553        }
554
555        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
556            .ok()
557            .and_then(|v| v.parse().ok())
558            .map(Duration::from_secs)
559            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
560
561        let num_submissions = if !is_new_transaction {
562            // No need to submit when the transaction is already being processed.
563            0
564        } else if in_integration_test() {
565            // Allow duplicated submissions in tests.
566            let r = rand::thread_rng().gen_range(1..=100);
567            let n = if r <= 10 {
568                3
569            } else if r <= 30 {
570                2
571            } else {
572                1
573            };
574            if n > 1 {
575                debug!("Making {n} execution calls");
576            }
577            n
578        } else {
579            1
580        };
581
582        // Wait for one of the execution futures to succeed, or all of them to fail.
583        let mut execution_futures = FuturesUnordered::new();
584        for i in 0..num_submissions {
585            // Generate jitter values outside the async block
586            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
587            let delay_ms = if should_delay {
588                rand::thread_rng().gen_range(100..=500)
589            } else {
590                0
591            };
592
593            let request = request.clone();
594            let verified_transaction = verified_transaction.clone();
595
596            let future = async move {
597                if delay_ms > 0 {
598                    // Add jitters to duplicated submissions.
599                    sleep(Duration::from_millis(delay_ms)).await;
600                }
601                self.execute_transaction_impl(
602                    request,
603                    verified_transaction,
604                    client_addr,
605                    Some(finality_timeout),
606                )
607                .await
608            }
609            .boxed();
610            execution_futures.push(future);
611        }
612
613        // Track the last execution error.
614        let mut last_execution_error: Option<TransactionSubmissionError> = None;
615
616        // Wait for execution result outside of this call to become available.
617        let digests = [tx_digest];
618        let mut local_effects_future = self
619            .validator_state
620            .get_transaction_cache_reader()
621            .notify_read_executed_effects_may_fail(
622                "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
623                &digests,
624            )
625            .boxed();
626
627        // Wait for execution timeout.
628        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
629
630        loop {
631            tokio::select! {
632                // Local effects might be available
633                all_effects_result = &mut local_effects_future => {
634                    let all_effects = all_effects_result
635                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
636                    if all_effects.len() != 1 {
637                        break Err(TransactionSubmissionError::TransactionDriverInternalError(
638                            SuiErrorKind::Unknown(format!("Unexpected number of effects found: {}", all_effects.len())).into()
639                        ));
640                    }
641                    debug!(
642                        "Effects became available while execution was running"
643                    );
644                    self.metrics.concurrent_execution.inc();
645
646                    let effects = all_effects.into_iter().next().unwrap();
647                    let response = self.build_response_from_local_effects(
648                        effects,
649                        include_events,
650                        include_input_objects,
651                        include_output_objects,
652                    )?;
653                    break Ok((response, true));
654                }
655
656                // This branch is disabled if execution_futures is empty.
657                Some(result) = execution_futures.next() => {
658                    match result {
659                        Ok(resp) => {
660                            // First success gets returned.
661                            debug!("Execution succeeded, returning response");
662                            let QuorumTransactionResponse {
663                                effects,
664                                events,
665                                input_objects,
666                                output_objects,
667                                auxiliary_data,
668                            } = resp;
669                            // Filter fields based on request flags.
670                            let resp = QuorumTransactionResponse {
671                                effects,
672                                events: if include_events { events } else { None },
673                                input_objects: if include_input_objects { input_objects } else { None },
674                                output_objects: if include_output_objects { output_objects } else { None },
675                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
676                            };
677                            break Ok((resp, false));
678                        }
679                        Err(e) => {
680                            debug!(?e, "Execution attempt failed, wait for other attempts");
681                            last_execution_error = Some(e);
682                        }
683                    };
684
685                    // Last error must have been recorded.
686                    if execution_futures.is_empty() {
687                        break Err(last_execution_error.unwrap());
688                    }
689                }
690
691                // A timeout has occurred while waiting for finality
692                _ = &mut timeout_future => {
693                    if let Some(e) = last_execution_error {
694                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
695                    } else {
696                        debug!("Timeout waiting for transaction finality.");
697                    }
698                    self.metrics.wait_for_finality_timeout.inc();
699
700                    // TODO: Return the last execution error.
701                    break Err(TransactionSubmissionError::TimeoutBeforeFinality);
702                }
703            }
704        }
705    }
706
707    #[instrument(level = "error", skip_all)]
708    async fn execute_transaction_impl(
709        &self,
710        request: ExecuteTransactionRequestV3,
711        verified_transaction: VerifiedTransaction,
712        client_addr: Option<SocketAddr>,
713        finality_timeout: Option<Duration>,
714    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
715        let timer = Instant::now();
716        let tx_digest = *verified_transaction.digest();
717        let tx_type = if verified_transaction.is_consensus_tx() {
718            TxType::SharedObject
719        } else {
720            TxType::SingleWriter
721        };
722
723        let (_in_flight_metrics_guards, good_response_metrics) =
724            self.update_metrics(&request.transaction);
725
726        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
727        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
728        wait_for_finality_gauge.inc();
729        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
730            in_flight.dec();
731        });
732
733        let response = self
734            .submit_with_transaction_driver(
735                &self.transaction_driver,
736                &request,
737                client_addr,
738                &verified_transaction,
739                good_response_metrics,
740                finality_timeout,
741            )
742            .await?;
743        let driver_type = "transaction_driver";
744
745        add_server_timing("wait_for_finality done");
746
747        self.metrics.wait_for_finality_finished.inc();
748
749        let elapsed = timer.elapsed().as_secs_f64();
750        if elapsed > 10.0 {
751            warn!(
752                ?tx_digest,
753                "Settlement finality latency {} is too high", elapsed,
754            );
755        }
756        self.metrics
757            .settlement_finality_latency
758            .with_label_values(&[tx_type.as_str(), driver_type])
759            .observe(elapsed);
760        good_response_metrics.inc();
761
762        Ok(response)
763    }
764
765    #[instrument(level = "error", skip_all, err(level = "info"))]
766    async fn submit_with_transaction_driver(
767        &self,
768        td: &Arc<TransactionDriver<A>>,
769        request: &ExecuteTransactionRequestV3,
770        client_addr: Option<SocketAddr>,
771        verified_transaction: &VerifiedTransaction,
772        good_response_metrics: &GenericCounter<AtomicU64>,
773        timeout_duration: Option<Duration>,
774    ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
775        let tx_digest = *verified_transaction.digest();
776        let td_response = td
777            .drive_transaction(
778                SubmitTxRequest::new_transaction(request.transaction.clone()),
779                SubmitTransactionOptions {
780                    forwarded_client_addr: client_addr,
781                    allowed_validators: self.td_allowed_submission_list.clone(),
782                    blocked_validators: self.td_blocked_submission_list.clone(),
783                },
784                timeout_duration,
785            )
786            .await
787            .map_err(|e| match e {
788                TransactionDriverError::TimeoutWithLastRetriableError {
789                    last_error,
790                    attempts,
791                    timeout,
792                } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
793                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
794                    attempts,
795                    timeout,
796                },
797                other => TransactionSubmissionError::TransactionFailed {
798                    category: other.categorize(),
799                    details: other.to_string(),
800                },
801            });
802
803        match td_response {
804            Err(e) => {
805                warn!(?tx_digest, "TransactionDriver error: {e:?}");
806                Err(e)
807            }
808            Ok(quorum_transaction_response) => {
809                good_response_metrics.inc();
810                Ok(quorum_transaction_response)
811            }
812        }
813    }
814
815    #[instrument(
816        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
817        level = "debug",
818        skip_all,
819        err(level = "info")
820    )]
821    async fn wait_for_finalized_tx_executed_locally_with_timeout(
822        validator_state: &Arc<AuthorityState>,
823        tx_digest: TransactionDigest,
824        tx_type: TxType,
825        metrics: &TransactionOrchestratorMetrics,
826    ) -> SuiResult {
827        metrics.local_execution_in_flight.inc();
828        let _metrics_guard =
829            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
830                in_flight.dec();
831            });
832
833        let _latency_guard = metrics
834            .local_execution_latency
835            .with_label_values(&[tx_type.as_str()])
836            .start_timer();
837        debug!("Waiting for finalized tx to be executed locally.");
838        match timeout(
839            LOCAL_EXECUTION_TIMEOUT,
840            validator_state
841                .get_transaction_cache_reader()
842                .notify_read_executed_effects_digests(
843                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
844                    &[tx_digest],
845                ),
846        )
847        .instrument(error_span!(
848            "transaction_orchestrator::local_execution",
849            ?tx_digest
850        ))
851        .await
852        {
853            Err(_elapsed) => {
854                debug!(
855                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
856                    LOCAL_EXECUTION_TIMEOUT
857                );
858                metrics.local_execution_timeout.inc();
859                Err(SuiErrorKind::TimeoutError.into())
860            }
861            Ok(_) => {
862                metrics.local_execution_success.inc();
863                Ok(())
864            }
865        }
866    }
867
868    fn update_metrics<'a>(
869        &'a self,
870        transaction: &Transaction,
871    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
872        let (in_flight, good_response) = if transaction.is_consensus_tx() {
873            self.metrics.total_req_received_shared_object.inc();
874            (
875                self.metrics.req_in_flight_shared_object.clone(),
876                &self.metrics.good_response_shared_object,
877            )
878        } else {
879            self.metrics.total_req_received_single_writer.inc();
880            (
881                self.metrics.req_in_flight_single_writer.clone(),
882                &self.metrics.good_response_single_writer,
883            )
884        };
885        in_flight.inc();
886        (
887            scopeguard::guard(in_flight, |in_flight| {
888                in_flight.dec();
889            }),
890            good_response,
891        )
892    }
893
894    fn start_task_to_recover_txes_in_log(
895        pending_tx_log: Arc<WritePathPendingTransactionLog>,
896        transaction_driver: Arc<TransactionDriver<A>>,
897    ) {
898        spawn_logged_monitored_task!(async move {
899            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
900                info!("Skipping loading pending transactions from pending_tx_log.");
901                return;
902            }
903            let pending_txes = pending_tx_log
904                .load_all_pending_transactions()
905                .expect("failed to load all pending transactions");
906            let num_pending_txes = pending_txes.len();
907            info!(
908                "Recovering {} pending transactions from pending_tx_log.",
909                num_pending_txes
910            );
911            let mut recovery = pending_txes
912                .into_iter()
913                .map(|tx| {
914                    let pending_tx_log = pending_tx_log.clone();
915                    let transaction_driver = transaction_driver.clone();
916                    async move {
917                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
918                        // requires a migration.
919                        let tx = tx.into_inner();
920                        let tx_digest = *tx.digest();
921                        // It's not impossible we fail to enqueue a task but that's not the end of world.
922                        // TODO(william) correctly extract client_addr from logs
923                        if let Err(err) = transaction_driver
924                            .drive_transaction(
925                                SubmitTxRequest::new_transaction(tx),
926                                SubmitTransactionOptions::default(),
927                                Some(Duration::from_secs(60)),
928                            )
929                            .await
930                        {
931                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
932                        } else {
933                            debug!(?tx_digest, "Executed recovered transaction");
934                        }
935                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
936                            warn!(
937                                ?tx_digest,
938                                "Failed to clean up transaction in pending log: {err}"
939                            );
940                        } else {
941                            debug!(?tx_digest, "Cleaned up transaction in pending log");
942                        }
943                    }
944                })
945                .collect::<FuturesUnordered<_>>();
946
947            let mut num_recovered = 0;
948            while recovery.next().await.is_some() {
949                num_recovered += 1;
950                if num_recovered % 1000 == 0 {
951                    info!(
952                        "Recovered {} out of {} transactions from pending_tx_log.",
953                        num_recovered, num_pending_txes
954                    );
955                }
956            }
957            info!(
958                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
959                num_recovered, num_pending_txes
960            );
961        });
962    }
963}
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
///
/// The `*_single_writer` / `*_shared_object` fields are per-tx-type children of labeled
/// families that are registered and resolved once in `TransactionOrchestratorMetrics::new`.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, split by tx type (consensus/shared-object vs. single-writer).
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Good (successful) responses, split by tx type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, split by tx type; decremented by a drop guard.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Waiting for finality: in-flight gauge, finished count, and timeout count.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Waiting for the finalized transaction to be executed locally: in-flight gauge and
    // the two possible outcomes.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Requests answered from cached results of already-executed transactions, and
    // concurrent executions whose effects became available locally (per registered help text).
    early_cached_response: IntCounter,
    concurrent_execution: IntCounter,

    // Transactions rejected before submission, labeled by `reason`.
    early_validation_rejections: IntCounterVec,

    // Background retries: tasks started (gauge) and attempts labeled by `status`.
    background_retry_started: IntGauge,
    background_retry_attempts: IntCounterVec,

    // Latency histograms. Labels: request_latency = (tx_type, route, wait_for_local_execution);
    // local_execution_latency = (tx_type); settlement_finality_latency = (tx_type, driver_type).
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
996
997// Note that labeled-metrics are stored upfront individually
998// to mitigate the perf hit by MetricsVec.
999// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
1000impl TransactionOrchestratorMetrics {
1001    pub fn new(registry: &Registry) -> Self {
1002        let total_req_received = register_int_counter_vec_with_registry!(
1003            "tx_orchestrator_total_req_received",
1004            "Total number of executions request Transaction Orchestrator receives, group by tx type",
1005            &["tx_type"],
1006            registry
1007        )
1008        .unwrap();
1009
1010        let total_req_received_single_writer =
1011            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1012        let total_req_received_shared_object =
1013            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1014
1015        let good_response = register_int_counter_vec_with_registry!(
1016            "tx_orchestrator_good_response",
1017            "Total number of good responses Transaction Orchestrator generates, group by tx type",
1018            &["tx_type"],
1019            registry
1020        )
1021        .unwrap();
1022
1023        let good_response_single_writer =
1024            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1025        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1026
1027        let req_in_flight = register_int_gauge_vec_with_registry!(
1028            "tx_orchestrator_req_in_flight",
1029            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1030            &["tx_type"],
1031            registry
1032        )
1033        .unwrap();
1034
1035        let req_in_flight_single_writer =
1036            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1037        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1038
1039        Self {
1040            total_req_received_single_writer,
1041            total_req_received_shared_object,
1042            good_response_single_writer,
1043            good_response_shared_object,
1044            req_in_flight_single_writer,
1045            req_in_flight_shared_object,
1046            wait_for_finality_in_flight: register_int_gauge_with_registry!(
1047                "tx_orchestrator_wait_for_finality_in_flight",
1048                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1049                registry,
1050            )
1051            .unwrap(),
1052            wait_for_finality_finished: register_int_counter_with_registry!(
1053                "tx_orchestrator_wait_for_finality_fnished",
1054                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1055                registry,
1056            )
1057            .unwrap(),
1058            wait_for_finality_timeout: register_int_counter_with_registry!(
1059                "tx_orchestrator_wait_for_finality_timeout",
1060                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1061                registry,
1062            )
1063            .unwrap(),
1064            local_execution_in_flight: register_int_gauge_with_registry!(
1065                "tx_orchestrator_local_execution_in_flight",
1066                "Number of local execution txns in flights Transaction Orchestrator handles",
1067                registry,
1068            )
1069            .unwrap(),
1070            local_execution_success: register_int_counter_with_registry!(
1071                "tx_orchestrator_local_execution_success",
1072                "Total number of successful local execution txns Transaction Orchestrator handles",
1073                registry,
1074            )
1075            .unwrap(),
1076            local_execution_timeout: register_int_counter_with_registry!(
1077                "tx_orchestrator_local_execution_timeout",
1078                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1079                registry,
1080            )
1081            .unwrap(),
1082            early_cached_response: register_int_counter_with_registry!(
1083                "tx_orchestrator_early_cached_response",
1084                "Total number of requests returning cached results for already-executed transactions",
1085                registry,
1086            )
1087            .unwrap(),
1088            concurrent_execution: register_int_counter_with_registry!(
1089                "tx_orchestrator_concurrent_execution",
1090                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1091                registry,
1092            )
1093            .unwrap(),
1094            early_validation_rejections: register_int_counter_vec_with_registry!(
1095                "tx_orchestrator_early_validation_rejections",
1096                "Total number of transactions rejected during early validation before submission, by reason",
1097                &["reason"],
1098                registry,
1099            )
1100            .unwrap(),
1101            background_retry_started: register_int_gauge_with_registry!(
1102                "tx_orchestrator_background_retry_started",
1103                "Number of background retry tasks kicked off for transactions with retriable errors",
1104                registry,
1105            )
1106            .unwrap(),
1107            background_retry_attempts: register_int_counter_vec_with_registry!(
1108                "tx_orchestrator_background_retry_attempts",
1109                "Total number of background retry attempts, by status",
1110                &["status"],
1111                registry,
1112            )
1113            .unwrap(),
1114            request_latency: register_histogram_vec_with_registry!(
1115                "tx_orchestrator_request_latency",
1116                "Time spent in processing one Transaction Orchestrator request",
1117                &["tx_type", "route", "wait_for_local_execution"],
1118                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1119                registry,
1120            )
1121            .unwrap(),
1122            local_execution_latency: register_histogram_vec_with_registry!(
1123                "tx_orchestrator_local_execution_latency",
1124                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1125                &["tx_type"],
1126                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1127                registry,
1128            )
1129            .unwrap(),
1130            settlement_finality_latency: register_histogram_vec_with_registry!(
1131                "tx_orchestrator_settlement_finality_latency",
1132                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1133                &["tx_type", "driver_type"],
1134                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1135                registry,
1136            )
1137            .unwrap(),
1138        }
1139    }
1140
1141    pub fn new_for_tests() -> Self {
1142        let registry = Registry::new();
1143        Self::new(&registry)
1144    }
1145}
1146
1147#[async_trait::async_trait]
1148impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1149where
1150    A: AuthorityAPI + Send + Sync + 'static + Clone,
1151{
1152    async fn execute_transaction(
1153        &self,
1154        request: ExecuteTransactionRequestV3,
1155        client_addr: Option<std::net::SocketAddr>,
1156    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1157        self.execute_transaction_v3(request, client_addr).await
1158    }
1159
1160    fn simulate_transaction(
1161        &self,
1162        transaction: TransactionData,
1163        checks: TransactionChecks,
1164        allow_mock_gas_coin: bool,
1165    ) -> Result<SimulateTransactionResult, SuiError> {
1166        self.inner
1167            .validator_state
1168            .simulate_transaction(transaction, checks, allow_mock_gas_coin)
1169    }
1170}
1171
1172/// Keeps track of inflight transactions being submitted, and helps recover transactions
1173/// on restart.
1174struct TransactionSubmissionGuard {
1175    pending_tx_log: Arc<WritePathPendingTransactionLog>,
1176    tx_digest: TransactionDigest,
1177    is_new_transaction: bool,
1178}
1179
1180impl TransactionSubmissionGuard {
1181    pub fn new(
1182        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1183        tx: &VerifiedTransaction,
1184    ) -> Self {
1185        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1186        let tx_digest = *tx.digest();
1187        if is_new_transaction {
1188            debug!(?tx_digest, "Added transaction to inflight set");
1189        } else {
1190            debug!(
1191                ?tx_digest,
1192                "Transaction already being processed, no new submission will be made"
1193            );
1194        };
1195        Self {
1196            pending_tx_log,
1197            tx_digest,
1198            is_new_transaction,
1199        }
1200    }
1201
1202    fn is_new_transaction(&self) -> bool {
1203        self.is_new_transaction
1204    }
1205}
1206
1207impl Drop for TransactionSubmissionGuard {
1208    fn drop(&mut self) {
1209        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1210            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1211        } else {
1212            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1213        }
1214    }
1215}