sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*
5Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6submit transactions to validators for finality, and proactively executes
7finalized transactions locally, when possible.
8*/
9
10use std::net::SocketAddr;
11use std::ops::Deref;
12use std::path::Path;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16
17use futures::FutureExt;
18use futures::future::{Either, Future, select};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::in_antithesis;
21use mysten_common::sync::notify_read::NotifyRead;
22use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
23use mysten_metrics::{add_server_timing, spawn_logged_monitored_task, spawn_monitored_task};
24use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
25use prometheus::{
26    HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
27    register_int_counter_vec_with_registry, register_int_counter_with_registry,
28    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
29};
30use rand::Rng;
31use sui_config::NodeConfig;
32use sui_protocol_config::Chain;
33use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
34use sui_types::base_types::TransactionDigest;
35use sui_types::effects::TransactionEffectsAPI;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
37use sui_types::messages_grpc::{SubmitTxRequest, TxType};
38use sui_types::quorum_driver_types::{
39    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
40    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
41    QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResult,
42};
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
45use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
46use tokio::sync::broadcast::Receiver;
47use tokio::sync::broadcast::error::RecvError;
48use tokio::task::JoinHandle;
49use tokio::time::{Instant, sleep, timeout};
50use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
51
52use crate::authority::AuthorityState;
53use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
54use crate::authority_aggregator::AuthorityAggregator;
55use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
56use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
57use crate::quorum_driver::{QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics};
58use crate::transaction_driver::{
59    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
60    TransactionDriverMetrics, choose_transaction_driver_percentage,
61};
62
/// How long to wait for local execution (including parents) before a timeout
/// is returned to client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default timeout for waiting for finality for each transaction.
///
/// `execute_transaction_with_effects_waiting` uses this value unless the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable parses as a number of seconds.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
69
/// Result of driving a transaction to finality: on success the transaction paired with
/// its [`QuorumTransactionResponse`], on failure the transaction digest paired with the
/// [`QuorumDriverError`].
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
/// Node component that submits transactions to validators for finality, through either
/// Quorum Driver or Transaction Driver, and tracks in-flight transactions in a pending
/// transaction log.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handler for the Quorum Driver started in `new`.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state, used for transaction verification, early validation and
    /// for reading locally executed effects.
    validator_state: Arc<AuthorityState>,
    /// Join handle of the task spawned in `new` that runs `loop_pending_transaction_log`.
    _local_executor_handle: JoinHandle<()>,
    /// Log of pending transactions (referred to as the WAL in this file).
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier keyed by transaction digest; the same instance is handed to the
    /// Quorum Driver handler builder in `new`.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Orchestrator-level metrics.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Transaction Driver instance; `new` only creates one when `td_percentage > 0`.
    transaction_driver: Option<Arc<TransactionDriver<A>>>,
    /// Value from `choose_transaction_driver_percentage`, or 0 when the protocol config
    /// does not enable `mysticeti_fastpath`.
    td_percentage: u8,
    /// `allowed_submission_validators` from the node's transaction driver config
    /// (empty when that config is absent).
    td_allowed_submission_list: Vec<String>,
    /// `enable_early_validation` from the node's transaction driver config
    /// (`true` when that config is absent).
    enable_early_validation: bool,
}
84
85impl TransactionOrchestrator<NetworkAuthorityClient> {
86    pub fn new_with_auth_aggregator(
87        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
88        validator_state: Arc<AuthorityState>,
89        reconfig_channel: Receiver<SuiSystemState>,
90        parent_path: &Path,
91        prometheus_registry: &Registry,
92        node_config: &NodeConfig,
93    ) -> Self {
94        let observer = OnsiteReconfigObserver::new(
95            reconfig_channel,
96            validator_state.get_object_cache_reader().clone(),
97            validator_state.clone_committee_store(),
98            validators.safe_client_metrics_base.clone(),
99            validators.metrics.deref().clone(),
100        );
101        TransactionOrchestrator::new(
102            validators,
103            validator_state,
104            parent_path,
105            prometheus_registry,
106            observer,
107            node_config,
108        )
109    }
110}
111
112impl<A> TransactionOrchestrator<A>
113where
114    A: AuthorityAPI + Send + Sync + 'static + Clone,
115    OnsiteReconfigObserver: ReconfigObserver<A>,
116{
117    pub fn new(
118        validators: Arc<AuthorityAggregator<A>>,
119        validator_state: Arc<AuthorityState>,
120        parent_path: &Path,
121        prometheus_registry: &Registry,
122        reconfig_observer: OnsiteReconfigObserver,
123        node_config: &NodeConfig,
124    ) -> Self {
125        let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
126        let notifier = Arc::new(NotifyRead::new());
127        let reconfig_observer = Arc::new(reconfig_observer);
128        let quorum_driver_handler = Arc::new(
129            QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
130                .with_notifier(notifier.clone())
131                .with_reconfig_observer(reconfig_observer.clone())
132                .start(),
133        );
134
135        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
136        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
137        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
138            parent_path.join("fullnode_pending_transactions"),
139        ));
140        let pending_tx_log_clone = pending_tx_log.clone();
141        let _local_executor_handle = {
142            spawn_monitored_task!(async move {
143                Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
144            })
145        };
146        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
147
148        let epoch_store = validator_state.load_epoch_store_one_call_per_task();
149        let td_percentage = if !epoch_store.protocol_config().mysticeti_fastpath() {
150            0
151        } else {
152            choose_transaction_driver_percentage(Some(epoch_store.get_chain_identifier()))
153        };
154
155        let transaction_driver = if td_percentage > 0 {
156            let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
157            let client_metrics = Arc::new(
158                crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
159            );
160            Some(TransactionDriver::new(
161                validators.clone(),
162                reconfig_observer.clone(),
163                td_metrics,
164                Some(node_config),
165                client_metrics,
166            ))
167        } else {
168            None
169        };
170
171        let td_allowed_submission_list = node_config
172            .transaction_driver_config
173            .as_ref()
174            .map(|config| config.allowed_submission_validators.clone())
175            .unwrap_or_default();
176
177        let enable_early_validation = node_config
178            .transaction_driver_config
179            .as_ref()
180            .map(|config| config.enable_early_validation)
181            .unwrap_or(true);
182
183        Self {
184            quorum_driver_handler,
185            validator_state,
186            _local_executor_handle,
187            pending_tx_log,
188            notifier,
189            metrics,
190            transaction_driver,
191            td_percentage,
192            td_allowed_submission_list,
193            enable_early_validation,
194        }
195    }
196}
197
198impl<A> TransactionOrchestrator<A>
199where
200    A: AuthorityAPI + Send + Sync + 'static + Clone,
201{
202    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
203    fields(
204        tx_digest = ?request.transaction.digest(),
205        tx_type = ?request_type,
206    ))]
207    pub async fn execute_transaction_block(
208        &self,
209        request: ExecuteTransactionRequestV3,
210        request_type: ExecuteTransactionRequestType,
211        client_addr: Option<SocketAddr>,
212    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
213    {
214        let timer = Instant::now();
215        let tx_type = if request.transaction.is_consensus_tx() {
216            TxType::SharedObject
217        } else {
218            TxType::SingleWriter
219        };
220        let tx_digest = *request.transaction.digest();
221
222        let (response, mut executed_locally) = self
223            .execute_transaction_with_effects_waiting(request, client_addr)
224            .await?;
225
226        if !executed_locally {
227            executed_locally = if matches!(
228                request_type,
229                ExecuteTransactionRequestType::WaitForLocalExecution
230            ) {
231                let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
232                    &self.validator_state,
233                    tx_digest,
234                    tx_type,
235                    &self.metrics,
236                )
237                .await
238                .is_ok();
239                add_server_timing("local_execution done");
240                executed_locally
241            } else {
242                false
243            };
244        }
245
246        let QuorumTransactionResponse {
247            effects,
248            events,
249            input_objects,
250            output_objects,
251            auxiliary_data,
252        } = response;
253
254        let response = ExecuteTransactionResponseV3 {
255            effects,
256            events,
257            input_objects,
258            output_objects,
259            auxiliary_data,
260        };
261
262        self.metrics
263            .request_latency
264            .with_label_values(&[
265                tx_type.as_str(),
266                "execute_transaction_block",
267                executed_locally.to_string().as_str(),
268            ])
269            .observe(timer.elapsed().as_secs_f64());
270
271        Ok((response, executed_locally))
272    }
273
274    // Utilize the handle_certificate_v3 validator api to request input/output objects
275    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
276                 fields(tx_digest = ?request.transaction.digest()))]
277    pub async fn execute_transaction_v3(
278        &self,
279        request: ExecuteTransactionRequestV3,
280        client_addr: Option<SocketAddr>,
281    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
282        let timer = Instant::now();
283        let tx_type = if request.transaction.is_consensus_tx() {
284            TxType::SharedObject
285        } else {
286            TxType::SingleWriter
287        };
288
289        let (response, _) = self
290            .execute_transaction_with_effects_waiting(request, client_addr)
291            .await?;
292
293        self.metrics
294            .request_latency
295            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
296            .observe(timer.elapsed().as_secs_f64());
297
298        let QuorumTransactionResponse {
299            effects,
300            events,
301            input_objects,
302            output_objects,
303            auxiliary_data,
304        } = response;
305
306        Ok(ExecuteTransactionResponseV3 {
307            effects,
308            events,
309            input_objects,
310            output_objects,
311            auxiliary_data,
312        })
313    }
314
315    /// Shared implementation for executing transactions with parallel local effects waiting
316    async fn execute_transaction_with_effects_waiting(
317        &self,
318        request: ExecuteTransactionRequestV3,
319        client_addr: Option<SocketAddr>,
320    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
321        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
322        let verified_transaction = epoch_store
323            .verify_transaction(request.transaction.clone())
324            .map_err(QuorumDriverError::InvalidUserSignature)?;
325        let tx_digest = *verified_transaction.digest();
326
327        // Early validation check against local state before submission to catch non-retriable errors
328        // TODO: Consider moving this check to TransactionDriver for per-retry validation
329        if self.enable_early_validation
330            && let Err(e) = self
331                .validator_state
332                .check_transaction_validity(&epoch_store, &verified_transaction)
333        {
334            let error_category = e.categorize();
335            if !error_category.is_submission_retriable() {
336                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
337                if !self.validator_state.is_tx_already_executed(&tx_digest) {
338                    self.metrics
339                        .early_validation_rejections
340                        .with_label_values(&[e.as_ref()])
341                        .inc();
342                    debug!(
343                        error = ?e,
344                        "Transaction rejected during early validation"
345                    );
346
347                    return Err(QuorumDriverError::TransactionFailed {
348                        category: error_category,
349                        details: e.to_string(),
350                    });
351                }
352            }
353        }
354
355        // Add transaction to WAL log.
356        let is_new_transaction = self
357            .pending_tx_log
358            .write_pending_transaction_maybe(&verified_transaction)
359            .await
360            .map_err(|e| {
361                warn!("QuorumDriverInternalError: {e:?}");
362                QuorumDriverError::QuorumDriverInternalError(e)
363            })?;
364        if is_new_transaction {
365            debug!("Added transaction to WAL log for TransactionDriver");
366        } else {
367            debug!("Transaction already in pending_tx_log");
368        }
369
370        let include_events = request.include_events;
371        let include_input_objects = request.include_input_objects;
372        let include_output_objects = request.include_output_objects;
373        let include_auxiliary_data = request.include_auxiliary_data;
374
375        // Track whether TD is being used for this transaction
376        let using_td = Arc::new(AtomicBool::new(false));
377
378        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
379            .ok()
380            .and_then(|v| v.parse().ok())
381            .map(Duration::from_secs)
382            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
383
384        let num_submissions = if !is_new_transaction {
385            // No need to submit when the transaction is already being processed.
386            0
387        } else if cfg!(msim) || in_antithesis() {
388            // Allow duplicated submissions in tests.
389            let r = rand::thread_rng().gen_range(1..=100);
390            let n = if r <= 10 {
391                3
392            } else if r <= 30 {
393                2
394            } else {
395                1
396            };
397            if n > 1 {
398                debug!("Making {n} execution calls");
399            }
400            n
401        } else {
402            1
403        };
404
405        // Wait for one of the execution futures to succeed, or all of them to fail.
406        let mut execution_futures = FuturesUnordered::new();
407        for i in 0..num_submissions {
408            // Generate jitter values outside the async block
409            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
410            let delay_ms = if should_delay {
411                rand::thread_rng().gen_range(100..=500)
412            } else {
413                0
414            };
415
416            let epoch_store = epoch_store.clone();
417            let request = request.clone();
418            let verified_transaction = verified_transaction.clone();
419            let using_td = using_td.clone();
420
421            let future = async move {
422                if delay_ms > 0 {
423                    // Add jitters to duplicated submissions.
424                    sleep(Duration::from_millis(delay_ms)).await;
425                }
426                self.execute_transaction_impl(
427                    &epoch_store,
428                    request,
429                    verified_transaction,
430                    client_addr,
431                    Some(finality_timeout),
432                    using_td,
433                )
434                .await
435            }
436            .boxed();
437            execution_futures.push(future);
438        }
439
440        // Track the last execution error.
441        let mut last_execution_error: Option<QuorumDriverError> = None;
442
443        // Wait for execution result outside of this call to become available.
444        let digests = [tx_digest];
445        let mut local_effects_future = epoch_store
446            .within_alive_epoch(
447                self.validator_state
448                    .get_transaction_cache_reader()
449                    .notify_read_executed_effects(
450                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
451                    &digests,
452                ),
453            )
454            .boxed();
455
456        // Wait for execution timeout.
457        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
458
459        let result = loop {
460            tokio::select! {
461                biased;
462
463                // Local effects might be available
464                local_effects_result = &mut local_effects_future => {
465                    match local_effects_result {
466                        Ok(effects) => {
467                            debug!(
468                                "Effects became available while execution was running"
469                            );
470                            if let Some(effects) = effects.into_iter().next() {
471                                self.metrics.concurrent_execution.inc();
472                                let epoch = effects.executed_epoch();
473                                let events = if include_events {
474                                    if effects.events_digest().is_some() {
475                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
476                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
477                                    } else {
478                                        None
479                                    }
480                                } else {
481                                    None
482                                };
483                                let input_objects = include_input_objects
484                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
485                                    .transpose()
486                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
487                                let output_objects = include_output_objects
488                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
489                                    .transpose()
490                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
491                                let response = QuorumTransactionResponse {
492                                    effects: FinalizedEffects {
493                                        effects,
494                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
495                                    },
496                                    events,
497                                    input_objects,
498                                    output_objects,
499                                    auxiliary_data: None,
500                                };
501                                break Ok((response, true));
502                            }
503                        }
504                        Err(_) => {
505                            warn!("Epoch terminated before effects were available");
506                        }
507                    };
508
509                    // Prevent this branch from being selected again
510                    local_effects_future = futures::future::pending().boxed();
511                }
512
513                // This branch is disabled if execution_futures is empty.
514                Some(result) = execution_futures.next() => {
515                    match result {
516                        Ok(resp) => {
517                            // First success gets returned.
518                            debug!("Execution succeeded, returning response");
519                            let QuorumTransactionResponse {
520                                effects,
521                                events,
522                                input_objects,
523                                output_objects,
524                                auxiliary_data,
525                            } = resp;
526                            // Filter fields based on request flags.
527                            let resp = QuorumTransactionResponse {
528                                effects,
529                                events: if include_events { events } else { None },
530                                input_objects: if include_input_objects { input_objects } else { None },
531                                output_objects: if include_output_objects { output_objects } else { None },
532                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
533                            };
534                            break Ok((resp, false));
535                        }
536                        Err(QuorumDriverError::PendingExecutionInTransactionOrchestrator) => {
537                            debug!(
538                                "Transaction is already being processed"
539                            );
540                            // Avoid overriding errors with transaction already being processed.
541                            if last_execution_error.is_none() {
542                                last_execution_error = Some(QuorumDriverError::PendingExecutionInTransactionOrchestrator);
543                            }
544                        }
545                        Err(e) => {
546                            debug!(?e, "Execution attempt failed, wait for other attempts");
547                            last_execution_error = Some(e);
548                        }
549                    };
550
551                    // Last error must have been recorded.
552                    if execution_futures.is_empty() {
553                        break Err(last_execution_error.unwrap());
554                    }
555                }
556
557                // A timeout has occurred while waiting for finality
558                _ = &mut timeout_future => {
559                    debug!("Timeout waiting for transaction finality.");
560                    self.metrics.wait_for_finality_timeout.inc();
561
562                    // Clean up transaction from WAL log only for TD submissions
563                    // For QD submissions, the cleanup happens in loop_pending_transaction_log
564                    if using_td.load(Ordering::Acquire) {
565                        debug!("Cleaning up TD transaction from WAL due to timeout");
566                        if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
567                            warn!(
568                                "Failed to finish TD transaction in pending transaction log: {err}"
569                            );
570                        }
571                    }
572
573                    break Err(QuorumDriverError::TimeoutBeforeFinality);
574                }
575            }
576        };
577
578        // Clean up transaction from WAL log
579        if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
580            warn!("Failed to finish transaction in pending transaction log: {err}");
581        }
582
583        result
584    }
585
586    #[instrument(level = "error", skip_all)]
587    async fn execute_transaction_impl(
588        &self,
589        epoch_store: &Arc<AuthorityPerEpochStore>,
590        request: ExecuteTransactionRequestV3,
591        verified_transaction: VerifiedTransaction,
592        client_addr: Option<SocketAddr>,
593        finality_timeout: Option<Duration>,
594        using_td: Arc<AtomicBool>,
595    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
596        let tx_digest = *verified_transaction.digest();
597        debug!("TO Received transaction execution request.");
598
599        let timer = Instant::now();
600        let tx_type = if verified_transaction.is_consensus_tx() {
601            TxType::SharedObject
602        } else {
603            TxType::SingleWriter
604        };
605
606        let (_in_flight_metrics_guards, good_response_metrics) =
607            self.update_metrics(&request.transaction);
608
609        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
610        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
611        wait_for_finality_gauge.inc();
612        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
613            in_flight.dec();
614        });
615
616        // Select TransactionDriver or QuorumDriver for submission.
617        let (response, driver_type) = if self.transaction_driver.is_some()
618            && self.should_use_transaction_driver(epoch_store, tx_digest)
619        {
620            // Mark that we're using TD before submitting.
621            using_td.store(true, Ordering::Release);
622
623            (
624                self.submit_with_transaction_driver(
625                    self.transaction_driver.as_ref().unwrap(),
626                    &request,
627                    client_addr,
628                    &verified_transaction,
629                    good_response_metrics,
630                    finality_timeout,
631                )
632                .await?,
633                "transaction_driver",
634            )
635        } else {
636            // Submit transaction through QuorumDriver.
637            using_td.store(false, Ordering::Release);
638
639            let resp = self
640                .submit_with_quorum_driver(
641                    epoch_store.clone(),
642                    verified_transaction.clone(),
643                    request,
644                    client_addr,
645                )
646                .await
647                .map_err(|e| {
648                    warn!("QuorumDriverInternalError: {e:?}");
649                    QuorumDriverError::QuorumDriverInternalError(e)
650                })?
651                .await
652                .map_err(|e| {
653                    warn!("QuorumDriverInternalError: {e:?}");
654                    QuorumDriverError::QuorumDriverInternalError(e)
655                })??;
656
657            (
658                QuorumTransactionResponse {
659                    effects: FinalizedEffects::new_from_effects_cert(resp.effects_cert.into()),
660                    events: resp.events,
661                    input_objects: resp.input_objects,
662                    output_objects: resp.output_objects,
663                    auxiliary_data: resp.auxiliary_data,
664                },
665                "quorum_driver",
666            )
667        };
668
669        add_server_timing("wait_for_finality done");
670
671        self.metrics.wait_for_finality_finished.inc();
672
673        let elapsed = timer.elapsed().as_secs_f64();
674        self.metrics
675            .settlement_finality_latency
676            .with_label_values(&[tx_type.as_str(), driver_type])
677            .observe(elapsed);
678        good_response_metrics.inc();
679
680        Ok(response)
681    }
682
683    #[instrument(level = "error", skip_all, err(level = "info"))]
684    async fn submit_with_transaction_driver(
685        &self,
686        td: &Arc<TransactionDriver<A>>,
687        request: &ExecuteTransactionRequestV3,
688        client_addr: Option<SocketAddr>,
689        verified_transaction: &VerifiedTransaction,
690        good_response_metrics: &GenericCounter<AtomicU64>,
691        timeout_duration: Option<Duration>,
692    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
693        let tx_digest = *verified_transaction.digest();
694        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
695
696        let td_response = td
697            .drive_transaction(
698                SubmitTxRequest::new_transaction(request.transaction.clone()),
699                SubmitTransactionOptions {
700                    forwarded_client_addr: client_addr,
701                    allowed_validators: self.td_allowed_submission_list.clone(),
702                },
703                timeout_duration,
704            )
705            .await
706            .map_err(|e| match e {
707                TransactionDriverError::TimeoutWithLastRetriableError {
708                    last_error,
709                    attempts,
710                    timeout,
711                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
712                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
713                    attempts,
714                    timeout,
715                },
716                other => QuorumDriverError::TransactionFailed {
717                    category: other.categorize(),
718                    details: other.to_string(),
719                },
720            });
721
722        match td_response {
723            Err(e) => {
724                warn!("TransactionDriver error: {e:?}");
725                Err(e)
726            }
727            Ok(quorum_transaction_response) => {
728                good_response_metrics.inc();
729                Ok(quorum_transaction_response)
730            }
731        }
732    }
733
    /// Submits the transaction to Quorum Driver for execution.
    ///
    /// A notification ticket for the transaction digest is registered on
    /// `self.notifier` before the request is enqueued on the quorum driver via
    /// `submit_transaction_no_ticket`.
    ///
    /// Returns an awaitable Future. A failure to enqueue the request surfaces
    /// through the outer `SuiResult`. The returned future resolves to whatever the
    /// ticket yields for this digest, or to an error if the re-submission it may
    /// perform (see the comment in the body) fails.
    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
    async fn submit_with_quorum_driver(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
    ) -> SuiResult<impl Future<Output = SuiResult<QuorumDriverResult>> + '_> {
        let tx_digest = *transaction.digest();

        // The ticket is registered before submitting; presumably so that a result
        // published immediately after submission is not missed (TODO confirm).
        // `request` is cloned because the returned future may need to submit it again.
        let ticket = self.notifier.register_one(&tx_digest);
        self.quorum_driver()
            .submit_transaction_no_ticket(request.clone(), client_addr)
            .await?;

        // It's possible that the transaction effects are already stored in DB at this
        // point, so we also subscribe to that. If we hear from `effects_await` first, it
        // means the ticket missed the previous notification, and we want to ask quorum
        // driver to form a certificate for us again, to serve this request.
        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
        let qd = self.clone_quorum_driver();
        Ok(async move {
            // Kept in a named local so the slice borrowed by the notify-read future
            // lives for the whole async block.
            let digests = [tx_digest];
            let effects_await =
                epoch_store.within_alive_epoch(cache_reader.notify_read_executed_effects(
                    "TransactionOrchestrator::notify_read_submit_with_qd",
                    &digests,
                ));
            // let-and-return necessary to satisfy borrow checker.
            #[allow(clippy::let_and_return)]
            let res = match select(ticket, effects_await.boxed()).await {
                // The ticket fired first: hand its result straight back.
                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
                // `effects_await` completed first. Its output is discarded; it only
                // serves as the trigger to re-submit and then keep waiting on the ticket.
                Either::Right((_, unfinished_quorum_driver_task)) => {
                    debug!("Effects are available in DB, use quorum driver to get a certificate");
                    qd.submit_transaction_no_ticket(request, client_addr)
                        .await?;
                    Ok(unfinished_quorum_driver_task.await)
                }
            };
            res
        })
    }
778
779    #[instrument(
780        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
781        level = "debug",
782        skip_all,
783        err(level = "info")
784    )]
785    async fn wait_for_finalized_tx_executed_locally_with_timeout(
786        validator_state: &Arc<AuthorityState>,
787        tx_digest: TransactionDigest,
788        tx_type: TxType,
789        metrics: &TransactionOrchestratorMetrics,
790    ) -> SuiResult {
791        metrics.local_execution_in_flight.inc();
792        let _metrics_guard =
793            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
794                in_flight.dec();
795            });
796
797        let _latency_guard = metrics
798            .local_execution_latency
799            .with_label_values(&[tx_type.as_str()])
800            .start_timer();
801        debug!("Waiting for finalized tx to be executed locally.");
802        match timeout(
803            LOCAL_EXECUTION_TIMEOUT,
804            validator_state
805                .get_transaction_cache_reader()
806                .notify_read_executed_effects_digests(
807                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
808                    &[tx_digest],
809                ),
810        )
811        .instrument(error_span!(
812            "transaction_orchestrator::local_execution",
813            ?tx_digest
814        ))
815        .await
816        {
817            Err(_elapsed) => {
818                debug!(
819                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
820                    LOCAL_EXECUTION_TIMEOUT
821                );
822                metrics.local_execution_timeout.inc();
823                Err(SuiErrorKind::TimeoutError.into())
824            }
825            Ok(_) => {
826                metrics.local_execution_success.inc();
827                Ok(())
828            }
829        }
830    }
831
832    fn should_use_transaction_driver(
833        &self,
834        epoch_store: &Arc<AuthorityPerEpochStore>,
835        tx_digest: TransactionDigest,
836    ) -> bool {
837        const MAX_PERCENTAGE: u8 = 100;
838        let unknown_network = epoch_store.get_chain() == Chain::Unknown;
839        let v = if unknown_network {
840            rand::thread_rng().gen_range(1..=MAX_PERCENTAGE)
841        } else {
842            let v = u32::from_le_bytes(tx_digest.inner()[..4].try_into().unwrap());
843            (v % (MAX_PERCENTAGE as u32) + 1) as u8
844        };
845        debug!(
846            "Choosing whether to use transaction driver: {} vs {}",
847            v, self.td_percentage
848        );
849        v <= self.td_percentage
850    }
851
852    // TODO: Potentially cleanup this function and pending transaction log.
853    async fn loop_pending_transaction_log(
854        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
855        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
856    ) {
857        loop {
858            match effects_receiver.recv().await {
859                Ok(Ok((transaction, ..))) => {
860                    let tx_digest = transaction.digest();
861                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
862                        error!(
863                            ?tx_digest,
864                            "Failed to finish transaction in pending transaction log: {err}"
865                        );
866                    }
867                }
868                Ok(Err((tx_digest, _err))) => {
869                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
870                        error!(
871                            ?tx_digest,
872                            "Failed to finish transaction in pending transaction log: {err}"
873                        );
874                    }
875                }
876                Err(RecvError::Closed) => {
877                    error!("Sender of effects subscriber queue has been dropped!");
878                    return;
879                }
880                Err(RecvError::Lagged(skipped_count)) => {
881                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
882                }
883            }
884        }
885    }
886
    /// Returns a reference to the quorum driver handler held by this orchestrator.
    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
        &self.quorum_driver_handler
    }
890
891    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
892        self.quorum_driver_handler.clone()
893    }
894
895    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
896        self.quorum_driver().authority_aggregator().load_full()
897    }
898
899    fn update_metrics<'a>(
900        &'a self,
901        transaction: &Transaction,
902    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
903        let (in_flight, good_response) = if transaction.is_consensus_tx() {
904            self.metrics.total_req_received_shared_object.inc();
905            (
906                self.metrics.req_in_flight_shared_object.clone(),
907                &self.metrics.good_response_shared_object,
908            )
909        } else {
910            self.metrics.total_req_received_single_writer.inc();
911            (
912                self.metrics.req_in_flight_single_writer.clone(),
913                &self.metrics.good_response_single_writer,
914            )
915        };
916        in_flight.inc();
917        (
918            scopeguard::guard(in_flight, |in_flight| {
919                in_flight.dec();
920            }),
921            good_response,
922        )
923    }
924
925    fn schedule_txes_in_log(
926        pending_tx_log: Arc<WritePathPendingTransactionLog>,
927        quorum_driver: Arc<QuorumDriverHandler<A>>,
928    ) {
929        spawn_logged_monitored_task!(async move {
930            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
931                info!("Skipping loading pending transactions from pending_tx_log.");
932                return;
933            }
934            let pending_txes = pending_tx_log
935                .load_all_pending_transactions()
936                .expect("failed to load all pending transactions");
937            info!(
938                "Recovering {} pending transactions from pending_tx_log.",
939                pending_txes.len()
940            );
941            for (i, tx) in pending_txes.into_iter().enumerate() {
942                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
943                // requires a migration.
944                let tx = tx.into_inner();
945                let tx_digest = *tx.digest();
946                // It's not impossible we fail to enqueue a task but that's not the end of world.
947                // TODO(william) correctly extract client_addr from logs
948                if let Err(err) = quorum_driver
949                    .submit_transaction_no_ticket(
950                        ExecuteTransactionRequestV3 {
951                            transaction: tx,
952                            include_events: true,
953                            include_input_objects: false,
954                            include_output_objects: false,
955                            include_auxiliary_data: false,
956                        },
957                        None,
958                    )
959                    .await
960                {
961                    warn!(
962                        ?tx_digest,
963                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
964                    );
965                } else {
966                    debug!("Enqueued transaction from pending_tx_log");
967                    if (i + 1) % 1000 == 0 {
968                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
969                    }
970                }
971            }
972            // Transactions will be cleaned up in loop_execute_finalized_tx_locally() after they
973            // produce effects.
974        });
975    }
976
    /// Returns every transaction currently stored in the pending transaction log.
    ///
    /// As the name indicates, this is intended for use in tests.
    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
        self.pending_tx_log.load_all_pending_transactions()
    }
980}
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    /// Execution requests received, `tx_type` label fixed to single-writer.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    /// Execution requests received, `tx_type` label fixed to shared-object.
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    /// Good responses generated, `tx_type` label fixed to single-writer.
    good_response_single_writer: GenericCounter<AtomicU64>,
    /// Good responses generated, `tx_type` label fixed to shared-object.
    good_response_shared_object: GenericCounter<AtomicU64>,

    /// Requests currently being processed, `tx_type` label fixed to single-writer.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    /// Requests currently being processed, `tx_type` label fixed to shared-object.
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    /// Transactions for which finality is currently being awaited.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    /// Transactions that got a response (success or failure) before the timeout.
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    /// Transactions that timed out while waiting for finality.
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    /// Local executions currently being awaited.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    /// Local executions that completed successfully.
    local_execution_success: GenericCounter<AtomicU64>,
    /// Local executions that timed out.
    local_execution_timeout: GenericCounter<AtomicU64>,

    /// Concurrent executions where effects became available locally while the
    /// transaction was still being driven to finality.
    concurrent_execution: IntCounter,

    /// Transactions rejected during early validation before submission,
    /// labelled by `error_type`.
    early_validation_rejections: IntCounterVec,

    /// Time spent processing one request, labelled by `tx_type`, `route` and
    /// `wait_for_local_execution`.
    request_latency: HistogramVec,
    /// Time spent waiting for local execution, labelled by `tx_type`.
    local_execution_latency: HistogramVec,
    /// Time spent waiting for settlement and finality, labelled by `tx_type`
    /// and `driver_type`.
    settlement_finality_latency: HistogramVec,
}
1009
1010// Note that labeled-metrics are stored upfront individually
1011// to mitigate the perf hit by MetricsVec.
1012// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
1013impl TransactionOrchestratorMetrics {
1014    pub fn new(registry: &Registry) -> Self {
1015        let total_req_received = register_int_counter_vec_with_registry!(
1016            "tx_orchestrator_total_req_received",
1017            "Total number of executions request Transaction Orchestrator receives, group by tx type",
1018            &["tx_type"],
1019            registry
1020        )
1021        .unwrap();
1022
1023        let total_req_received_single_writer =
1024            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1025        let total_req_received_shared_object =
1026            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1027
1028        let good_response = register_int_counter_vec_with_registry!(
1029            "tx_orchestrator_good_response",
1030            "Total number of good responses Transaction Orchestrator generates, group by tx type",
1031            &["tx_type"],
1032            registry
1033        )
1034        .unwrap();
1035
1036        let good_response_single_writer =
1037            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1038        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1039
1040        let req_in_flight = register_int_gauge_vec_with_registry!(
1041            "tx_orchestrator_req_in_flight",
1042            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1043            &["tx_type"],
1044            registry
1045        )
1046        .unwrap();
1047
1048        let req_in_flight_single_writer =
1049            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1050        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1051
1052        Self {
1053            total_req_received_single_writer,
1054            total_req_received_shared_object,
1055            good_response_single_writer,
1056            good_response_shared_object,
1057            req_in_flight_single_writer,
1058            req_in_flight_shared_object,
1059            wait_for_finality_in_flight: register_int_gauge_with_registry!(
1060                "tx_orchestrator_wait_for_finality_in_flight",
1061                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1062                registry,
1063            )
1064            .unwrap(),
1065            wait_for_finality_finished: register_int_counter_with_registry!(
1066                "tx_orchestrator_wait_for_finality_fnished",
1067                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1068                registry,
1069            )
1070            .unwrap(),
1071            wait_for_finality_timeout: register_int_counter_with_registry!(
1072                "tx_orchestrator_wait_for_finality_timeout",
1073                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1074                registry,
1075            )
1076            .unwrap(),
1077            local_execution_in_flight: register_int_gauge_with_registry!(
1078                "tx_orchestrator_local_execution_in_flight",
1079                "Number of local execution txns in flights Transaction Orchestrator handles",
1080                registry,
1081            )
1082            .unwrap(),
1083            local_execution_success: register_int_counter_with_registry!(
1084                "tx_orchestrator_local_execution_success",
1085                "Total number of successful local execution txns Transaction Orchestrator handles",
1086                registry,
1087            )
1088            .unwrap(),
1089            local_execution_timeout: register_int_counter_with_registry!(
1090                "tx_orchestrator_local_execution_timeout",
1091                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1092                registry,
1093            )
1094            .unwrap(),
1095            concurrent_execution: register_int_counter_with_registry!(
1096                "tx_orchestrator_concurrent_execution",
1097                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1098                registry,
1099            )
1100            .unwrap(),
1101            early_validation_rejections: register_int_counter_vec_with_registry!(
1102                "tx_orchestrator_early_validation_rejections",
1103                "Total number of transactions rejected during early validation before submission, by error type",
1104                &["error_type"],
1105                registry,
1106            )
1107            .unwrap(),
1108            request_latency: register_histogram_vec_with_registry!(
1109                "tx_orchestrator_request_latency",
1110                "Time spent in processing one Transaction Orchestrator request",
1111                &["tx_type", "route", "wait_for_local_execution"],
1112                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1113                registry,
1114            )
1115            .unwrap(),
1116            local_execution_latency: register_histogram_vec_with_registry!(
1117                "tx_orchestrator_local_execution_latency",
1118                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1119                &["tx_type"],
1120                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1121                registry,
1122            )
1123            .unwrap(),
1124            settlement_finality_latency: register_histogram_vec_with_registry!(
1125                "tx_orchestrator_settlement_finality_latency",
1126                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1127                &["tx_type", "driver_type"],
1128                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1129                registry,
1130            )
1131            .unwrap(),
1132        }
1133    }
1134
1135    pub fn new_for_tests() -> Self {
1136        let registry = Registry::new();
1137        Self::new(&registry)
1138    }
1139}
1140
/// `TransactionExecutor` implementation that forwards to the orchestrator's own
/// execution path and to the validator state's simulation.
#[async_trait::async_trait]
impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Executes `request` by delegating to `execute_transaction_v3`, passing
    /// `client_addr` through unchanged.
    async fn execute_transaction(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<std::net::SocketAddr>,
    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
        self.execute_transaction_v3(request, client_addr).await
    }

    /// Simulates `transaction` by delegating to `validator_state`, with `checks`
    /// passed through unchanged.
    fn simulate_transaction(
        &self,
        transaction: TransactionData,
        checks: TransactionChecks,
    ) -> Result<SimulateTransactionResult, SuiError> {
        self.validator_state
            .simulate_transaction(transaction, checks)
    }
}