sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*
5Transaction Orchestrator is a Node component that utilizes Quorum Driver to
6submit transactions to validators for finality, and proactively executes
7finalized transactions locally, when possible.
8*/
9
10use std::net::SocketAddr;
11use std::ops::Deref;
12use std::path::Path;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16
17use futures::FutureExt;
18use futures::future::{Either, Future, select};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::in_antithesis;
21use mysten_common::sync::notify_read::NotifyRead;
22use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
23use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
24use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
25use prometheus::{
26    HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
27    register_int_counter_vec_with_registry, register_int_counter_with_registry,
28    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
29};
30use rand::Rng;
31use sui_config::NodeConfig;
32use sui_protocol_config::Chain;
33use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
34use sui_types::base_types::TransactionDigest;
35use sui_types::effects::TransactionEffectsAPI;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
37use sui_types::messages_grpc::{SubmitTxRequest, TxType};
38use sui_types::quorum_driver_types::{
39    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
40    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
41    QuorumDriverError, QuorumDriverResult,
42};
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
45use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
46use tokio::sync::broadcast::Receiver;
47use tokio::time::{Instant, sleep, timeout};
48use tracing::{Instrument, debug, error_span, info, instrument, warn};
49
50use crate::authority::AuthorityState;
51use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
52use crate::authority_aggregator::AuthorityAggregator;
53use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
54use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
55use crate::quorum_driver::{QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics};
56use crate::transaction_driver::{
57    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
58    TransactionDriverMetrics, choose_transaction_driver_percentage,
59};
60
// How long to wait for local execution (including parents) before a timeout
// is returned to client.
// Applied by `wait_for_finalized_tx_executed_locally_with_timeout`.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

// Timeout for waiting for finality for each transaction.
// This is only the default: `execute_transaction_with_effects_waiting` lets the
// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable (whole seconds) override it.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);

/// Outcome of driving a transaction to finality: the transaction paired with
/// its quorum response on success, or the transaction digest paired with the
/// error on failure.
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
/// Node component that submits transactions to validators for finality,
/// through either a `QuorumDriverHandler` or a `TransactionDriver`, and waits
/// for the resulting effects.
pub struct TransactionOrchestrator<A: Clone> {
    /// Quorum Driver handle; `new` builds it with `notifier` and the reconfig
    /// observer attached.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state: source of the epoch store, the transaction cache
    /// reader and the early validity check.
    validator_state: Arc<AuthorityState>,
    /// Pending-transaction log stored under
    /// `<parent_path>/fullnode_pending_transactions`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Per-digest notification channel shared with the Quorum Driver;
    /// `submit_with_quorum_driver` registers a ticket on it before submitting.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Orchestrator-level Prometheus metrics.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Transaction Driver used as the alternative submission path.
    transaction_driver: Arc<TransactionDriver<A>>,
    /// Value chosen by `choose_transaction_driver_percentage`; forced to 0 when
    /// the protocol config does not enable `mysticeti_fastpath`.
    /// NOTE(review): presumably the share of traffic routed to the
    /// TransactionDriver — confirm against `should_use_transaction_driver`.
    td_percentage: u8,
    /// Forwarded to the TransactionDriver as `allowed_validators`. `new` panics
    /// if this and `td_blocked_submission_list` are both non-empty.
    td_allowed_submission_list: Vec<String>,
    /// Forwarded to the TransactionDriver as `blocked_validators`.
    td_blocked_submission_list: Vec<String>,
    /// Whether transactions are checked against local state before submission;
    /// defaults to `true` when no transaction driver config is present.
    enable_early_validation: bool,
}
82
83impl TransactionOrchestrator<NetworkAuthorityClient> {
84    pub fn new_with_auth_aggregator(
85        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
86        validator_state: Arc<AuthorityState>,
87        reconfig_channel: Receiver<SuiSystemState>,
88        parent_path: &Path,
89        prometheus_registry: &Registry,
90        node_config: &NodeConfig,
91    ) -> Self {
92        let observer = OnsiteReconfigObserver::new(
93            reconfig_channel,
94            validator_state.get_object_cache_reader().clone(),
95            validator_state.clone_committee_store(),
96            validators.safe_client_metrics_base.clone(),
97            validators.metrics.deref().clone(),
98        );
99        TransactionOrchestrator::new(
100            validators,
101            validator_state,
102            parent_path,
103            prometheus_registry,
104            observer,
105            node_config,
106        )
107    }
108}
109
110impl<A> TransactionOrchestrator<A>
111where
112    A: AuthorityAPI + Send + Sync + 'static + Clone,
113    OnsiteReconfigObserver: ReconfigObserver<A>,
114{
115    pub fn new(
116        validators: Arc<AuthorityAggregator<A>>,
117        validator_state: Arc<AuthorityState>,
118        parent_path: &Path,
119        prometheus_registry: &Registry,
120        reconfig_observer: OnsiteReconfigObserver,
121        node_config: &NodeConfig,
122    ) -> Self {
123        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
124
125        let qd_metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
126        let notifier = Arc::new(NotifyRead::new());
127        let reconfig_observer = Arc::new(reconfig_observer);
128        let quorum_driver_handler = Arc::new(
129            QuorumDriverHandlerBuilder::new(validators.clone(), qd_metrics.clone())
130                .with_notifier(notifier.clone())
131                .with_reconfig_observer(reconfig_observer.clone())
132                .start(),
133        );
134
135        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
136        let client_metrics = Arc::new(
137            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
138        );
139        let transaction_driver = TransactionDriver::new(
140            validators.clone(),
141            reconfig_observer.clone(),
142            td_metrics,
143            Some(node_config),
144            client_metrics,
145        );
146
147        let epoch_store = validator_state.load_epoch_store_one_call_per_task();
148        let td_percentage = if !epoch_store.protocol_config().mysticeti_fastpath() {
149            0
150        } else {
151            choose_transaction_driver_percentage(Some(epoch_store.get_chain_identifier()))
152        };
153
154        let td_allowed_submission_list = node_config
155            .transaction_driver_config
156            .as_ref()
157            .map(|config| config.allowed_submission_validators.clone())
158            .unwrap_or_default();
159
160        let td_blocked_submission_list = node_config
161            .transaction_driver_config
162            .as_ref()
163            .map(|config| config.blocked_submission_validators.clone())
164            .unwrap_or_default();
165
166        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
167            panic!(
168                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
169                td_allowed_submission_list, td_blocked_submission_list
170            );
171        }
172
173        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
174            parent_path.join("fullnode_pending_transactions"),
175        ));
176        Self::start_task_to_recover_txes_in_log(pending_tx_log.clone(), transaction_driver.clone());
177
178        let enable_early_validation = node_config
179            .transaction_driver_config
180            .as_ref()
181            .map(|config| config.enable_early_validation)
182            .unwrap_or(true);
183
184        Self {
185            quorum_driver_handler,
186            validator_state,
187            pending_tx_log,
188            notifier,
189            metrics,
190            transaction_driver,
191            td_percentage,
192            td_allowed_submission_list,
193            td_blocked_submission_list,
194            enable_early_validation,
195        }
196    }
197}
198
199impl<A> TransactionOrchestrator<A>
200where
201    A: AuthorityAPI + Send + Sync + 'static + Clone,
202{
203    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
204    fields(
205        tx_digest = ?request.transaction.digest(),
206        tx_type = ?request_type,
207    ))]
208    pub async fn execute_transaction_block(
209        &self,
210        request: ExecuteTransactionRequestV3,
211        request_type: ExecuteTransactionRequestType,
212        client_addr: Option<SocketAddr>,
213    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
214    {
215        let timer = Instant::now();
216        let tx_type = if request.transaction.is_consensus_tx() {
217            TxType::SharedObject
218        } else {
219            TxType::SingleWriter
220        };
221        let tx_digest = *request.transaction.digest();
222
223        let (response, mut executed_locally) = self
224            .execute_transaction_with_effects_waiting(request, client_addr)
225            .await?;
226
227        if !executed_locally {
228            executed_locally = if matches!(
229                request_type,
230                ExecuteTransactionRequestType::WaitForLocalExecution
231            ) {
232                let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
233                    &self.validator_state,
234                    tx_digest,
235                    tx_type,
236                    &self.metrics,
237                )
238                .await
239                .is_ok();
240                add_server_timing("local_execution done");
241                executed_locally
242            } else {
243                false
244            };
245        }
246
247        let QuorumTransactionResponse {
248            effects,
249            events,
250            input_objects,
251            output_objects,
252            auxiliary_data,
253        } = response;
254
255        let response = ExecuteTransactionResponseV3 {
256            effects,
257            events,
258            input_objects,
259            output_objects,
260            auxiliary_data,
261        };
262
263        self.metrics
264            .request_latency
265            .with_label_values(&[
266                tx_type.as_str(),
267                "execute_transaction_block",
268                executed_locally.to_string().as_str(),
269            ])
270            .observe(timer.elapsed().as_secs_f64());
271
272        Ok((response, executed_locally))
273    }
274
275    // Utilize the handle_certificate_v3 validator api to request input/output objects
276    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
277                 fields(tx_digest = ?request.transaction.digest()))]
278    pub async fn execute_transaction_v3(
279        &self,
280        request: ExecuteTransactionRequestV3,
281        client_addr: Option<SocketAddr>,
282    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
283        let timer = Instant::now();
284        let tx_type = if request.transaction.is_consensus_tx() {
285            TxType::SharedObject
286        } else {
287            TxType::SingleWriter
288        };
289
290        let (response, _) = self
291            .execute_transaction_with_effects_waiting(request, client_addr)
292            .await?;
293
294        self.metrics
295            .request_latency
296            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
297            .observe(timer.elapsed().as_secs_f64());
298
299        let QuorumTransactionResponse {
300            effects,
301            events,
302            input_objects,
303            output_objects,
304            auxiliary_data,
305        } = response;
306
307        Ok(ExecuteTransactionResponseV3 {
308            effects,
309            events,
310            input_objects,
311            output_objects,
312            auxiliary_data,
313        })
314    }
315
316    /// Shared implementation for executing transactions with parallel local effects waiting
317    async fn execute_transaction_with_effects_waiting(
318        &self,
319        request: ExecuteTransactionRequestV3,
320        client_addr: Option<SocketAddr>,
321    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
322        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
323        let verified_transaction = epoch_store
324            .verify_transaction(request.transaction.clone())
325            .map_err(QuorumDriverError::InvalidUserSignature)?;
326        let tx_digest = *verified_transaction.digest();
327
328        // Early validation check against local state before submission to catch non-retriable errors
329        // TODO: Consider moving this check to TransactionDriver for per-retry validation
330        if self.enable_early_validation
331            && let Err(e) = self
332                .validator_state
333                .check_transaction_validity(&epoch_store, &verified_transaction)
334        {
335            let error_category = e.categorize();
336            if !error_category.is_submission_retriable() {
337                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
338                if !self.validator_state.is_tx_already_executed(&tx_digest) {
339                    self.metrics
340                        .early_validation_rejections
341                        .with_label_values(&[e.to_variant_name()])
342                        .inc();
343                    debug!(
344                        error = ?e,
345                        "Transaction rejected during early validation"
346                    );
347
348                    return Err(QuorumDriverError::TransactionFailed {
349                        category: error_category,
350                        details: e.to_string(),
351                    });
352                }
353            }
354        }
355
356        // Add transaction to WAL log.
357        let guard =
358            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
359        let is_new_transaction = guard.is_new_transaction();
360
361        let include_events = request.include_events;
362        let include_input_objects = request.include_input_objects;
363        let include_output_objects = request.include_output_objects;
364        let include_auxiliary_data = request.include_auxiliary_data;
365
366        // Track whether TD is being used for this transaction
367        let using_td = Arc::new(AtomicBool::new(false));
368
369        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
370            .ok()
371            .and_then(|v| v.parse().ok())
372            .map(Duration::from_secs)
373            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
374
375        let num_submissions = if !is_new_transaction {
376            // No need to submit when the transaction is already being processed.
377            0
378        } else if cfg!(msim) || in_antithesis() {
379            // Allow duplicated submissions in tests.
380            let r = rand::thread_rng().gen_range(1..=100);
381            let n = if r <= 10 {
382                3
383            } else if r <= 30 {
384                2
385            } else {
386                1
387            };
388            if n > 1 {
389                debug!("Making {n} execution calls");
390            }
391            n
392        } else {
393            1
394        };
395
396        // Wait for one of the execution futures to succeed, or all of them to fail.
397        let mut execution_futures = FuturesUnordered::new();
398        for i in 0..num_submissions {
399            // Generate jitter values outside the async block
400            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
401            let delay_ms = if should_delay {
402                rand::thread_rng().gen_range(100..=500)
403            } else {
404                0
405            };
406
407            let epoch_store = epoch_store.clone();
408            let request = request.clone();
409            let verified_transaction = verified_transaction.clone();
410            let using_td = using_td.clone();
411
412            let future = async move {
413                if delay_ms > 0 {
414                    // Add jitters to duplicated submissions.
415                    sleep(Duration::from_millis(delay_ms)).await;
416                }
417                self.execute_transaction_impl(
418                    &epoch_store,
419                    request,
420                    verified_transaction,
421                    client_addr,
422                    Some(finality_timeout),
423                    using_td,
424                )
425                .await
426            }
427            .boxed();
428            execution_futures.push(future);
429        }
430
431        // Track the last execution error.
432        let mut last_execution_error: Option<QuorumDriverError> = None;
433
434        // Wait for execution result outside of this call to become available.
435        let digests = [tx_digest];
436        let mut local_effects_future = epoch_store
437            .within_alive_epoch(
438                self.validator_state
439                    .get_transaction_cache_reader()
440                    .notify_read_executed_effects(
441                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
442                    &digests,
443                ),
444            )
445            .boxed();
446
447        // Wait for execution timeout.
448        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
449
450        loop {
451            tokio::select! {
452                biased;
453
454                // Local effects might be available
455                local_effects_result = &mut local_effects_future => {
456                    match local_effects_result {
457                        Ok(effects) => {
458                            debug!(
459                                "Effects became available while execution was running"
460                            );
461                            if let Some(effects) = effects.into_iter().next() {
462                                self.metrics.concurrent_execution.inc();
463                                let epoch = effects.executed_epoch();
464                                let events = if include_events {
465                                    if effects.events_digest().is_some() {
466                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
467                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
468                                    } else {
469                                        None
470                                    }
471                                } else {
472                                    None
473                                };
474                                let input_objects = include_input_objects
475                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
476                                    .transpose()
477                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
478                                let output_objects = include_output_objects
479                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
480                                    .transpose()
481                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
482                                let response = QuorumTransactionResponse {
483                                    effects: FinalizedEffects {
484                                        effects,
485                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
486                                    },
487                                    events,
488                                    input_objects,
489                                    output_objects,
490                                    auxiliary_data: None,
491                                };
492                                break Ok((response, true));
493                            }
494                        }
495                        Err(_) => {
496                            warn!("Epoch terminated before effects were available");
497                        }
498                    };
499
500                    // Prevent this branch from being selected again
501                    local_effects_future = futures::future::pending().boxed();
502                }
503
504                // This branch is disabled if execution_futures is empty.
505                Some(result) = execution_futures.next() => {
506                    match result {
507                        Ok(resp) => {
508                            // First success gets returned.
509                            debug!("Execution succeeded, returning response");
510                            let QuorumTransactionResponse {
511                                effects,
512                                events,
513                                input_objects,
514                                output_objects,
515                                auxiliary_data,
516                            } = resp;
517                            // Filter fields based on request flags.
518                            let resp = QuorumTransactionResponse {
519                                effects,
520                                events: if include_events { events } else { None },
521                                input_objects: if include_input_objects { input_objects } else { None },
522                                output_objects: if include_output_objects { output_objects } else { None },
523                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
524                            };
525                            break Ok((resp, false));
526                        }
527                        Err(QuorumDriverError::PendingExecutionInTransactionOrchestrator) => {
528                            debug!(
529                                "Transaction is already being processed"
530                            );
531                            // Avoid overriding errors with transaction already being processed.
532                            if last_execution_error.is_none() {
533                                last_execution_error = Some(QuorumDriverError::PendingExecutionInTransactionOrchestrator);
534                            }
535                        }
536                        Err(e) => {
537                            debug!(?e, "Execution attempt failed, wait for other attempts");
538                            last_execution_error = Some(e);
539                        }
540                    };
541
542                    // Last error must have been recorded.
543                    if execution_futures.is_empty() {
544                        break Err(last_execution_error.unwrap());
545                    }
546                }
547
548                // A timeout has occurred while waiting for finality
549                _ = &mut timeout_future => {
550                    debug!("Timeout waiting for transaction finality.");
551                    self.metrics.wait_for_finality_timeout.inc();
552
553                    break Err(QuorumDriverError::TimeoutBeforeFinality);
554                }
555            }
556        }
557    }
558
559    #[instrument(level = "error", skip_all)]
560    async fn execute_transaction_impl(
561        &self,
562        epoch_store: &Arc<AuthorityPerEpochStore>,
563        request: ExecuteTransactionRequestV3,
564        verified_transaction: VerifiedTransaction,
565        client_addr: Option<SocketAddr>,
566        finality_timeout: Option<Duration>,
567        using_td: Arc<AtomicBool>,
568    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
569        let tx_digest = *verified_transaction.digest();
570        debug!("TO Received transaction execution request.");
571
572        let timer = Instant::now();
573        let tx_type = if verified_transaction.is_consensus_tx() {
574            TxType::SharedObject
575        } else {
576            TxType::SingleWriter
577        };
578
579        let (_in_flight_metrics_guards, good_response_metrics) =
580            self.update_metrics(&request.transaction);
581
582        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
583        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
584        wait_for_finality_gauge.inc();
585        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
586            in_flight.dec();
587        });
588
589        // Select TransactionDriver or QuorumDriver for submission.
590        let (response, driver_type) = if self.should_use_transaction_driver(epoch_store, tx_digest)
591        {
592            // Mark that we're using TD before submitting.
593            using_td.store(true, Ordering::Release);
594
595            (
596                self.submit_with_transaction_driver(
597                    &self.transaction_driver,
598                    &request,
599                    client_addr,
600                    &verified_transaction,
601                    good_response_metrics,
602                    finality_timeout,
603                )
604                .await?,
605                "transaction_driver",
606            )
607        } else {
608            // Submit transaction through QuorumDriver.
609            using_td.store(false, Ordering::Release);
610
611            let resp = self
612                .submit_with_quorum_driver(
613                    epoch_store.clone(),
614                    verified_transaction.clone(),
615                    request,
616                    client_addr,
617                )
618                .await
619                .map_err(|e| {
620                    warn!("QuorumDriverInternalError: {e:?}");
621                    QuorumDriverError::QuorumDriverInternalError(e)
622                })?
623                .await
624                .map_err(|e| {
625                    warn!("QuorumDriverInternalError: {e:?}");
626                    QuorumDriverError::QuorumDriverInternalError(e)
627                })??;
628
629            (
630                QuorumTransactionResponse {
631                    effects: FinalizedEffects::new_from_effects_cert(resp.effects_cert.into()),
632                    events: resp.events,
633                    input_objects: resp.input_objects,
634                    output_objects: resp.output_objects,
635                    auxiliary_data: resp.auxiliary_data,
636                },
637                "quorum_driver",
638            )
639        };
640
641        add_server_timing("wait_for_finality done");
642
643        self.metrics.wait_for_finality_finished.inc();
644
645        let elapsed = timer.elapsed().as_secs_f64();
646        self.metrics
647            .settlement_finality_latency
648            .with_label_values(&[tx_type.as_str(), driver_type])
649            .observe(elapsed);
650        good_response_metrics.inc();
651
652        Ok(response)
653    }
654
655    #[instrument(level = "error", skip_all, err(level = "info"))]
656    async fn submit_with_transaction_driver(
657        &self,
658        td: &Arc<TransactionDriver<A>>,
659        request: &ExecuteTransactionRequestV3,
660        client_addr: Option<SocketAddr>,
661        verified_transaction: &VerifiedTransaction,
662        good_response_metrics: &GenericCounter<AtomicU64>,
663        timeout_duration: Option<Duration>,
664    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
665        let tx_digest = *verified_transaction.digest();
666        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
667
668        let td_response = td
669            .drive_transaction(
670                SubmitTxRequest::new_transaction(request.transaction.clone()),
671                SubmitTransactionOptions {
672                    forwarded_client_addr: client_addr,
673                    allowed_validators: self.td_allowed_submission_list.clone(),
674                    blocked_validators: self.td_blocked_submission_list.clone(),
675                },
676                timeout_duration,
677            )
678            .await
679            .map_err(|e| match e {
680                TransactionDriverError::TimeoutWithLastRetriableError {
681                    last_error,
682                    attempts,
683                    timeout,
684                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
685                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
686                    attempts,
687                    timeout,
688                },
689                other => QuorumDriverError::TransactionFailed {
690                    category: other.categorize(),
691                    details: other.to_string(),
692                },
693            });
694
695        match td_response {
696            Err(e) => {
697                warn!("TransactionDriver error: {e:?}");
698                Err(e)
699            }
700            Ok(quorum_transaction_response) => {
701                good_response_metrics.inc();
702                Ok(quorum_transaction_response)
703            }
704        }
705    }
706
707    /// Submits the transaction to Quorum Driver for execution.
708    /// Returns an awaitable Future.
709    #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
710    async fn submit_with_quorum_driver(
711        &self,
712        epoch_store: Arc<AuthorityPerEpochStore>,
713        transaction: VerifiedTransaction,
714        request: ExecuteTransactionRequestV3,
715        client_addr: Option<SocketAddr>,
716    ) -> SuiResult<impl Future<Output = SuiResult<QuorumDriverResult>> + '_> {
717        let tx_digest = *transaction.digest();
718
719        let ticket = self.notifier.register_one(&tx_digest);
720        self.quorum_driver()
721            .submit_transaction_no_ticket(request.clone(), client_addr)
722            .await?;
723
724        // It's possible that the transaction effects is already stored in DB at this point.
725        // So we also subscribe to that. If we hear from `effects_await` first, it means
726        // the ticket misses the previous notification, and we want to ask quorum driver
727        // to form a certificate for us again, to serve this request.
728        let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
729        let qd = self.clone_quorum_driver();
730        Ok(async move {
731            let digests = [tx_digest];
732            let effects_await =
733                epoch_store.within_alive_epoch(cache_reader.notify_read_executed_effects(
734                    "TransactionOrchestrator::notify_read_submit_with_qd",
735                    &digests,
736                ));
737            // let-and-return necessary to satisfy borrow checker.
738            #[allow(clippy::let_and_return)]
739            let res = match select(ticket, effects_await.boxed()).await {
740                Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
741                Either::Right((_, unfinished_quorum_driver_task)) => {
742                    debug!("Effects are available in DB, use quorum driver to get a certificate");
743                    qd.submit_transaction_no_ticket(request, client_addr)
744                        .await?;
745                    Ok(unfinished_quorum_driver_task.await)
746                }
747            };
748            res
749        })
750    }
751
752    #[instrument(
753        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
754        level = "debug",
755        skip_all,
756        err(level = "info")
757    )]
758    async fn wait_for_finalized_tx_executed_locally_with_timeout(
759        validator_state: &Arc<AuthorityState>,
760        tx_digest: TransactionDigest,
761        tx_type: TxType,
762        metrics: &TransactionOrchestratorMetrics,
763    ) -> SuiResult {
764        metrics.local_execution_in_flight.inc();
765        let _metrics_guard =
766            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
767                in_flight.dec();
768            });
769
770        let _latency_guard = metrics
771            .local_execution_latency
772            .with_label_values(&[tx_type.as_str()])
773            .start_timer();
774        debug!("Waiting for finalized tx to be executed locally.");
775        match timeout(
776            LOCAL_EXECUTION_TIMEOUT,
777            validator_state
778                .get_transaction_cache_reader()
779                .notify_read_executed_effects_digests(
780                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
781                    &[tx_digest],
782                ),
783        )
784        .instrument(error_span!(
785            "transaction_orchestrator::local_execution",
786            ?tx_digest
787        ))
788        .await
789        {
790            Err(_elapsed) => {
791                debug!(
792                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
793                    LOCAL_EXECUTION_TIMEOUT
794                );
795                metrics.local_execution_timeout.inc();
796                Err(SuiErrorKind::TimeoutError.into())
797            }
798            Ok(_) => {
799                metrics.local_execution_success.inc();
800                Ok(())
801            }
802        }
803    }
804
805    fn should_use_transaction_driver(
806        &self,
807        epoch_store: &Arc<AuthorityPerEpochStore>,
808        tx_digest: TransactionDigest,
809    ) -> bool {
810        const MAX_PERCENTAGE: u8 = 100;
811        let unknown_network = epoch_store.get_chain() == Chain::Unknown;
812        let v = if unknown_network {
813            rand::thread_rng().gen_range(1..=MAX_PERCENTAGE)
814        } else {
815            let v = u32::from_le_bytes(tx_digest.inner()[..4].try_into().unwrap());
816            (v % (MAX_PERCENTAGE as u32) + 1) as u8
817        };
818        debug!(
819            "Choosing whether to use transaction driver: {} vs {}",
820            v, self.td_percentage
821        );
822        v <= self.td_percentage
823    }
824
825    pub fn authority_state(&self) -> &Arc<AuthorityState> {
826        &self.validator_state
827    }
828
829    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
830        &self.transaction_driver
831    }
832
833    fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
834        &self.quorum_driver_handler
835    }
836
837    fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
838        self.quorum_driver_handler.clone()
839    }
840
841    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
842        self.transaction_driver.authority_aggregator().load_full()
843    }
844
845    fn update_metrics<'a>(
846        &'a self,
847        transaction: &Transaction,
848    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
849        let (in_flight, good_response) = if transaction.is_consensus_tx() {
850            self.metrics.total_req_received_shared_object.inc();
851            (
852                self.metrics.req_in_flight_shared_object.clone(),
853                &self.metrics.good_response_shared_object,
854            )
855        } else {
856            self.metrics.total_req_received_single_writer.inc();
857            (
858                self.metrics.req_in_flight_single_writer.clone(),
859                &self.metrics.good_response_single_writer,
860            )
861        };
862        in_flight.inc();
863        (
864            scopeguard::guard(in_flight, |in_flight| {
865                in_flight.dec();
866            }),
867            good_response,
868        )
869    }
870
871    fn start_task_to_recover_txes_in_log(
872        pending_tx_log: Arc<WritePathPendingTransactionLog>,
873        transaction_driver: Arc<TransactionDriver<A>>,
874    ) {
875        spawn_logged_monitored_task!(async move {
876            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
877                info!("Skipping loading pending transactions from pending_tx_log.");
878                return;
879            }
880            let pending_txes = pending_tx_log
881                .load_all_pending_transactions()
882                .expect("failed to load all pending transactions");
883            let num_pending_txes = pending_txes.len();
884            info!(
885                "Recovering {} pending transactions from pending_tx_log.",
886                num_pending_txes
887            );
888            let mut recovery = pending_txes
889                .into_iter()
890                .map(|tx| {
891                    let pending_tx_log = pending_tx_log.clone();
892                    let transaction_driver = transaction_driver.clone();
893                    async move {
894                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
895                        // requires a migration.
896                        let tx = tx.into_inner();
897                        let tx_digest = *tx.digest();
898                        // It's not impossible we fail to enqueue a task but that's not the end of world.
899                        // TODO(william) correctly extract client_addr from logs
900                        if let Err(err) = transaction_driver
901                            .drive_transaction(
902                                SubmitTxRequest::new_transaction(tx),
903                                SubmitTransactionOptions::default(),
904                                Some(Duration::from_secs(60)),
905                            )
906                            .await
907                        {
908                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
909                        } else {
910                            debug!(?tx_digest, "Executed recovered transaction");
911                        }
912                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
913                            warn!(
914                                ?tx_digest,
915                                "Failed to clean up transaction in pending log: {err}"
916                            );
917                        } else {
918                            debug!(?tx_digest, "Cleaned up transaction in pending log");
919                        }
920                    }
921                })
922                .collect::<FuturesUnordered<_>>();
923
924            let mut num_recovered = 0;
925            while recovery.next().await.is_some() {
926                num_recovered += 1;
927                if num_recovered % 1000 == 0 {
928                    info!(
929                        "Recovered {} out of {} transactions from pending_tx_log.",
930                        num_recovered, num_pending_txes
931                    );
932                }
933            }
934            info!(
935                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
936                num_recovered, num_pending_txes
937            );
938        });
939    }
940
941    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
942        self.pending_tx_log.load_all_pending_transactions()
943    }
944
945    pub fn empty_pending_tx_log_in_test(&self) -> bool {
946        self.pending_tx_log.is_empty()
947    }
948}
949/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
950#[derive(Clone)]
951pub struct TransactionOrchestratorMetrics {
952    total_req_received_single_writer: GenericCounter<AtomicU64>,
953    total_req_received_shared_object: GenericCounter<AtomicU64>,
954
955    good_response_single_writer: GenericCounter<AtomicU64>,
956    good_response_shared_object: GenericCounter<AtomicU64>,
957
958    req_in_flight_single_writer: GenericGauge<AtomicI64>,
959    req_in_flight_shared_object: GenericGauge<AtomicI64>,
960
961    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
962    wait_for_finality_finished: GenericCounter<AtomicU64>,
963    wait_for_finality_timeout: GenericCounter<AtomicU64>,
964
965    local_execution_in_flight: GenericGauge<AtomicI64>,
966    local_execution_success: GenericCounter<AtomicU64>,
967    local_execution_timeout: GenericCounter<AtomicU64>,
968
969    concurrent_execution: IntCounter,
970
971    early_validation_rejections: IntCounterVec,
972
973    request_latency: HistogramVec,
974    local_execution_latency: HistogramVec,
975    settlement_finality_latency: HistogramVec,
976}
977
978// Note that labeled-metrics are stored upfront individually
979// to mitigate the perf hit by MetricsVec.
980// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
981impl TransactionOrchestratorMetrics {
982    pub fn new(registry: &Registry) -> Self {
983        let total_req_received = register_int_counter_vec_with_registry!(
984            "tx_orchestrator_total_req_received",
985            "Total number of executions request Transaction Orchestrator receives, group by tx type",
986            &["tx_type"],
987            registry
988        )
989        .unwrap();
990
991        let total_req_received_single_writer =
992            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
993        let total_req_received_shared_object =
994            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
995
996        let good_response = register_int_counter_vec_with_registry!(
997            "tx_orchestrator_good_response",
998            "Total number of good responses Transaction Orchestrator generates, group by tx type",
999            &["tx_type"],
1000            registry
1001        )
1002        .unwrap();
1003
1004        let good_response_single_writer =
1005            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1006        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1007
1008        let req_in_flight = register_int_gauge_vec_with_registry!(
1009            "tx_orchestrator_req_in_flight",
1010            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1011            &["tx_type"],
1012            registry
1013        )
1014        .unwrap();
1015
1016        let req_in_flight_single_writer =
1017            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1018        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1019
1020        Self {
1021            total_req_received_single_writer,
1022            total_req_received_shared_object,
1023            good_response_single_writer,
1024            good_response_shared_object,
1025            req_in_flight_single_writer,
1026            req_in_flight_shared_object,
1027            wait_for_finality_in_flight: register_int_gauge_with_registry!(
1028                "tx_orchestrator_wait_for_finality_in_flight",
1029                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1030                registry,
1031            )
1032            .unwrap(),
1033            wait_for_finality_finished: register_int_counter_with_registry!(
1034                "tx_orchestrator_wait_for_finality_fnished",
1035                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1036                registry,
1037            )
1038            .unwrap(),
1039            wait_for_finality_timeout: register_int_counter_with_registry!(
1040                "tx_orchestrator_wait_for_finality_timeout",
1041                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1042                registry,
1043            )
1044            .unwrap(),
1045            local_execution_in_flight: register_int_gauge_with_registry!(
1046                "tx_orchestrator_local_execution_in_flight",
1047                "Number of local execution txns in flights Transaction Orchestrator handles",
1048                registry,
1049            )
1050            .unwrap(),
1051            local_execution_success: register_int_counter_with_registry!(
1052                "tx_orchestrator_local_execution_success",
1053                "Total number of successful local execution txns Transaction Orchestrator handles",
1054                registry,
1055            )
1056            .unwrap(),
1057            local_execution_timeout: register_int_counter_with_registry!(
1058                "tx_orchestrator_local_execution_timeout",
1059                "Total number of timed-out local execution txns Transaction Orchestrator handles",
1060                registry,
1061            )
1062            .unwrap(),
1063            concurrent_execution: register_int_counter_with_registry!(
1064                "tx_orchestrator_concurrent_execution",
1065                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1066                registry,
1067            )
1068            .unwrap(),
1069            early_validation_rejections: register_int_counter_vec_with_registry!(
1070                "tx_orchestrator_early_validation_rejections",
1071                "Total number of transactions rejected during early validation before submission, by reason",
1072                &["reason"],
1073                registry,
1074            )
1075            .unwrap(),
1076            request_latency: register_histogram_vec_with_registry!(
1077                "tx_orchestrator_request_latency",
1078                "Time spent in processing one Transaction Orchestrator request",
1079                &["tx_type", "route", "wait_for_local_execution"],
1080                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1081                registry,
1082            )
1083            .unwrap(),
1084            local_execution_latency: register_histogram_vec_with_registry!(
1085                "tx_orchestrator_local_execution_latency",
1086                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1087                &["tx_type"],
1088                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1089                registry,
1090            )
1091            .unwrap(),
1092            settlement_finality_latency: register_histogram_vec_with_registry!(
1093                "tx_orchestrator_settlement_finality_latency",
1094                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1095                &["tx_type", "driver_type"],
1096                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1097                registry,
1098            )
1099            .unwrap(),
1100        }
1101    }
1102
1103    pub fn new_for_tests() -> Self {
1104        let registry = Registry::new();
1105        Self::new(&registry)
1106    }
1107}
1108
1109#[async_trait::async_trait]
1110impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1111where
1112    A: AuthorityAPI + Send + Sync + 'static + Clone,
1113{
1114    async fn execute_transaction(
1115        &self,
1116        request: ExecuteTransactionRequestV3,
1117        client_addr: Option<std::net::SocketAddr>,
1118    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1119        self.execute_transaction_v3(request, client_addr).await
1120    }
1121
1122    fn simulate_transaction(
1123        &self,
1124        transaction: TransactionData,
1125        checks: TransactionChecks,
1126    ) -> Result<SimulateTransactionResult, SuiError> {
1127        self.validator_state
1128            .simulate_transaction(transaction, checks)
1129    }
1130}
1131
1132/// Keeps track of inflight transactions being submitted, and helps recover transactions
1133/// on restart.
1134struct TransactionSubmissionGuard {
1135    pending_tx_log: Arc<WritePathPendingTransactionLog>,
1136    tx_digest: TransactionDigest,
1137    is_new_transaction: bool,
1138}
1139
1140impl TransactionSubmissionGuard {
1141    pub fn new(
1142        pending_tx_log: Arc<WritePathPendingTransactionLog>,
1143        tx: &VerifiedTransaction,
1144    ) -> Self {
1145        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1146        let tx_digest = *tx.digest();
1147        if is_new_transaction {
1148            debug!(?tx_digest, "Added transaction to inflight set");
1149        } else {
1150            debug!(
1151                ?tx_digest,
1152                "Transaction already being processed, no new submission will be made"
1153            );
1154        };
1155        Self {
1156            pending_tx_log,
1157            tx_digest,
1158            is_new_transaction,
1159        }
1160    }
1161
1162    fn is_new_transaction(&self) -> bool {
1163        self.is_new_transaction
1164    }
1165}
1166
1167impl Drop for TransactionSubmissionGuard {
1168    fn drop(&mut self) {
1169        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1170            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1171        } else {
1172            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1173        }
1174    }
1175}