sui_core/
transaction_orchestrator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::in_antithesis;
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17    HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
18    register_int_counter_vec_with_registry, register_int_counter_with_registry,
19    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::quorum_driver_types::{
29    EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
30    ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
31    QuorumDriverError,
32};
33use sui_types::sui_system_state::SuiSystemState;
34use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45    QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46    TransactionDriverMetrics,
47};
48
/// Upper bound on how long to wait for local execution (including parents)
/// before a timeout is returned to the client.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
52
/// Default timeout for waiting for finality of each transaction.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
55
56pub type QuorumTransactionEffectsResult =
57    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
58
59/// Transaction Orchestrator is a Node component that utilizes Transaction Driver to
60/// submit transactions to validators for finality. It adds inflight deduplication,
61/// waiting for local execution, recovery, early validation, and epoch change handling
62/// on top of Transaction Driver.
63/// This is used by node RPC service to support transaction submission and finality waiting.
64pub struct TransactionOrchestrator<A: Clone> {
65    validator_state: Arc<AuthorityState>,
66    pending_tx_log: Arc<WritePathPendingTransactionLog>,
67    metrics: Arc<TransactionOrchestratorMetrics>,
68    transaction_driver: Arc<TransactionDriver<A>>,
69    td_allowed_submission_list: Vec<String>,
70    td_blocked_submission_list: Vec<String>,
71    enable_early_validation: bool,
72}
73
74impl TransactionOrchestrator<NetworkAuthorityClient> {
75    pub fn new_with_auth_aggregator(
76        validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
77        validator_state: Arc<AuthorityState>,
78        reconfig_channel: Receiver<SuiSystemState>,
79        parent_path: &Path,
80        prometheus_registry: &Registry,
81        node_config: &NodeConfig,
82    ) -> Self {
83        let observer = OnsiteReconfigObserver::new(
84            reconfig_channel,
85            validator_state.get_object_cache_reader().clone(),
86            validator_state.clone_committee_store(),
87            validators.safe_client_metrics_base.clone(),
88            validators.metrics.deref().clone(),
89        );
90        TransactionOrchestrator::new(
91            validators,
92            validator_state,
93            parent_path,
94            prometheus_registry,
95            observer,
96            node_config,
97        )
98    }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103    A: AuthorityAPI + Send + Sync + 'static + Clone,
104    OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106    pub fn new(
107        validators: Arc<AuthorityAggregator<A>>,
108        validator_state: Arc<AuthorityState>,
109        parent_path: &Path,
110        prometheus_registry: &Registry,
111        reconfig_observer: OnsiteReconfigObserver,
112        node_config: &NodeConfig,
113    ) -> Self {
114        let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
115        let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
116        let client_metrics = Arc::new(
117            crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
118        );
119        let reconfig_observer = Arc::new(reconfig_observer);
120
121        let transaction_driver = TransactionDriver::new(
122            validators.clone(),
123            reconfig_observer.clone(),
124            td_metrics,
125            Some(node_config),
126            client_metrics,
127        );
128
129        let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
130            parent_path.join("fullnode_pending_transactions"),
131        ));
132        Self::start_task_to_recover_txes_in_log(pending_tx_log.clone(), transaction_driver.clone());
133
134        let td_allowed_submission_list = node_config
135            .transaction_driver_config
136            .as_ref()
137            .map(|config| config.allowed_submission_validators.clone())
138            .unwrap_or_default();
139
140        let td_blocked_submission_list = node_config
141            .transaction_driver_config
142            .as_ref()
143            .map(|config| config.blocked_submission_validators.clone())
144            .unwrap_or_default();
145
146        if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
147            panic!(
148                "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
149                td_allowed_submission_list, td_blocked_submission_list
150            );
151        }
152
153        let enable_early_validation = node_config
154            .transaction_driver_config
155            .as_ref()
156            .map(|config| config.enable_early_validation)
157            .unwrap_or(true);
158
159        Self {
160            validator_state,
161            pending_tx_log,
162            metrics,
163            transaction_driver,
164            td_allowed_submission_list,
165            td_blocked_submission_list,
166            enable_early_validation,
167        }
168    }
169}
170
171impl<A> TransactionOrchestrator<A>
172where
173    A: AuthorityAPI + Send + Sync + 'static + Clone,
174{
175    #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
176    fields(
177        tx_digest = ?request.transaction.digest(),
178        tx_type = ?request_type,
179    ))]
180    pub async fn execute_transaction_block(
181        &self,
182        request: ExecuteTransactionRequestV3,
183        request_type: ExecuteTransactionRequestType,
184        client_addr: Option<SocketAddr>,
185    ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
186    {
187        let timer = Instant::now();
188        let tx_type = if request.transaction.is_consensus_tx() {
189            TxType::SharedObject
190        } else {
191            TxType::SingleWriter
192        };
193        let tx_digest = *request.transaction.digest();
194
195        let (response, mut executed_locally) = self
196            .execute_transaction_with_effects_waiting(request, client_addr)
197            .await?;
198
199        if !executed_locally {
200            executed_locally = if matches!(
201                request_type,
202                ExecuteTransactionRequestType::WaitForLocalExecution
203            ) {
204                let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
205                    &self.validator_state,
206                    tx_digest,
207                    tx_type,
208                    &self.metrics,
209                )
210                .await
211                .is_ok();
212                add_server_timing("local_execution done");
213                executed_locally
214            } else {
215                false
216            };
217        }
218
219        let QuorumTransactionResponse {
220            effects,
221            events,
222            input_objects,
223            output_objects,
224            auxiliary_data,
225        } = response;
226
227        let response = ExecuteTransactionResponseV3 {
228            effects,
229            events,
230            input_objects,
231            output_objects,
232            auxiliary_data,
233        };
234
235        self.metrics
236            .request_latency
237            .with_label_values(&[
238                tx_type.as_str(),
239                "execute_transaction_block",
240                executed_locally.to_string().as_str(),
241            ])
242            .observe(timer.elapsed().as_secs_f64());
243
244        Ok((response, executed_locally))
245    }
246
247    // Utilize the handle_certificate_v3 validator api to request input/output objects
248    #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
249                 fields(tx_digest = ?request.transaction.digest()))]
250    pub async fn execute_transaction_v3(
251        &self,
252        request: ExecuteTransactionRequestV3,
253        client_addr: Option<SocketAddr>,
254    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
255        let timer = Instant::now();
256        let tx_type = if request.transaction.is_consensus_tx() {
257            TxType::SharedObject
258        } else {
259            TxType::SingleWriter
260        };
261
262        let (response, _) = self
263            .execute_transaction_with_effects_waiting(request, client_addr)
264            .await?;
265
266        self.metrics
267            .request_latency
268            .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
269            .observe(timer.elapsed().as_secs_f64());
270
271        let QuorumTransactionResponse {
272            effects,
273            events,
274            input_objects,
275            output_objects,
276            auxiliary_data,
277        } = response;
278
279        Ok(ExecuteTransactionResponseV3 {
280            effects,
281            events,
282            input_objects,
283            output_objects,
284            auxiliary_data,
285        })
286    }
287
288    /// Shared implementation for executing transactions with parallel local effects waiting
289    async fn execute_transaction_with_effects_waiting(
290        &self,
291        request: ExecuteTransactionRequestV3,
292        client_addr: Option<SocketAddr>,
293    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
294        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
295        let verified_transaction = epoch_store
296            .verify_transaction(request.transaction.clone())
297            .map_err(QuorumDriverError::InvalidUserSignature)?;
298        let tx_digest = *verified_transaction.digest();
299
300        // Early validation check against local state before submission to catch non-retriable errors
301        // TODO: Consider moving this check to TransactionDriver for per-retry validation
302        if self.enable_early_validation
303            && let Err(e) = self
304                .validator_state
305                .check_transaction_validity(&epoch_store, &verified_transaction)
306        {
307            let error_category = e.categorize();
308            if !error_category.is_submission_retriable() {
309                // Skip early validation rejection if transaction has already been executed (allows retries to return cached results)
310                if !self.validator_state.is_tx_already_executed(&tx_digest) {
311                    self.metrics
312                        .early_validation_rejections
313                        .with_label_values(&[e.to_variant_name()])
314                        .inc();
315                    debug!(
316                        error = ?e,
317                        "Transaction rejected during early validation"
318                    );
319
320                    return Err(QuorumDriverError::TransactionFailed {
321                        category: error_category,
322                        details: e.to_string(),
323                    });
324                }
325            }
326        }
327
328        // Add transaction to WAL log.
329        let guard =
330            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
331        let is_new_transaction = guard.is_new_transaction();
332
333        let include_events = request.include_events;
334        let include_input_objects = request.include_input_objects;
335        let include_output_objects = request.include_output_objects;
336        let include_auxiliary_data = request.include_auxiliary_data;
337
338        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
339            .ok()
340            .and_then(|v| v.parse().ok())
341            .map(Duration::from_secs)
342            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
343
344        let num_submissions = if !is_new_transaction {
345            // No need to submit when the transaction is already being processed.
346            0
347        } else if cfg!(msim) || in_antithesis() {
348            // Allow duplicated submissions in tests.
349            let r = rand::thread_rng().gen_range(1..=100);
350            let n = if r <= 10 {
351                3
352            } else if r <= 30 {
353                2
354            } else {
355                1
356            };
357            if n > 1 {
358                debug!("Making {n} execution calls");
359            }
360            n
361        } else {
362            1
363        };
364
365        // Wait for one of the execution futures to succeed, or all of them to fail.
366        let mut execution_futures = FuturesUnordered::new();
367        for i in 0..num_submissions {
368            // Generate jitter values outside the async block
369            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
370            let delay_ms = if should_delay {
371                rand::thread_rng().gen_range(100..=500)
372            } else {
373                0
374            };
375
376            let request = request.clone();
377            let verified_transaction = verified_transaction.clone();
378
379            let future = async move {
380                if delay_ms > 0 {
381                    // Add jitters to duplicated submissions.
382                    sleep(Duration::from_millis(delay_ms)).await;
383                }
384                self.execute_transaction_impl(
385                    request,
386                    verified_transaction,
387                    client_addr,
388                    Some(finality_timeout),
389                )
390                .await
391            }
392            .boxed();
393            execution_futures.push(future);
394        }
395
396        // Track the last execution error.
397        let mut last_execution_error: Option<QuorumDriverError> = None;
398
399        // Wait for execution result outside of this call to become available.
400        let digests = [tx_digest];
401        let mut local_effects_future = epoch_store
402            .within_alive_epoch(
403                self.validator_state
404                    .get_transaction_cache_reader()
405                    .notify_read_executed_effects(
406                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
407                    &digests,
408                ),
409            )
410            .boxed();
411
412        // Wait for execution timeout.
413        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
414
415        loop {
416            tokio::select! {
417                biased;
418
419                // Local effects might be available
420                local_effects_result = &mut local_effects_future => {
421                    match local_effects_result {
422                        Ok(effects) => {
423                            debug!(
424                                "Effects became available while execution was running"
425                            );
426                            if let Some(effects) = effects.into_iter().next() {
427                                self.metrics.concurrent_execution.inc();
428                                let epoch = effects.executed_epoch();
429                                let events = if include_events {
430                                    if effects.events_digest().is_some() {
431                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
432                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
433                                    } else {
434                                        None
435                                    }
436                                } else {
437                                    None
438                                };
439                                let input_objects = include_input_objects
440                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
441                                    .transpose()
442                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
443                                let output_objects = include_output_objects
444                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
445                                    .transpose()
446                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
447                                let response = QuorumTransactionResponse {
448                                    effects: FinalizedEffects {
449                                        effects,
450                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
451                                    },
452                                    events,
453                                    input_objects,
454                                    output_objects,
455                                    auxiliary_data: None,
456                                };
457                                break Ok((response, true));
458                            }
459                        }
460                        Err(_) => {
461                            warn!("Epoch terminated before effects were available");
462                        }
463                    };
464
465                    // Prevent this branch from being selected again
466                    local_effects_future = futures::future::pending().boxed();
467                }
468
469                // This branch is disabled if execution_futures is empty.
470                Some(result) = execution_futures.next() => {
471                    match result {
472                        Ok(resp) => {
473                            // First success gets returned.
474                            debug!("Execution succeeded, returning response");
475                            let QuorumTransactionResponse {
476                                effects,
477                                events,
478                                input_objects,
479                                output_objects,
480                                auxiliary_data,
481                            } = resp;
482                            // Filter fields based on request flags.
483                            let resp = QuorumTransactionResponse {
484                                effects,
485                                events: if include_events { events } else { None },
486                                input_objects: if include_input_objects { input_objects } else { None },
487                                output_objects: if include_output_objects { output_objects } else { None },
488                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
489                            };
490                            break Ok((resp, false));
491                        }
492                        Err(e) => {
493                            debug!(?e, "Execution attempt failed, wait for other attempts");
494                            last_execution_error = Some(e);
495                        }
496                    };
497
498                    // Last error must have been recorded.
499                    if execution_futures.is_empty() {
500                        break Err(last_execution_error.unwrap());
501                    }
502                }
503
504                // A timeout has occurred while waiting for finality
505                _ = &mut timeout_future => {
506                    if let Some(e) = last_execution_error {
507                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
508                    } else {
509                        debug!("Timeout waiting for transaction finality.");
510                    }
511                    self.metrics.wait_for_finality_timeout.inc();
512
513                    // TODO: Return the last execution error.
514                    break Err(QuorumDriverError::TimeoutBeforeFinality);
515                }
516            }
517        }
518    }
519
520    #[instrument(level = "error", skip_all)]
521    async fn execute_transaction_impl(
522        &self,
523        request: ExecuteTransactionRequestV3,
524        verified_transaction: VerifiedTransaction,
525        client_addr: Option<SocketAddr>,
526        finality_timeout: Option<Duration>,
527    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
528        debug!("TO Received transaction execution request.");
529
530        let timer = Instant::now();
531        let tx_type = if verified_transaction.is_consensus_tx() {
532            TxType::SharedObject
533        } else {
534            TxType::SingleWriter
535        };
536
537        let (_in_flight_metrics_guards, good_response_metrics) =
538            self.update_metrics(&request.transaction);
539
540        // TODO: refactor all the gauge and timer metrics with `monitored_scope`
541        let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
542        wait_for_finality_gauge.inc();
543        let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
544            in_flight.dec();
545        });
546
547        let response = self
548            .submit_with_transaction_driver(
549                &self.transaction_driver,
550                &request,
551                client_addr,
552                &verified_transaction,
553                good_response_metrics,
554                finality_timeout,
555            )
556            .await?;
557        let driver_type = "transaction_driver";
558
559        add_server_timing("wait_for_finality done");
560
561        self.metrics.wait_for_finality_finished.inc();
562
563        let elapsed = timer.elapsed().as_secs_f64();
564        self.metrics
565            .settlement_finality_latency
566            .with_label_values(&[tx_type.as_str(), driver_type])
567            .observe(elapsed);
568        good_response_metrics.inc();
569
570        Ok(response)
571    }
572
573    #[instrument(level = "error", skip_all, err(level = "info"))]
574    async fn submit_with_transaction_driver(
575        &self,
576        td: &Arc<TransactionDriver<A>>,
577        request: &ExecuteTransactionRequestV3,
578        client_addr: Option<SocketAddr>,
579        verified_transaction: &VerifiedTransaction,
580        good_response_metrics: &GenericCounter<AtomicU64>,
581        timeout_duration: Option<Duration>,
582    ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
583        let tx_digest = *verified_transaction.digest();
584        debug!("Using TransactionDriver for transaction {:?}", tx_digest);
585
586        let td_response = td
587            .drive_transaction(
588                SubmitTxRequest::new_transaction(request.transaction.clone()),
589                SubmitTransactionOptions {
590                    forwarded_client_addr: client_addr,
591                    allowed_validators: self.td_allowed_submission_list.clone(),
592                    blocked_validators: self.td_blocked_submission_list.clone(),
593                },
594                timeout_duration,
595            )
596            .await
597            .map_err(|e| match e {
598                TransactionDriverError::TimeoutWithLastRetriableError {
599                    last_error,
600                    attempts,
601                    timeout,
602                } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
603                    last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
604                    attempts,
605                    timeout,
606                },
607                other => QuorumDriverError::TransactionFailed {
608                    category: other.categorize(),
609                    details: other.to_string(),
610                },
611            });
612
613        match td_response {
614            Err(e) => {
615                warn!("TransactionDriver error: {e:?}");
616                Err(e)
617            }
618            Ok(quorum_transaction_response) => {
619                good_response_metrics.inc();
620                Ok(quorum_transaction_response)
621            }
622        }
623    }
624
625    #[instrument(
626        name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
627        level = "debug",
628        skip_all,
629        err(level = "info")
630    )]
631    async fn wait_for_finalized_tx_executed_locally_with_timeout(
632        validator_state: &Arc<AuthorityState>,
633        tx_digest: TransactionDigest,
634        tx_type: TxType,
635        metrics: &TransactionOrchestratorMetrics,
636    ) -> SuiResult {
637        metrics.local_execution_in_flight.inc();
638        let _metrics_guard =
639            scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
640                in_flight.dec();
641            });
642
643        let _latency_guard = metrics
644            .local_execution_latency
645            .with_label_values(&[tx_type.as_str()])
646            .start_timer();
647        debug!("Waiting for finalized tx to be executed locally.");
648        match timeout(
649            LOCAL_EXECUTION_TIMEOUT,
650            validator_state
651                .get_transaction_cache_reader()
652                .notify_read_executed_effects_digests(
653                    "TransactionOrchestrator::notify_read_wait_for_local_execution",
654                    &[tx_digest],
655                ),
656        )
657        .instrument(error_span!(
658            "transaction_orchestrator::local_execution",
659            ?tx_digest
660        ))
661        .await
662        {
663            Err(_elapsed) => {
664                debug!(
665                    "Waiting for finalized tx to be executed locally timed out within {:?}.",
666                    LOCAL_EXECUTION_TIMEOUT
667                );
668                metrics.local_execution_timeout.inc();
669                Err(SuiErrorKind::TimeoutError.into())
670            }
671            Ok(_) => {
672                metrics.local_execution_success.inc();
673                Ok(())
674            }
675        }
676    }
677
678    pub fn authority_state(&self) -> &Arc<AuthorityState> {
679        &self.validator_state
680    }
681
682    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
683        &self.transaction_driver
684    }
685
686    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
687        self.transaction_driver.authority_aggregator().load_full()
688    }
689
690    fn update_metrics<'a>(
691        &'a self,
692        transaction: &Transaction,
693    ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
694        let (in_flight, good_response) = if transaction.is_consensus_tx() {
695            self.metrics.total_req_received_shared_object.inc();
696            (
697                self.metrics.req_in_flight_shared_object.clone(),
698                &self.metrics.good_response_shared_object,
699            )
700        } else {
701            self.metrics.total_req_received_single_writer.inc();
702            (
703                self.metrics.req_in_flight_single_writer.clone(),
704                &self.metrics.good_response_single_writer,
705            )
706        };
707        in_flight.inc();
708        (
709            scopeguard::guard(in_flight, |in_flight| {
710                in_flight.dec();
711            }),
712            good_response,
713        )
714    }
715
716    fn start_task_to_recover_txes_in_log(
717        pending_tx_log: Arc<WritePathPendingTransactionLog>,
718        transaction_driver: Arc<TransactionDriver<A>>,
719    ) {
720        spawn_logged_monitored_task!(async move {
721            if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
722                info!("Skipping loading pending transactions from pending_tx_log.");
723                return;
724            }
725            let pending_txes = pending_tx_log
726                .load_all_pending_transactions()
727                .expect("failed to load all pending transactions");
728            let num_pending_txes = pending_txes.len();
729            info!(
730                "Recovering {} pending transactions from pending_tx_log.",
731                num_pending_txes
732            );
733            let mut recovery = pending_txes
734                .into_iter()
735                .map(|tx| {
736                    let pending_tx_log = pending_tx_log.clone();
737                    let transaction_driver = transaction_driver.clone();
738                    async move {
739                        // TODO: ideally pending_tx_log would not contain VerifiedTransaction, but that
740                        // requires a migration.
741                        let tx = tx.into_inner();
742                        let tx_digest = *tx.digest();
743                        // It's not impossible we fail to enqueue a task but that's not the end of world.
744                        // TODO(william) correctly extract client_addr from logs
745                        if let Err(err) = transaction_driver
746                            .drive_transaction(
747                                SubmitTxRequest::new_transaction(tx),
748                                SubmitTransactionOptions::default(),
749                                Some(Duration::from_secs(60)),
750                            )
751                            .await
752                        {
753                            warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
754                        } else {
755                            debug!(?tx_digest, "Executed recovered transaction");
756                        }
757                        if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
758                            warn!(
759                                ?tx_digest,
760                                "Failed to clean up transaction in pending log: {err}"
761                            );
762                        } else {
763                            debug!(?tx_digest, "Cleaned up transaction in pending log");
764                        }
765                    }
766                })
767                .collect::<FuturesUnordered<_>>();
768
769            let mut num_recovered = 0;
770            while recovery.next().await.is_some() {
771                num_recovered += 1;
772                if num_recovered % 1000 == 0 {
773                    info!(
774                        "Recovered {} out of {} transactions from pending_tx_log.",
775                        num_recovered, num_pending_txes
776                    );
777                }
778            }
779            info!(
780                "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
781                num_recovered, num_pending_txes
782            );
783        });
784    }
785
786    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
787        self.pending_tx_log.load_all_pending_transactions()
788    }
789
790    pub fn empty_pending_tx_log_in_test(&self) -> bool {
791        self.pending_tx_log.is_empty()
792    }
793}
794/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
795#[derive(Clone)]
796pub struct TransactionOrchestratorMetrics {
797    total_req_received_single_writer: GenericCounter<AtomicU64>,
798    total_req_received_shared_object: GenericCounter<AtomicU64>,
799
800    good_response_single_writer: GenericCounter<AtomicU64>,
801    good_response_shared_object: GenericCounter<AtomicU64>,
802
803    req_in_flight_single_writer: GenericGauge<AtomicI64>,
804    req_in_flight_shared_object: GenericGauge<AtomicI64>,
805
806    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
807    wait_for_finality_finished: GenericCounter<AtomicU64>,
808    wait_for_finality_timeout: GenericCounter<AtomicU64>,
809
810    local_execution_in_flight: GenericGauge<AtomicI64>,
811    local_execution_success: GenericCounter<AtomicU64>,
812    local_execution_timeout: GenericCounter<AtomicU64>,
813
814    concurrent_execution: IntCounter,
815
816    early_validation_rejections: IntCounterVec,
817
818    request_latency: HistogramVec,
819    local_execution_latency: HistogramVec,
820    settlement_finality_latency: HistogramVec,
821}
822
823// Note that labeled-metrics are stored upfront individually
824// to mitigate the perf hit by MetricsVec.
825// See https://github.com/tikv/rust-prometheus/tree/master/static-metric
826impl TransactionOrchestratorMetrics {
827    pub fn new(registry: &Registry) -> Self {
828        let total_req_received = register_int_counter_vec_with_registry!(
829            "tx_orchestrator_total_req_received",
830            "Total number of executions request Transaction Orchestrator receives, group by tx type",
831            &["tx_type"],
832            registry
833        )
834        .unwrap();
835
836        let total_req_received_single_writer =
837            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
838        let total_req_received_shared_object =
839            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
840
841        let good_response = register_int_counter_vec_with_registry!(
842            "tx_orchestrator_good_response",
843            "Total number of good responses Transaction Orchestrator generates, group by tx type",
844            &["tx_type"],
845            registry
846        )
847        .unwrap();
848
849        let good_response_single_writer =
850            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
851        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
852
853        let req_in_flight = register_int_gauge_vec_with_registry!(
854            "tx_orchestrator_req_in_flight",
855            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
856            &["tx_type"],
857            registry
858        )
859        .unwrap();
860
861        let req_in_flight_single_writer =
862            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
863        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
864
865        Self {
866            total_req_received_single_writer,
867            total_req_received_shared_object,
868            good_response_single_writer,
869            good_response_shared_object,
870            req_in_flight_single_writer,
871            req_in_flight_shared_object,
872            wait_for_finality_in_flight: register_int_gauge_with_registry!(
873                "tx_orchestrator_wait_for_finality_in_flight",
874                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
875                registry,
876            )
877            .unwrap(),
878            wait_for_finality_finished: register_int_counter_with_registry!(
879                "tx_orchestrator_wait_for_finality_fnished",
880                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
881                registry,
882            )
883            .unwrap(),
884            wait_for_finality_timeout: register_int_counter_with_registry!(
885                "tx_orchestrator_wait_for_finality_timeout",
886                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
887                registry,
888            )
889            .unwrap(),
890            local_execution_in_flight: register_int_gauge_with_registry!(
891                "tx_orchestrator_local_execution_in_flight",
892                "Number of local execution txns in flights Transaction Orchestrator handles",
893                registry,
894            )
895            .unwrap(),
896            local_execution_success: register_int_counter_with_registry!(
897                "tx_orchestrator_local_execution_success",
898                "Total number of successful local execution txns Transaction Orchestrator handles",
899                registry,
900            )
901            .unwrap(),
902            local_execution_timeout: register_int_counter_with_registry!(
903                "tx_orchestrator_local_execution_timeout",
904                "Total number of timed-out local execution txns Transaction Orchestrator handles",
905                registry,
906            )
907            .unwrap(),
908            concurrent_execution: register_int_counter_with_registry!(
909                "tx_orchestrator_concurrent_execution",
910                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
911                registry,
912            )
913            .unwrap(),
914            early_validation_rejections: register_int_counter_vec_with_registry!(
915                "tx_orchestrator_early_validation_rejections",
916                "Total number of transactions rejected during early validation before submission, by reason",
917                &["reason"],
918                registry,
919            )
920            .unwrap(),
921            request_latency: register_histogram_vec_with_registry!(
922                "tx_orchestrator_request_latency",
923                "Time spent in processing one Transaction Orchestrator request",
924                &["tx_type", "route", "wait_for_local_execution"],
925                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
926                registry,
927            )
928            .unwrap(),
929            local_execution_latency: register_histogram_vec_with_registry!(
930                "tx_orchestrator_local_execution_latency",
931                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
932                &["tx_type"],
933                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
934                registry,
935            )
936            .unwrap(),
937            settlement_finality_latency: register_histogram_vec_with_registry!(
938                "tx_orchestrator_settlement_finality_latency",
939                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
940                &["tx_type", "driver_type"],
941                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
942                registry,
943            )
944            .unwrap(),
945        }
946    }
947
948    pub fn new_for_tests() -> Self {
949        let registry = Registry::new();
950        Self::new(&registry)
951    }
952}
953
954#[async_trait::async_trait]
955impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
956where
957    A: AuthorityAPI + Send + Sync + 'static + Clone,
958{
959    async fn execute_transaction(
960        &self,
961        request: ExecuteTransactionRequestV3,
962        client_addr: Option<std::net::SocketAddr>,
963    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
964        self.execute_transaction_v3(request, client_addr).await
965    }
966
967    fn simulate_transaction(
968        &self,
969        transaction: TransactionData,
970        checks: TransactionChecks,
971    ) -> Result<SimulateTransactionResult, SuiError> {
972        self.validator_state
973            .simulate_transaction(transaction, checks)
974    }
975}
976
977/// Keeps track of inflight transactions being submitted, and helps recover transactions
978/// on restart.
979struct TransactionSubmissionGuard {
980    pending_tx_log: Arc<WritePathPendingTransactionLog>,
981    tx_digest: TransactionDigest,
982    is_new_transaction: bool,
983}
984
985impl TransactionSubmissionGuard {
986    pub fn new(
987        pending_tx_log: Arc<WritePathPendingTransactionLog>,
988        tx: &VerifiedTransaction,
989    ) -> Self {
990        let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
991        let tx_digest = *tx.digest();
992        if is_new_transaction {
993            debug!(?tx_digest, "Added transaction to inflight set");
994        } else {
995            debug!(
996                ?tx_digest,
997                "Transaction already being processed, no new submission will be made"
998            );
999        };
1000        Self {
1001            pending_tx_log,
1002            tx_digest,
1003            is_new_transaction,
1004        }
1005    }
1006
1007    fn is_new_transaction(&self) -> bool {
1008        self.is_new_transaction
1009    }
1010}
1011
1012impl Drop for TransactionSubmissionGuard {
1013    fn drop(&mut self) {
1014        if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1015            warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1016        } else {
1017            debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1018        }
1019    }
1020}