sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::in_antithesis;
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17    HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
18    register_int_counter_vec_with_registry, register_int_counter_with_registry,
19    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::quorum_driver_types::{
29    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
30    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
31    QuorumDriverError,
32};
33use sui_types::sui_system_state::SuiSystemState;
34use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46    TransactionDriverMetrics,
47};
48
/// Upper bound on how long a `WaitForLocalExecution` request waits for the local node to
/// execute the finalized transaction (including its parents) before the client is answered
/// with "not executed locally".
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default per-transaction deadline for reaching finality. Can be overridden at request time
/// through the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
55
56pub type QuorumTransactionEffectsResult =
57    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
58
59/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
60/// submit transactions to validators for finality. It adds inflight deduplication,
61/// waiting for local execution, recovery, early validation, and epoch change handling
62/// on top of Transaction Driver.
63/// This is used by node RPC service to support transaction submission and finality waiting.
64pub struct TransactionOrchestrator<A: Clone> {
65    validator_state: Arc<AuthorityState>,
66    pending_tx_log: Arc<WritePathPendingTransactionLog>,
67    metrics: Arc<TransactionOrchestratorMetrics>,
68    transaction_driver: Arc<TransactionDriver<A>>,
69    td_allowed_submission_list: Vec<String>,
70    td_blocked_submission_list: Vec<String>,
71    enable_early_validation: bool,
72}
73
74impl TransactionOrchestrator<NetworkAuthorityClient> {
75    pub fn new_with_auth_aggregator(
76        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
77        validator_state: Arc<AuthorityState>,
78        reconfig_channel: Receiver<SuiSystemState>,
79        parent_path: &Path,
80        prometheus_registry: &Registry,
81        node_config: &NodeConfig,
82    ) -> Self {
83        let observer = OnsiteReconfigObserver::new(
84            reconfig_channel,
85            validator_state.get_object_cache_reader().clone(),
86            validator_state.clone_committee_store(),
87            validators.safe_client_metrics_base.clone(),
88            validators.metrics.deref().clone(),
89        );
90        TransactionOrchestrator::new(
91            validators,
92            validator_state,
93            parent_path,
94            prometheus_registry,
95            observer,
96            node_config,
97        )
98    }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103    A: AuthorityAPI + Send + Sync + 'static + Clone,
104    OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106    pub fn new(
107        validators: Arc<AuthorityAggregator<A>>,
108        validator_state: Arc<AuthorityState>,
109        parent_path: &Path,
110        prometheus_registry: &Registry,
111        reconfig_observer: OnsiteReconfigObserver,
112        node_config: &NodeConfig,
113    ) -> Self {
114        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
115        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
116        let client_metrics = Arc::new(
117            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
118        );
119        let reconfig_observer = Arc::new(reconfig_observer);
120
121        let transaction_driver = TransactionDriver::new(
122            validators.clone(),
123            reconfig_observer.clone(),
124            td_metrics,
125            Some(node_config),
126            client_metrics,
127        );
128
129        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
130            parent_path.join("fullnode_pending_transactions"),
131        ));
132        Self::start_task_to_recover_txes_in_log(pending_tx_log.clone(), transaction_driver.clone());
133
134        let td_allowed_submission_list = node_config
135            .transaction_driver_config
136            .as_ref()
137            .map(|config| config.allowed_submission_validators.clone())
138            .unwrap_or_default();
139
140        let td_blocked_submission_list = node_config
141            .transaction_driver_config
142            .as_ref()
143            .map(|config| config.blocked_submission_validators.clone())
144            .unwrap_or_default();
145
146        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
147            panic!(
148                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
149                td_allowed_submission_list, td_blocked_submission_list
150            );
151        }
152
153        let enable_early_validation = node_config
154            .transaction_driver_config
155            .as_ref()
156            .map(|config| config.enable_early_validation)
157            .unwrap_or(true);
158
159        Self {
160            validator_state,
161            pending_tx_log,
162            metrics,
163            transaction_driver,
164            td_allowed_submission_list,
165            td_blocked_submission_list,
166            enable_early_validation,
167        }
168    }
169}
170
171impl<A> TransactionOrchestrator<A>
172where
173    A: AuthorityAPI + Send + Sync + 'static + Clone,
174{
175    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
176    fields(
177        tx_digest = ?request.transaction.digest(),
178        tx_type = ?request_type,
179    ))]
180    pub async fn execute_transaction_block(
181        &self,
182        request: ExecuteTransactionRequestV3,
183        request_type: ExecuteTransactionRequestType,
184        client_addr: Option<SocketAddr>,
185    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
186    {
187        let timer = Instant::now();
188        let tx_type = if request.transaction.is_consensus_tx() {
189            TxType::SharedObject
190        } else {
191            TxType::SingleWriter
192        };
193        let tx_digest = *request.transaction.digest();
194
195        let (response, mut executed_locally) = self
196            .execute_transaction_with_effects_waiting(request, client_addr)
197            .await?;
198
199        if !executed_locally {
200            executed_locally = if matches!(
201                request_type,
202                ExecuteTransactionRequestType::WaitForLocalExecution
203            ) {
204                let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
205                    &self.validator_state,
206                    tx_digest,
207                    tx_type,
208                    &self.metrics,
209                )
210                .await
211                .is_ok();
212                add_server_timing("local_execution done");
213                executed_locally
214            } else {
215                false
216            };
217        }
218
219        let QuorumTransactionResponse {
220            effects,
221            events,
222            input_objects,
223            output_objects,
224            auxiliary_data,
225        } = response;
226
227        let response = ExecuteTransactionResponseV3 {
228            effects,
229            events,
230            input_objects,
231            output_objects,
232            auxiliary_data,
233        };
234
235        self.metrics
236            .request_latency
237            .with_label_values(&[
238                tx_type.as_str(),
239                "execute_transaction_block",
240                executed_locally.to_string().as_str(),
241            ])
242            .observe(timer.elapsed().as_secs_f64());
243
244        Ok((response, executed_locally))
245    }
246
247    // Utilize the handle_certificate_v3 validator api to request input/output objects
248    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
249                 fields(tx_digest = ?request.transaction.digest()))]
250    pub async fn execute_transaction_v3(
251        &self,
252        request: ExecuteTransactionRequestV3,
253        client_addr: Option<SocketAddr>,
254    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
255        let timer = Instant::now();
256        let tx_type = if request.transaction.is_consensus_tx() {
257            TxType::SharedObject
258        } else {
259            TxType::SingleWriter
260        };
261
262        let (response, _) = self
263            .execute_transaction_with_effects_waiting(request, client_addr)
264            .await?;
265
266        self.metrics
267            .request_latency
268            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
269            .observe(timer.elapsed().as_secs_f64());
270
271        let QuorumTransactionResponse {
272            effects,
273            events,
274            input_objects,
275            output_objects,
276            auxiliary_data,
277        } = response;
278
279        Ok(ExecuteTransactionResponseV3 {
280            effects,
281            events,
282            input_objects,
283            output_objects,
284            auxiliary_data,
285        })
286    }
287
288    /// Shared implementation for executing transactions with parallel local effects waiting
289    async fn execute_transaction_with_effects_waiting(
290        &self,
291        request: ExecuteTransactionRequestV3,
292        client_addr: Option<SocketAddr>,
293    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
294        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
295        let verified_transaction = epoch_store
296            .verify_transaction_with_current_aliases(request.transaction.clone())
297            .map_err(QuorumDriverError::InvalidUserSignature)?
298            .into_tx();
299        let tx_digest = *verified_transaction.digest();
300
301        // Early validation check against local state before submission to catch non-retriable errors
302        // TODO: Consider moving this check to TransactionDriver for per-retry validation
303        if self.enable_early_validation
304            && let Err(e) = self
305                .validator_state
306                .check_transaction_validity(&epoch_store, &verified_transaction)
307        {
308            let error_category = e.categorize();
309            if !error_category.is_submission_retriable() {
310                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
311                if !self.validator_state.is_tx_already_executed(&tx_digest) {
312                    self.metrics
313                        .early_validation_rejections
314                        .with_label_values(&[e.to_variant_name()])
315                        .inc();
316                    debug!(
317                        error = ?e,
318                        "Transaction rejected during early validation"
319                    );
320
321                    return Err(QuorumDriverError::TransactionFailed {
322                        category: error_category,
323                        details: e.to_string(),
324                    });
325                }
326            }
327        }
328
329        // Add transaction to WAL log.
330        let guard =
331            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
332        let is_new_transaction = guard.is_new_transaction();
333
334        let include_events = request.include_events;
335        let include_input_objects = request.include_input_objects;
336        let include_output_objects = request.include_output_objects;
337        let include_auxiliary_data = request.include_auxiliary_data;
338
339        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
340            .ok()
341            .and_then(|v| v.parse().ok())
342            .map(Duration::from_secs)
343            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
344
345        let num_submissions = if !is_new_transaction {
346            // No need to submit when the transaction is already being processed.
347            0
348        } else if cfg!(msim) || in_antithesis() {
349            // Allow duplicated submissions in tests.
350            let r = rand::thread_rng().gen_range(1..=100);
351            let n = if r <= 10 {
352                3
353            } else if r <= 30 {
354                2
355            } else {
356                1
357            };
358            if n > 1 {
359                debug!("Making {n} execution calls");
360            }
361            n
362        } else {
363            1
364        };
365
366        // Wait for one of the execution futures to succeed, or all of them to fail.
367        let mut execution_futures = FuturesUnordered::new();
368        for i in 0..num_submissions {
369            // Generate jitter values outside the async block
370            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
371            let delay_ms = if should_delay {
372                rand::thread_rng().gen_range(100..=500)
373            } else {
374                0
375            };
376
377            let request = request.clone();
378            let verified_transaction = verified_transaction.clone();
379
380            let future = async move {
381                if delay_ms > 0 {
382                    // Add jitters to duplicated submissions.
383                    sleep(Duration::from_millis(delay_ms)).await;
384                }
385                self.execute_transaction_impl(
386                    request,
387                    verified_transaction,
388                    client_addr,
389                    Some(finality_timeout),
390                )
391                .await
392            }
393            .boxed();
394            execution_futures.push(future);
395        }
396
397        // Track the last execution error.
398        let mut last_execution_error: Option<QuorumDriverError> = None;
399
400        // Wait for execution result outside of this call to become available.
401        let digests = [tx_digest];
402        let mut local_effects_future = epoch_store
403            .within_alive_epoch(
404                self.validator_state
405                    .get_transaction_cache_reader()
406                    .notify_read_executed_effects(
407                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
408                    &digests,
409                ),
410            )
411            .boxed();
412
413        // Wait for execution timeout.
414        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
415
416        loop {
417            tokio::select! {
418                biased;
419
420                // Local effects might be available
421                local_effects_result = &mut local_effects_future => {
422                    match local_effects_result {
423                        Ok(effects) => {
424                            debug!(
425                                "Effects became available while execution was running"
426                            );
427                            if let Some(effects) = effects.into_iter().next() {
428                                self.metrics.concurrent_execution.inc();
429                                let epoch = effects.executed_epoch();
430                                let events = if include_events {
431                                    if effects.events_digest().is_some() {
432                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
433                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
434                                    } else {
435                                        None
436                                    }
437                                } else {
438                                    None
439                                };
440                                let input_objects = include_input_objects
441                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
442                                    .transpose()
443                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
444                                let output_objects = include_output_objects
445                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
446                                    .transpose()
447                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
448                                let response = QuorumTransactionResponse {
449                                    effects: FinalizedEffects {
450                                        effects,
451                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
452                                    },
453                                    events,
454                                    input_objects,
455                                    output_objects,
456                                    auxiliary_data: None,
457                                };
458                                break Ok((response, true));
459                            }
460                        }
461                        Err(_) => {
462                            warn!("Epoch terminated before effects were available");
463                        }
464                    };
465
466                    // Prevent this branch from being selected again
467                    local_effects_future = futures::future::pending().boxed();
468                }
469
470                // This branch is disabled if execution_futures is empty.
471                Some(result) = execution_futures.next() => {
472                    match result {
473                        Ok(resp) => {
474                            // First success gets returned.
475                            debug!("Execution succeeded, returning response");
476                            let QuorumTransactionResponse {
477                                effects,
478                                events,
479                                input_objects,
480                                output_objects,
481                                auxiliary_data,
482                            } = resp;
483                            // Filter fields based on request flags.
484                            let resp = QuorumTransactionResponse {
485                                effects,
486                                events: if include_events { events } else { None },
487                                input_objects: if include_input_objects { input_objects } else { None },
488                                output_objects: if include_output_objects { output_objects } else { None },
489                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
490                            };
491                            break Ok((resp, false));
492                        }
493                        Err(e) => {
494                            debug!(?e, "Execution attempt failed, wait for other attempts");
495                            last_execution_error = Some(e);
496                        }
497                    };
498
499                    // Last error must have been recorded.
500                    if execution_futures.is_empty() {
501                        break Err(last_execution_error.unwrap());
502                    }
503                }
504
505                // A timeout has occurred while waiting for finality
506                _ = &mut timeout_future => {
507                    if let Some(e) = last_execution_error {
508                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
509                    } else {
510                        debug!("Timeout waiting for transaction finality.");
511                    }
512                    self.metrics.wait_for_finality_timeout.inc();
513
514                    // TODO: Return the last execution error.
515                    break Err(QuorumDriverError::TimeoutBeforeFinality);
516                }
517            }
518        }
519    }
520
521    #[instrument(level = "error", skip_all)]
522    async fn execute_transaction_impl(
523        &self,
524        request: ExecuteTransactionRequestV3,
525        verified_transaction: VerifiedTransaction,
526        client_addr: Option<SocketAddr>,
527        finality_timeout: Option<Duration>,
528    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
529        debug!("TO Received transaction execution request.");
530
531        let timer = Instant::now();
532        let tx_type = if verified_transaction.is_consensus_tx() {
533            TxType::SharedObject
534        } else {
535            TxType::SingleWriter
536        };
537
538        let (_in_flight_metrics_guards, good_response_metrics) =
539            self.update_metrics(&request.transaction);
540
541        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
542        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
543        wait_for_finality_gauge.inc();
544        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
545            in_flight.dec();
546        });
547
548        let response = self
549            .submit_with_transaction_driver(
550                &self.transaction_driver,
551                &request,
552                client_addr,
553                &verified_transaction,
554                good_response_metrics,
555                finality_timeout,
556            )
557            .await?;
558        let driver_type = "transaction_driver";
559
560        add_server_timing("wait_for_finality done");
561
562        self.metrics.wait_for_finality_finished.inc();
563
564        let elapsed = timer.elapsed().as_secs_f64();
565        self.metrics
566            .settlement_finality_latency
567            .with_label_values(&[tx_type.as_str(), driver_type])
568            .observe(elapsed);
569        good_response_metrics.inc();
570
571        Ok(response)
572    }
573
574    #[instrument(level = "error", skip_all, err(level = "info"))]
575    async fn submit_with_transaction_driver(
576        &self,
577        td: &Arc<TransactionDriver<A>>,
578        request: &ExecuteTransactionRequestV3,
579        client_addr: Option<SocketAddr>,
580        verified_transaction: &VerifiedTransaction,
581        good_response_metrics: &GenericCounter<AtomicU64>,
582        timeout_duration: Option<Duration>,
583    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
584        let tx_digest = *verified_transaction.digest();
585        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
586
587        let td_response = td
588            .drive_transaction(
589                SubmitTxRequest::new_transaction(request.transaction.clone()),
590                SubmitTransactionOptions {
591                    forwarded_client_addr: client_addr,
592                    allowed_validators: self.td_allowed_submission_list.clone(),
593                    blocked_validators: self.td_blocked_submission_list.clone(),
594                },
595                timeout_duration,
596            )
597            .await
598            .map_err(|e| match e {
599                TransactionDriverError::TimeoutWithLastRetriableError {
600                    last_error,
601                    attempts,
602                    timeout,
603                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
604                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
605                    attempts,
606                    timeout,
607                },
608                other => QuorumDriverError::TransactionFailed {
609                    category: other.categorize(),
610                    details: other.to_string(),
611                },
612            });
613
614        match td_response {
615            Err(e) => {
616                warn!("TransactionDriver error: {e:?}");
617                Err(e)
618            }
619            Ok(quorum_transaction_response) => {
620                good_response_metrics.inc();
621                Ok(quorum_transaction_response)
622            }
623        }
624    }
625
626    #[instrument(
627        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
628        level = "debug",
629        skip_all,
630        err(level = "info")
631    )]
632    async fn wait_for_finalized_tx_executed_locally_with_timeout(
633        validator_state: &Arc<AuthorityState>,
634        tx_digest: TransactionDigest,
635        tx_type: TxType,
636        metrics: &TransactionOrchestratorMetrics,
637    ) -> SuiResult {
638        metrics.local_execution_in_flight.inc();
639        let _metrics_guard =
640            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
641                in_flight.dec();
642            });
643
644        let _latency_guard = metrics
645            .local_execution_latency
646            .with_label_values(&[tx_type.as_str()])
647            .start_timer();
648        debug!("Waiting for finalized tx to be executed locally.");
649        match timeout(
650            LOCAL_EXECUTION_TIMEOUT,
651            validator_state
652                .get_transaction_cache_reader()
653                .notify_read_executed_effects_digests(
654                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
655                    &[tx_digest],
656                ),
657        )
658        .instrument(error_span!(
659            "transaction_orchestrator::local_execution",
660            ?tx_digest
661        ))
662        .await
663        {
664            Err(_elapsed) => {
665                debug!(
666                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
667                    LOCAL_EXECUTION_TIMEOUT
668                );
669                metrics.local_execution_timeout.inc();
670                Err(SuiErrorKind::TimeoutError.into())
671            }
672            Ok(_) => {
673                metrics.local_execution_success.inc();
674                Ok(())
675            }
676        }
677    }
678
679    pub fn authority_state(&self) -> &Arc<AuthorityState> {
680        &self.validator_state
681    }
682
683    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
684        &self.transaction_driver
685    }
686
687    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
688        self.transaction_driver.authority_aggregator().load_full()
689    }
690
691    fn update_metrics<'a>(
692        &'a self,
693        transaction: &Transaction,
694    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
695        let (in_flight, good_response) = if transaction.is_consensus_tx() {
696            self.metrics.total_req_received_shared_object.inc();
697            (
698                self.metrics.req_in_flight_shared_object.clone(),
699                &self.metrics.good_response_shared_object,
700            )
701        } else {
702            self.metrics.total_req_received_single_writer.inc();
703            (
704                self.metrics.req_in_flight_single_writer.clone(),
705                &self.metrics.good_response_single_writer,
706            )
707        };
708        in_flight.inc();
709        (
710            scopeguard::guard(in_flight, |in_flight| {
711                in_flight.dec();
712            }),
713            good_response,
714        )
715    }
716
717    fn start_task_to_recover_txes_in_log(
718        pending_tx_log: Arc<WritePathPendingTransactionLog>,
719        transaction_driver: Arc<TransactionDriver<A>>,
720    ) {
721        spawn_logged_monitored_task!(async move {
722            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
723                info!("Skipping loading pending transactions from pending_tx_log.");
724                return;
725            }
726            let pending_txes = pending_tx_log
727                .load_all_pending_transactions()
728                .expect("failed to load all pending transactions");
729            let num_pending_txes = pending_txes.len();
730            info!(
731                "Recovering {} pending transactions from pending_tx_log.",
732                num_pending_txes
733            );
734            let mut recovery = pending_txes
735                .into_iter()
736                .map(|tx| {
737                    let pending_tx_log = pending_tx_log.clone();
738                    let transaction_driver = transaction_driver.clone();
739                    async move {
740                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
741                        // requires a migration.
742                        let tx = tx.into_inner();
743                        let tx_digest = *tx.digest();
744                        // It's not impossible we fail to enqueue a task but that's not the end of world.
745                        // TODO(william) correctly extract client_addr from logs
746                        if let Err(err) = transaction_driver
747                            .drive_transaction(
748                                SubmitTxRequest::new_transaction(tx),
749                                SubmitTransactionOptions::default(),
750                                Some(Duration::from_secs(60)),
751                            )
752                            .await
753                        {
754                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
755                        } else {
756                            debug!(?tx_digest, "Executed recovered transaction");
757                        }
758                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
759                            warn!(
760                                ?tx_digest,
761                                "Failed to clean up transaction in pending log: {err}"
762                            );
763                        } else {
764                            debug!(?tx_digest, "Cleaned up transaction in pending log");
765                        }
766                    }
767                })
768                .collect::<FuturesUnordered<_>>();
769
770            let mut num_recovered = 0;
771            while recovery.next().await.is_some() {
772                num_recovered += 1;
773                if num_recovered % 1000 == 0 {
774                    info!(
775                        "Recovered {} out of {} transactions from pending_tx_log.",
776                        num_recovered, num_pending_txes
777                    );
778                }
779            }
780            info!(
781                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
782                num_recovered, num_pending_txes
783            );
784        });
785    }
786
787    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
788        self.pending_tx_log.load_all_pending_transactions()
789    }
790
791    pub fn empty_pending_tx_log_in_test(&self) -> bool {
792        self.pending_tx_log.is_empty()
793    }
794}
795/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
796#[derive(Clone)]
797pub struct TransactionOrchestratorMetrics {
798    total_req_received_single_writer: GenericCounter<AtomicU64>,
799    total_req_received_shared_object: GenericCounter<AtomicU64>,
800
801    good_response_single_writer: GenericCounter<AtomicU64>,
802    good_response_shared_object: GenericCounter<AtomicU64>,
803
804    req_in_flight_single_writer: GenericGauge<AtomicI64>,
805    req_in_flight_shared_object: GenericGauge<AtomicI64>,
806
807    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
808    wait_for_finality_finished: GenericCounter<AtomicU64>,
809    wait_for_finality_timeout: GenericCounter<AtomicU64>,
810
811    local_execution_in_flight: GenericGauge<AtomicI64>,
812    local_execution_success: GenericCounter<AtomicU64>,
813    local_execution_timeout: GenericCounter<AtomicU64>,
814
815    concurrent_execution: IntCounter,
816
817    early_validation_rejections: IntCounterVec,
818
819    request_latency: HistogramVec,
820    local_execution_latency: HistogramVec,
821    settlement_finality_latency: HistogramVec,
822}
823
// Note that labeled metrics are resolved once up front and stored individually,
// which avoids paying the label-lookup cost of `MetricVec::with_label_values` on every use.
// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
827impl TransactionOrchestratorMetrics {
828    pub fn new(registry: &Registry) -> Self {
829        let total_req_received = register_int_counter_vec_with_registry!(
830            "tx_orchestrator_total_req_received",
831            "Total number of executions request Transaction Orchestrator receives, group by tx type",
832            &["tx_type"],
833            registry
834        )
835        .unwrap();
836
837        let total_req_received_single_writer =
838            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
839        let total_req_received_shared_object =
840            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
841
842        let good_response = register_int_counter_vec_with_registry!(
843            "tx_orchestrator_good_response",
844            "Total number of good responses Transaction Orchestrator generates, group by tx type",
845            &["tx_type"],
846            registry
847        )
848        .unwrap();
849
850        let good_response_single_writer =
851            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
852        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
853
854        let req_in_flight = register_int_gauge_vec_with_registry!(
855            "tx_orchestrator_req_in_flight",
856            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
857            &["tx_type"],
858            registry
859        )
860        .unwrap();
861
862        let req_in_flight_single_writer =
863            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
864        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
865
866        Self {
867            total_req_received_single_writer,
868            total_req_received_shared_object,
869            good_response_single_writer,
870            good_response_shared_object,
871            req_in_flight_single_writer,
872            req_in_flight_shared_object,
873            wait_for_finality_in_flight: register_int_gauge_with_registry!(
874                "tx_orchestrator_wait_for_finality_in_flight",
875                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
876                registry,
877            )
878            .unwrap(),
879            wait_for_finality_finished: register_int_counter_with_registry!(
880                "tx_orchestrator_wait_for_finality_fnished",
881                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
882                registry,
883            )
884            .unwrap(),
885            wait_for_finality_timeout: register_int_counter_with_registry!(
886                "tx_orchestrator_wait_for_finality_timeout",
887                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
888                registry,
889            )
890            .unwrap(),
891            local_execution_in_flight: register_int_gauge_with_registry!(
892                "tx_orchestrator_local_execution_in_flight",
893                "Number of local execution txns in flights Transaction Orchestrator handles",
894                registry,
895            )
896            .unwrap(),
897            local_execution_success: register_int_counter_with_registry!(
898                "tx_orchestrator_local_execution_success",
899                "Total number of successful local execution txns Transaction Orchestrator handles",
900                registry,
901            )
902            .unwrap(),
903            local_execution_timeout: register_int_counter_with_registry!(
904                "tx_orchestrator_local_execution_timeout",
905                "Total number of timed-out local execution txns Transaction Orchestrator handles",
906                registry,
907            )
908            .unwrap(),
909            concurrent_execution: register_int_counter_with_registry!(
910                "tx_orchestrator_concurrent_execution",
911                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
912                registry,
913            )
914            .unwrap(),
915            early_validation_rejections: register_int_counter_vec_with_registry!(
916                "tx_orchestrator_early_validation_rejections",
917                "Total number of transactions rejected during early validation before submission, by reason",
918                &["reason"],
919                registry,
920            )
921            .unwrap(),
922            request_latency: register_histogram_vec_with_registry!(
923                "tx_orchestrator_request_latency",
924                "Time spent in processing one Transaction Orchestrator request",
925                &["tx_type", "route", "wait_for_local_execution"],
926                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
927                registry,
928            )
929            .unwrap(),
930            local_execution_latency: register_histogram_vec_with_registry!(
931                "tx_orchestrator_local_execution_latency",
932                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
933                &["tx_type"],
934                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
935                registry,
936            )
937            .unwrap(),
938            settlement_finality_latency: register_histogram_vec_with_registry!(
939                "tx_orchestrator_settlement_finality_latency",
940                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
941                &["tx_type", "driver_type"],
942                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
943                registry,
944            )
945            .unwrap(),
946        }
947    }
948
949    pub fn new_for_tests() -> Self {
950        let registry = Registry::new();
951        Self::new(&registry)
952    }
953}
954
955#[async_trait::async_trait]
956impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
957where
958    A: AuthorityAPI + Send + Sync + 'static + Clone,
959{
960    async fn execute_transaction(
961        &self,
962        request: ExecuteTransactionRequestV3,
963        client_addr: Option<std::net::SocketAddr>,
964    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
965        self.execute_transaction_v3(request, client_addr).await
966    }
967
968    fn simulate_transaction(
969        &self,
970        transaction: TransactionData,
971        checks: TransactionChecks,
972    ) -> Result<SimulateTransactionResult, SuiError> {
973        self.validator_state
974            .simulate_transaction(transaction, checks)
975    }
976}
977
978/// Keeps track of inflight transactions being submitted, and helps recover transactions
979/// on restart.
980struct TransactionSubmissionGuard {
981    pending_tx_log: Arc<WritePathPendingTransactionLog>,
982    tx_digest: TransactionDigest,
983    is_new_transaction: bool,
984}
985
986impl TransactionSubmissionGuard {
987    pub fn new(
988        pending_tx_log: Arc<WritePathPendingTransactionLog>,
989        tx: &VerifiedTransaction,
990    ) -> Self {
991        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
992        let tx_digest = *tx.digest();
993        if is_new_transaction {
994            debug!(?tx_digest, "Added transaction to inflight set");
995        } else {
996            debug!(
997                ?tx_digest,
998                "Transaction already being processed, no new submission will be made"
999            );
1000        };
1001        Self {
1002            pending_tx_log,
1003            tx_digest,
1004            is_new_transaction,
1005        }
1006    }
1007
1008    fn is_new_transaction(&self) -> bool {
1009        self.is_new_transaction
1010    }
1011}
1012
1013impl Drop for TransactionSubmissionGuard {
1014    fn drop(&mut self) {
1015        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1016            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1017        } else {
1018            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1019        }
1020    }
1021}