sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*
Transaction Orchestrator is a Node component that submits transactions to validators for
finality through the Quorum Driver or, when enabled, the Transaction Driver, and
proactively executes finalized transactions locally, when possible.
8*/
9
10use std::net::SocketAddr;
11use std::ops::Deref;
12use std::path::Path;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16
17use futures::FutureExt;
18use futures::future::{Either, Future, select};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::in_antithesis;
21use mysten_common::sync::notify_read::NotifyRead;
22use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
23use mysten_metrics::{add_server_timing, spawn_logged_monitored_task, spawn_monitored_task};
24use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
25use prometheus::{
26    HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
27    register_int_counter_vec_with_registry, register_int_counter_with_registry,
28    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
29};
30use rand::Rng;
31use sui_config::NodeConfig;
32use sui_protocol_config::Chain;
33use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
34use sui_types::base_types::TransactionDigest;
35use sui_types::effects::TransactionEffectsAPI;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
37use sui_types::messages_grpc::{SubmitTxRequest, TxType};
38use sui_types::quorum_driver_types::{
39    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
40    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
41    QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResult,
42};
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
45use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
46use tokio::sync::broadcast::Receiver;
47use tokio::sync::broadcast::error::RecvError;
48use tokio::task::JoinHandle;
49use tokio::time::{Instant, sleep, timeout};
50use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
51
52use crate::authority::AuthorityState;
53use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
54use crate::authority_aggregator::AuthorityAggregator;
55use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
56use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
57use crate::quorum_driver::{QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics};
58use crate::transaction_driver::{
59    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
60    TransactionDriverMetrics, choose_transaction_driver_percentage,
61};
62
/// Upper bound on how long to wait for local execution (including parents)
/// before a timeout is returned to the client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default per-transaction timeout when waiting for finality. Can be overridden at
/// runtime through the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
69
70pub type QuorumTransactionEffectsResult =
71    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
72pub struct TransactionOrchestrator<A: Clone> {
73    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
74    validator_state: Arc<AuthorityState>,
75    _local_executor_handle: JoinHandle<()>,
76    pending_tx_log: Arc<WritePathPendingTransactionLog>,
77    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
78    metrics: Arc<TransactionOrchestratorMetrics>,
79    transaction_driver: Option<Arc<TransactionDriver<A>>>,
80    td_percentage: u8,
81    td_allowed_submission_list: Vec<String>,
82    td_blocked_submission_list: Vec<String>,
83    enable_early_validation: bool,
84}
85
86impl TransactionOrchestrator<NetworkAuthorityClient> {
87    pub fn new_with_auth_aggregator(
88        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
89        validator_state: Arc<AuthorityState>,
90        reconfig_channel: Receiver<SuiSystemState>,
91        parent_path: &Path,
92        prometheus_registry: &Registry,
93        node_config: &NodeConfig,
94    ) -> Self {
95        let observer = OnsiteReconfigObserver::new(
96            reconfig_channel,
97            validator_state.get_object_cache_reader().clone(),
98            validator_state.clone_committee_store(),
99            validators.safe_client_metrics_base.clone(),
100            validators.metrics.deref().clone(),
101        );
102        TransactionOrchestrator::new(
103            validators,
104            validator_state,
105            parent_path,
106            prometheus_registry,
107            observer,
108            node_config,
109        )
110    }
111}
112
113impl<A> TransactionOrchestrator<A>
114where
115    A: AuthorityAPI + Send + Sync + 'static + Clone,
116    OnsiteReconfigObserver: ReconfigObserver<A>,
117{
118    pub fn new(
119        validators: Arc<AuthorityAggregator<A>>,
120        validator_state: Arc<AuthorityState>,
121        parent_path: &Path,
122        prometheus_registry: &Registry,
123        reconfig_observer: OnsiteReconfigObserver,
124        node_config: &NodeConfig,
125    ) -> Self {
126        let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
127        let notifier = Arc::new(NotifyRead::new());
128        let reconfig_observer = Arc::new(reconfig_observer);
129        let quorum_driver_handler = Arc::new(
130            QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
131                .with_notifier(notifier.clone())
132                .with_reconfig_observer(reconfig_observer.clone())
133                .start(),
134        );
135
136        let effects_receiver = quorum_driver_handler.subscribe_to_effects();
137        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
138        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
139            parent_path.join("fullnode_pending_transactions"),
140        ));
141        let pending_tx_log_clone = pending_tx_log.clone();
142        let _local_executor_handle = {
143            spawn_monitored_task!(async move {
144                Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
145            })
146        };
147        Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
148
149        let epoch_store = validator_state.load_epoch_store_one_call_per_task();
150        let td_percentage = if !epoch_store.protocol_config().mysticeti_fastpath() {
151            0
152        } else {
153            choose_transaction_driver_percentage(Some(epoch_store.get_chain_identifier()))
154        };
155
156        let transaction_driver = if td_percentage > 0 {
157            let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
158            let client_metrics = Arc::new(
159                crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
160            );
161            Some(TransactionDriver::new(
162                validators.clone(),
163                reconfig_observer.clone(),
164                td_metrics,
165                Some(node_config),
166                client_metrics,
167            ))
168        } else {
169            None
170        };
171
172        let td_allowed_submission_list = node_config
173            .transaction_driver_config
174            .as_ref()
175            .map(|config| config.allowed_submission_validators.clone())
176            .unwrap_or_default();
177
178        let td_blocked_submission_list = node_config
179            .transaction_driver_config
180            .as_ref()
181            .map(|config| config.blocked_submission_validators.clone())
182            .unwrap_or_default();
183
184        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
185            panic!(
186                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
187                td_allowed_submission_list, td_blocked_submission_list
188            );
189        }
190
191        let enable_early_validation = node_config
192            .transaction_driver_config
193            .as_ref()
194            .map(|config| config.enable_early_validation)
195            .unwrap_or(true);
196
197        Self {
198            quorum_driver_handler,
199            validator_state,
200            _local_executor_handle,
201            pending_tx_log,
202            notifier,
203            metrics,
204            transaction_driver,
205            td_percentage,
206            td_allowed_submission_list,
207            td_blocked_submission_list,
208            enable_early_validation,
209        }
210    }
211}
212
213impl<A> TransactionOrchestrator<A>
214where
215    A: AuthorityAPI + Send + Sync + 'static + Clone,
216{
217    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
218    fields(
219        tx_digest = ?request.transaction.digest(),
220        tx_type = ?request_type,
221    ))]
222    pub async fn execute_transaction_block(
223        &self,
224        request: ExecuteTransactionRequestV3,
225        request_type: ExecuteTransactionRequestType,
226        client_addr: Option<SocketAddr>,
227    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
228    {
229        let timer = Instant::now();
230        let tx_type = if request.transaction.is_consensus_tx() {
231            TxType::SharedObject
232        } else {
233            TxType::SingleWriter
234        };
235        let tx_digest = *request.transaction.digest();
236
237        let (response, mut executed_locally) = self
238            .execute_transaction_with_effects_waiting(request, client_addr)
239            .await?;
240
241        if !executed_locally {
242            executed_locally = if matches!(
243                request_type,
244                ExecuteTransactionRequestType::WaitForLocalExecution
245            ) {
246                let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
247                    &self.validator_state,
248                    tx_digest,
249                    tx_type,
250                    &self.metrics,
251                )
252                .await
253                .is_ok();
254                add_server_timing("local_execution done");
255                executed_locally
256            } else {
257                false
258            };
259        }
260
261        let QuorumTransactionResponse {
262            effects,
263            events,
264            input_objects,
265            output_objects,
266            auxiliary_data,
267        } = response;
268
269        let response = ExecuteTransactionResponseV3 {
270            effects,
271            events,
272            input_objects,
273            output_objects,
274            auxiliary_data,
275        };
276
277        self.metrics
278            .request_latency
279            .with_label_values(&[
280                tx_type.as_str(),
281                "execute_transaction_block",
282                executed_locally.to_string().as_str(),
283            ])
284            .observe(timer.elapsed().as_secs_f64());
285
286        Ok((response, executed_locally))
287    }
288
289    // Utilize the handle_certificate_v3 validator api to request input/output objects
290    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
291                 fields(tx_digest = ?request.transaction.digest()))]
292    pub async fn execute_transaction_v3(
293        &self,
294        request: ExecuteTransactionRequestV3,
295        client_addr: Option<SocketAddr>,
296    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
297        let timer = Instant::now();
298        let tx_type = if request.transaction.is_consensus_tx() {
299            TxType::SharedObject
300        } else {
301            TxType::SingleWriter
302        };
303
304        let (response, _) = self
305            .execute_transaction_with_effects_waiting(request, client_addr)
306            .await?;
307
308        self.metrics
309            .request_latency
310            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
311            .observe(timer.elapsed().as_secs_f64());
312
313        let QuorumTransactionResponse {
314            effects,
315            events,
316            input_objects,
317            output_objects,
318            auxiliary_data,
319        } = response;
320
321        Ok(ExecuteTransactionResponseV3 {
322            effects,
323            events,
324            input_objects,
325            output_objects,
326            auxiliary_data,
327        })
328    }
329
330    /// Shared implementation for executing transactions with parallel local effects waiting
331    async fn execute_transaction_with_effects_waiting(
332        &self,
333        request: ExecuteTransactionRequestV3,
334        client_addr: Option<SocketAddr>,
335    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
336        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
337        let verified_transaction = epoch_store
338            .verify_transaction(request.transaction.clone())
339            .map_err(QuorumDriverError::InvalidUserSignature)?;
340        let tx_digest = *verified_transaction.digest();
341
342        // Early validation check against local state before submission to catch non-retriable errors
343        // TODO: Consider moving this check to TransactionDriver for per-retry validation
344        if self.enable_early_validation
345            && let Err(e) = self
346                .validator_state
347                .check_transaction_validity(&epoch_store, &verified_transaction)
348        {
349            let error_category = e.categorize();
350            if !error_category.is_submission_retriable() {
351                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
352                if !self.validator_state.is_tx_already_executed(&tx_digest) {
353                    self.metrics
354                        .early_validation_rejections
355                        .with_label_values(&[e.to_variant_name()])
356                        .inc();
357                    debug!(
358                        error = ?e,
359                        "Transaction rejected during early validation"
360                    );
361
362                    return Err(QuorumDriverError::TransactionFailed {
363                        category: error_category,
364                        details: e.to_string(),
365                    });
366                }
367            }
368        }
369
370        // Add transaction to WAL log.
371        let is_new_transaction = self
372            .pending_tx_log
373            .write_pending_transaction_maybe(&verified_transaction)
374            .await
375            .map_err(|e| {
376                warn!("QuorumDriverInternalError: {e:?}");
377                QuorumDriverError::QuorumDriverInternalError(e)
378            })?;
379        if is_new_transaction {
380            debug!("Added transaction to WAL log for TransactionDriver");
381        } else {
382            debug!("Transaction already in pending_tx_log");
383        }
384
385        let include_events = request.include_events;
386        let include_input_objects = request.include_input_objects;
387        let include_output_objects = request.include_output_objects;
388        let include_auxiliary_data = request.include_auxiliary_data;
389
390        // Track whether TD is being used for this transaction
391        let using_td = Arc::new(AtomicBool::new(false));
392
393        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
394            .ok()
395            .and_then(|v| v.parse().ok())
396            .map(Duration::from_secs)
397            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
398
399        let num_submissions = if !is_new_transaction {
400            // No need to submit when the transaction is already being processed.
401            0
402        } else if cfg!(msim) || in_antithesis() {
403            // Allow duplicated submissions in tests.
404            let r = rand::thread_rng().gen_range(1..=100);
405            let n = if r <= 10 {
406                3
407            } else if r <= 30 {
408                2
409            } else {
410                1
411            };
412            if n > 1 {
413                debug!("Making {n} execution calls");
414            }
415            n
416        } else {
417            1
418        };
419
420        // Wait for one of the execution futures to succeed, or all of them to fail.
421        let mut execution_futures = FuturesUnordered::new();
422        for i in 0..num_submissions {
423            // Generate jitter values outside the async block
424            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
425            let delay_ms = if should_delay {
426                rand::thread_rng().gen_range(100..=500)
427            } else {
428                0
429            };
430
431            let epoch_store = epoch_store.clone();
432            let request = request.clone();
433            let verified_transaction = verified_transaction.clone();
434            let using_td = using_td.clone();
435
436            let future = async move {
437                if delay_ms > 0 {
438                    // Add jitters to duplicated submissions.
439                    sleep(Duration::from_millis(delay_ms)).await;
440                }
441                self.execute_transaction_impl(
442                    &epoch_store,
443                    request,
444                    verified_transaction,
445                    client_addr,
446                    Some(finality_timeout),
447                    using_td,
448                )
449                .await
450            }
451            .boxed();
452            execution_futures.push(future);
453        }
454
455        // Track the last execution error.
456        let mut last_execution_error: Option<QuorumDriverError> = None;
457
458        // Wait for execution result outside of this call to become available.
459        let digests = [tx_digest];
460        let mut local_effects_future = epoch_store
461            .within_alive_epoch(
462                self.validator_state
463                    .get_transaction_cache_reader()
464                    .notify_read_executed_effects(
465                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
466                    &digests,
467                ),
468            )
469            .boxed();
470
471        // Wait for execution timeout.
472        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
473
474        let result = loop {
475            tokio::select! {
476                biased;
477
478                // Local effects might be available
479                local_effects_result = &mut local_effects_future => {
480                    match local_effects_result {
481                        Ok(effects) => {
482                            debug!(
483                                "Effects became available while execution was running"
484                            );
485                            if let Some(effects) = effects.into_iter().next() {
486                                self.metrics.concurrent_execution.inc();
487                                let epoch = effects.executed_epoch();
488                                let events = if include_events {
489                                    if effects.events_digest().is_some() {
490                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
491                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
492                                    } else {
493                                        None
494                                    }
495                                } else {
496                                    None
497                                };
498                                let input_objects = include_input_objects
499                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
500                                    .transpose()
501                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
502                                let output_objects = include_output_objects
503                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
504                                    .transpose()
505                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
506                                let response = QuorumTransactionResponse {
507                                    effects: FinalizedEffects {
508                                        effects,
509                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
510                                    },
511                                    events,
512                                    input_objects,
513                                    output_objects,
514                                    auxiliary_data: None,
515                                };
516                                break Ok((response, true));
517                            }
518                        }
519                        Err(_) => {
520                            warn!("Epoch terminated before effects were available");
521                        }
522                    };
523
524                    // Prevent this branch from being selected again
525                    local_effects_future = futures::future::pending().boxed();
526                }
527
528                // This branch is disabled if execution_futures is empty.
529                Some(result) = execution_futures.next() => {
530                    match result {
531                        Ok(resp) => {
532                            // First success gets returned.
533                            debug!("Execution succeeded, returning response");
534                            let QuorumTransactionResponse {
535                                effects,
536                                events,
537                                input_objects,
538                                output_objects,
539                                auxiliary_data,
540                            } = resp;
541                            // Filter fields based on request flags.
542                            let resp = QuorumTransactionResponse {
543                                effects,
544                                events: if include_events { events } else { None },
545                                input_objects: if include_input_objects { input_objects } else { None },
546                                output_objects: if include_output_objects { output_objects } else { None },
547                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
548                            };
549                            break Ok((resp, false));
550                        }
551                        Err(QuorumDriverError::PendingExecutionInTransactionOrchestrator) => {
552                            debug!(
553                                "Transaction is already being processed"
554                            );
555                            // Avoid overriding errors with transaction already being processed.
556                            if last_execution_error.is_none() {
557                                last_execution_error = Some(QuorumDriverError::PendingExecutionInTransactionOrchestrator);
558                            }
559                        }
560                        Err(e) => {
561                            debug!(?e, "Execution attempt failed, wait for other attempts");
562                            last_execution_error = Some(e);
563                        }
564                    };
565
566                    // Last error must have been recorded.
567                    if execution_futures.is_empty() {
568                        break Err(last_execution_error.unwrap());
569                    }
570                }
571
572                // A timeout has occurred while waiting for finality
573                _ = &mut timeout_future => {
574                    debug!("Timeout waiting for transaction finality.");
575                    self.metrics.wait_for_finality_timeout.inc();
576
577                    // Clean up transaction from WAL log only for TD submissions
578                    // For QD submissions, the cleanup happens in loop_pending_transaction_log
579                    if using_td.load(Ordering::Acquire) {
580                        debug!("Cleaning up TD transaction from WAL due to timeout");
581                        if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
582                            warn!(
583                                "Failed to finish TD transaction in pending transaction log: {err}"
584                            );
585                        }
586                    }
587
588                    break Err(QuorumDriverError::TimeoutBeforeFinality);
589                }
590            }
591        };
592
593        // Clean up transaction from WAL log
594        if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
595            warn!("Failed to finish transaction in pending transaction log: {err}");
596        }
597
598        result
599    }
600
601    #[instrument(level = "error", skip_all)]
602    async fn execute_transaction_impl(
603        &self,
604        epoch_store: &Arc<AuthorityPerEpochStore>,
605        request: ExecuteTransactionRequestV3,
606        verified_transaction: VerifiedTransaction,
607        client_addr: Option<SocketAddr>,
608        finality_timeout: Option<Duration>,
609        using_td: Arc<AtomicBool>,
610    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
611        let tx_digest = *verified_transaction.digest();
612        debug!("TO Received transaction execution request.");
613
614        let timer = Instant::now();
615        let tx_type = if verified_transaction.is_consensus_tx() {
616            TxType::SharedObject
617        } else {
618            TxType::SingleWriter
619        };
620
621        let (_in_flight_metrics_guards, good_response_metrics) =
622            self.update_metrics(&request.transaction);
623
624        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
625        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
626        wait_for_finality_gauge.inc();
627        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
628            in_flight.dec();
629        });
630
631        // Select TransactionDriver or QuorumDriver for submission.
632        let (response, driver_type) = if self.transaction_driver.is_some()
633            && self.should_use_transaction_driver(epoch_store, tx_digest)
634        {
635            // Mark that we're using TD before submitting.
636            using_td.store(true, Ordering::Release);
637
638            (
639                self.submit_with_transaction_driver(
640                    self.transaction_driver.as_ref().unwrap(),
641                    &request,
642                    client_addr,
643                    &verified_transaction,
644                    good_response_metrics,
645                    finality_timeout,
646                )
647                .await?,
648                "transaction_driver",
649            )
650        } else {
651            // Submit transaction through QuorumDriver.
652            using_td.store(false, Ordering::Release);
653
654            let resp = self
655                .submit_with_quorum_driver(
656                    epoch_store.clone(),
657                    verified_transaction.clone(),
658                    request,
659                    client_addr,
660                )
661                .await
662                .map_err(|e| {
663                    warn!("QuorumDriverInternalError: {e:?}");
664                    QuorumDriverError::QuorumDriverInternalError(e)
665                })?
666                .await
667                .map_err(|e| {
668                    warn!("QuorumDriverInternalError: {e:?}");
669                    QuorumDriverError::QuorumDriverInternalError(e)
670                })??;
671
672            (
673                QuorumTransactionResponse {
674                    effects: FinalizedEffects::new_from_effects_cert(resp.effects_cert.into()),
675                    events: resp.events,
676                    input_objects: resp.input_objects,
677                    output_objects: resp.output_objects,
678                    auxiliary_data: resp.auxiliary_data,
679                },
680                "quorum_driver",
681            )
682        };
683
684        add_server_timing("wait_for_finality done");
685
686        self.metrics.wait_for_finality_finished.inc();
687
688        let elapsed = timer.elapsed().as_secs_f64();
689        self.metrics
690            .settlement_finality_latency
691            .with_label_values(&[tx_type.as_str(), driver_type])
692            .observe(elapsed);
693        good_response_metrics.inc();
694
695        Ok(response)
696    }
697
698    #[instrument(level = "error", skip_all, err(level = "info"))]
699    async fn submit_with_transaction_driver(
700        &self,
701        td: &Arc<TransactionDriver<A>>,
702        request: &ExecuteTransactionRequestV3,
703        client_addr: Option<SocketAddr>,
704        verified_transaction: &VerifiedTransaction,
705        good_response_metrics: &GenericCounter<AtomicU64>,
706        timeout_duration: Option<Duration>,
707    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
708        let tx_digest = *verified_transaction.digest();
709        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
710
711        let td_response = td
712            .drive_transaction(
713                SubmitTxRequest::new_transaction(request.transaction.clone()),
714                SubmitTransactionOptions {
715                    forwarded_client_addr: client_addr,
716                    allowed_validators: self.td_allowed_submission_list.clone(),
717                    blocked_validators: self.td_blocked_submission_list.clone(),
718                },
719                timeout_duration,
720            )
721            .await
722            .map_err(|e| match e {
723                TransactionDriverError::TimeoutWithLastRetriableError {
724                    last_error,
725                    attempts,
726                    timeout,
727                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
728                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
729                    attempts,
730                    timeout,
731                },
732                other => QuorumDriverError::TransactionFailed {
733                    category: other.categorize(),
734                    details: other.to_string(),
735                },
736            });
737
738        match td_response {
739            Err(e) => {
740                warn!("TransactionDriver error: {e:?}");
741                Err(e)
742            }
743            Ok(quorum_transaction_response) => {
744                good_response_metrics.inc();
745                Ok(quorum_transaction_response)
746            }
747        }
748    }
749
    /// Submits the transaction to Quorum Driver for execution.
    /// Returns an awaitable Future.
    ///
    /// A notification ticket for the transaction digest is registered with `self.notifier`
    /// before the transaction is enqueued with the quorum driver. Any error from that initial
    /// enqueue is returned immediately as the outer `SuiResult`.
    ///
    /// The returned future resolves with whichever happens first:
    /// - the quorum driver response delivered through the ticket; or
    /// - completion of the local "executed effects" notification for the digest. In that case
    ///   the request is submitted to the quorum driver a second time (errors from this
    ///   resubmission are returned via `?`), and the future then waits on the same ticket.
    ///
    /// `transaction` is only used to obtain the digest.
    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
    async fn submit_with_quorum_driver(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
    ) -> SuiResult<impl Future<Output = SuiResult<QuorumDriverResult>> + '_> {
        let tx_digest = *transaction.digest();

        // Register the ticket before submitting. Presumably this ensures a notification
        // produced by this submission cannot be missed.
        let ticket = self.notifier.register_one(&tx_digest);
        self.quorum_driver()
            .submit_transaction_no_ticket(request.clone(), client_addr)
            .await?;

        // The transaction effects may already be stored in the DB at this point, so we also
        // subscribe to that. If `effects_await` fires first, the ticket missed an earlier
        // notification, and we ask the quorum driver to form a certificate again so that this
        // request can be served.
        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
        let qd = self.clone_quorum_driver();
        Ok(async move {
            let digests = [tx_digest];
            let effects_await =
                epoch_store.within_alive_epoch(cache_reader.notify_read_executed_effects(
                    "TransactionOrchestrator::notify_read_submit_with_qd",
                    &digests,
                ));
            // let-and-return necessary to satisfy borrow checker.
            #[allow(clippy::let_and_return)]
            let res = match select(ticket, effects_await.boxed()).await {
                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
                // NOTE(review): the output of `effects_await` is discarded here. This arm is
                // therefore also taken if `within_alive_epoch` resolves for a reason other
                // than effects being available (presumably the epoch ending). Confirm the
                // resubmission is intended in that case.
                Either::Right((_, unfinished_quorum_driver_task)) => {
                    debug!("Effects are available in DB, use quorum driver to get a certificate");
                    qd.submit_transaction_no_ticket(request, client_addr)
                        .await?;
                    // Wait on the original ticket for the quorum driver's response.
                    Ok(unfinished_quorum_driver_task.await)
                }
            };
            res
        })
    }
794
795    #[instrument(
796        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
797        level = "debug",
798        skip_all,
799        err(level = "info")
800    )]
801    async fn wait_for_finalized_tx_executed_locally_with_timeout(
802        validator_state: &Arc<AuthorityState>,
803        tx_digest: TransactionDigest,
804        tx_type: TxType,
805        metrics: &TransactionOrchestratorMetrics,
806    ) -> SuiResult {
807        metrics.local_execution_in_flight.inc();
808        let _metrics_guard =
809            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
810                in_flight.dec();
811            });
812
813        let _latency_guard = metrics
814            .local_execution_latency
815            .with_label_values(&[tx_type.as_str()])
816            .start_timer();
817        debug!("Waiting for finalized tx to be executed locally.");
818        match timeout(
819            LOCAL_EXECUTION_TIMEOUT,
820            validator_state
821                .get_transaction_cache_reader()
822                .notify_read_executed_effects_digests(
823                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
824                    &[tx_digest],
825                ),
826        )
827        .instrument(error_span!(
828            "transaction_orchestrator::local_execution",
829            ?tx_digest
830        ))
831        .await
832        {
833            Err(_elapsed) => {
834                debug!(
835                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
836                    LOCAL_EXECUTION_TIMEOUT
837                );
838                metrics.local_execution_timeout.inc();
839                Err(SuiErrorKind::TimeoutError.into())
840            }
841            Ok(_) => {
842                metrics.local_execution_success.inc();
843                Ok(())
844            }
845        }
846    }
847
848    fn should_use_transaction_driver(
849        &self,
850        epoch_store: &Arc<AuthorityPerEpochStore>,
851        tx_digest: TransactionDigest,
852    ) -> bool {
853        const MAX_PERCENTAGE: u8 = 100;
854        let unknown_network = epoch_store.get_chain() == Chain::Unknown;
855        let v = if unknown_network {
856            rand::thread_rng().gen_range(1..=MAX_PERCENTAGE)
857        } else {
858            let v = u32::from_le_bytes(tx_digest.inner()[..4].try_into().unwrap());
859            (v % (MAX_PERCENTAGE as u32) + 1) as u8
860        };
861        debug!(
862            "Choosing whether to use transaction driver: {} vs {}",
863            v, self.td_percentage
864        );
865        v <= self.td_percentage
866    }
867
868    // TODO: Potentially cleanup this function and pending transaction log.
869    async fn loop_pending_transaction_log(
870        mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
871        pending_transaction_log: Arc<WritePathPendingTransactionLog>,
872    ) {
873        loop {
874            match effects_receiver.recv().await {
875                Ok(Ok((transaction, ..))) => {
876                    let tx_digest = transaction.digest();
877                    if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
878                        error!(
879                            ?tx_digest,
880                            "Failed to finish transaction in pending transaction log: {err}"
881                        );
882                    }
883                }
884                Ok(Err((tx_digest, _err))) => {
885                    if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
886                        error!(
887                            ?tx_digest,
888                            "Failed to finish transaction in pending transaction log: {err}"
889                        );
890                    }
891                }
892                Err(RecvError::Closed) => {
893                    error!("Sender of effects subscriber queue has been dropped!");
894                    return;
895                }
896                Err(RecvError::Lagged(skipped_count)) => {
897                    warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
898                }
899            }
900        }
901    }
902
903    pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
904        &self.quorum_driver_handler
905    }
906
907    pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
908        self.quorum_driver_handler.clone()
909    }
910
911    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
912        self.quorum_driver().authority_aggregator().load_full()
913    }
914
915    fn update_metrics<'a>(
916        &'a self,
917        transaction: &Transaction,
918    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
919        let (in_flight, good_response) = if transaction.is_consensus_tx() {
920            self.metrics.total_req_received_shared_object.inc();
921            (
922                self.metrics.req_in_flight_shared_object.clone(),
923                &self.metrics.good_response_shared_object,
924            )
925        } else {
926            self.metrics.total_req_received_single_writer.inc();
927            (
928                self.metrics.req_in_flight_single_writer.clone(),
929                &self.metrics.good_response_single_writer,
930            )
931        };
932        in_flight.inc();
933        (
934            scopeguard::guard(in_flight, |in_flight| {
935                in_flight.dec();
936            }),
937            good_response,
938        )
939    }
940
941    fn schedule_txes_in_log(
942        pending_tx_log: Arc<WritePathPendingTransactionLog>,
943        quorum_driver: Arc<QuorumDriverHandler<A>>,
944    ) {
945        spawn_logged_monitored_task!(async move {
946            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
947                info!("Skipping loading pending transactions from pending_tx_log.");
948                return;
949            }
950            let pending_txes = pending_tx_log
951                .load_all_pending_transactions()
952                .expect("failed to load all pending transactions");
953            info!(
954                "Recovering {} pending transactions from pending_tx_log.",
955                pending_txes.len()
956            );
957            for (i, tx) in pending_txes.into_iter().enumerate() {
958                // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
959                // requires a migration.
960                let tx = tx.into_inner();
961                let tx_digest = *tx.digest();
962                // It's not impossible we fail to enqueue a task but that's not the end of world.
963                // TODO(william) correctly extract client_addr from logs
964                if let Err(err) = quorum_driver
965                    .submit_transaction_no_ticket(
966                        ExecuteTransactionRequestV3 {
967                            transaction: tx,
968                            include_events: true,
969                            include_input_objects: false,
970                            include_output_objects: false,
971                            include_auxiliary_data: false,
972                        },
973                        None,
974                    )
975                    .await
976                {
977                    warn!(
978                        ?tx_digest,
979                        "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
980                    );
981                } else {
982                    debug!("Enqueued transaction from pending_tx_log");
983                    if (i + 1) % 1000 == 0 {
984                        info!("Enqueued {} transactions from pending_tx_log.", i + 1);
985                    }
986                }
987            }
988            // Transactions will be cleaned up in loop_execute_finalized_tx_locally() after they
989            // produce effects.
990        });
991    }
992
993    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
994        self.pending_tx_log.load_all_pending_transactions()
995    }
996}
997/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
998#[derive(Clone)]
999pub struct TransactionOrchestratorMetrics {
1000    total_req_received_single_writer: GenericCounter<AtomicU64>,
1001    total_req_received_shared_object: GenericCounter<AtomicU64>,
1002
1003    good_response_single_writer: GenericCounter<AtomicU64>,
1004    good_response_shared_object: GenericCounter<AtomicU64>,
1005
1006    req_in_flight_single_writer: GenericGauge<AtomicI64>,
1007    req_in_flight_shared_object: GenericGauge<AtomicI64>,
1008
1009    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
1010    wait_for_finality_finished: GenericCounter<AtomicU64>,
1011    wait_for_finality_timeout: GenericCounter<AtomicU64>,
1012
1013    local_execution_in_flight: GenericGauge<AtomicI64>,
1014    local_execution_success: GenericCounter<AtomicU64>,
1015    local_execution_timeout: GenericCounter<AtomicU64>,
1016
1017    concurrent_execution: IntCounter,
1018
1019    early_validation_rejections: IntCounterVec,
1020
1021    request_latency: HistogramVec,
1022    local_execution_latency: HistogramVec,
1023    settlement_finality_latency: HistogramVec,
1024}
1025
1026// Note that labeled-metrics are stored upfront individually
1027// to mitigate the perf hit by MetricsVec.
1028// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
1029impl TransactionOrchestratorMetrics {
1030    pub fn new(registry: &Registry) -> Self {
1031        let total_req_received = register_int_counter_vec_with_registry!(
1032            "tx_orchestrator_total_req_received",
1033            "Total number of executions request Transaction Orchestrator receives, group by tx type",
1034            &["tx_type"],
1035            registry
1036        )
1037        .unwrap();
1038
1039        let total_req_received_single_writer =
1040            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1041        let total_req_received_shared_object =
1042            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1043
1044        let good_response = register_int_counter_vec_with_registry!(
1045            "tx_orchestrator_good_response",
1046            "Total number of good responses Transaction Orchestrator generates, group by tx type",
1047            &["tx_type"],
1048            registry
1049        )
1050        .unwrap();
1051
1052        let good_response_single_writer =
1053            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1054        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1055
1056        let req_in_flight = register_int_gauge_vec_with_registry!(
1057            "tx_orchestrator_req_in_flight",
1058            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1059            &["tx_type"],
1060            registry
1061        )
1062        .unwrap();
1063
1064        let req_in_flight_single_writer =
1065            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1066        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1067
1068        Self {
1069            total_req_received_single_writer,
1070            total_req_received_shared_object,
1071            good_response_single_writer,
1072            good_response_shared_object,
1073            req_in_flight_single_writer,
1074            req_in_flight_shared_object,
1075            wait_for_finality_in_flight: register_int_gauge_with_registry!(
1076                "tx_orchestrator_wait_for_finality_in_flight",
1077                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1078                registry,
1079            )
1080            .unwrap(),
1081            wait_for_finality_finished: register_int_counter_with_registry!(
1082                "tx_orchestrator_wait_for_finality_fnished",
1083                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1084                registry,
1085            )
1086            .unwrap(),
1087            wait_for_finality_timeout: register_int_counter_with_registry!(
1088                "tx_orchestrator_wait_for_finality_timeout",
1089                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1090                registry,
1091            )
1092            .unwrap(),
1093            local_execution_in_flight: register_int_gauge_with_registry!(
1094                "tx_orchestrator_local_execution_in_flight",
1095                "Number of local execution txns in flights Transaction Orchestrator handles",
1096                registry,
1097            )
1098            .unwrap(),
1099            local_execution_success: register_int_counter_with_registry!(
1100                "tx_orchestrator_local_execution_success",
1101                "Total number of successful local execution txns Transaction Orchestrator handles",
1102                registry,
1103            )
1104            .unwrap(),
1105            local_execution_timeout: register_int_counter_with_registry!(
1106                "tx_orchestrator_local_execution_timeout",
1107                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1108                registry,
1109            )
1110            .unwrap(),
1111            concurrent_execution: register_int_counter_with_registry!(
1112                "tx_orchestrator_concurrent_execution",
1113                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1114                registry,
1115            )
1116            .unwrap(),
1117            early_validation_rejections: register_int_counter_vec_with_registry!(
1118                "tx_orchestrator_early_validation_rejections",
1119                "Total number of transactions rejected during early validation before submission, by reason",
1120                &["reason"],
1121                registry,
1122            )
1123            .unwrap(),
1124            request_latency: register_histogram_vec_with_registry!(
1125                "tx_orchestrator_request_latency",
1126                "Time spent in processing one Transaction Orchestrator request",
1127                &["tx_type", "route", "wait_for_local_execution"],
1128                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1129                registry,
1130            )
1131            .unwrap(),
1132            local_execution_latency: register_histogram_vec_with_registry!(
1133                "tx_orchestrator_local_execution_latency",
1134                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1135                &["tx_type"],
1136                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1137                registry,
1138            )
1139            .unwrap(),
1140            settlement_finality_latency: register_histogram_vec_with_registry!(
1141                "tx_orchestrator_settlement_finality_latency",
1142                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1143                &["tx_type", "driver_type"],
1144                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1145                registry,
1146            )
1147            .unwrap(),
1148        }
1149    }
1150
1151    pub fn new_for_tests() -> Self {
1152        let registry = Registry::new();
1153        Self::new(&registry)
1154    }
1155}
1156
1157#[async_trait::async_trait]
1158impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1159where
1160    A: AuthorityAPI + Send + Sync + 'static + Clone,
1161{
1162    async fn execute_transaction(
1163        &self,
1164        request: ExecuteTransactionRequestV3,
1165        client_addr: Option<std::net::SocketAddr>,
1166    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1167        self.execute_transaction_v3(request, client_addr).await
1168    }
1169
1170    fn simulate_transaction(
1171        &self,
1172        transaction: TransactionData,
1173        checks: TransactionChecks,
1174    ) -> Result<SimulateTransactionResult, SuiError> {
1175        self.validator_state
1176            .simulate_transaction(transaction, checks)
1177    }
1178}