1use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_antithesis};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16 HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19 register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33 TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46 TransactionDriverMetrics,
47};
48
/// Upper bound on how long a `WaitForLocalExecution` request waits for the
/// finalized transaction's effects to appear in this node's local store.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default budget for driving a transaction to finality; overridable at runtime
/// through the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_millis(90_000);
55
/// Outcome of driving a single transaction to quorum.
///
/// On success, carries the transaction together with its quorum response. On
/// failure, carries the digest of the transaction that failed alongside the
/// submission error, so callers can attribute the error without holding the
/// full transaction.
pub type QuorumTransactionEffectsResult = Result<
    (Transaction, QuorumTransactionResponse),
    (TransactionDigest, TransactionSubmissionError),
>;
60
/// Entry point for executing user transactions from this node.
///
/// Submits transactions through a `TransactionDriver`, tracks them in a
/// pending-transaction log, and optionally waits for local execution. All
/// state lives in a shared `Inner` so work can be moved onto spawned tasks.
pub struct TransactionOrchestrator<A: Clone> {
    // Shared with the spawned execution and background-retry tasks.
    inner: Arc<Inner<A>>,
}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71 pub fn new_with_auth_aggregator(
72 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73 validator_state: Arc<AuthorityState>,
74 reconfig_channel: Receiver<SuiSystemState>,
75 parent_path: &Path,
76 prometheus_registry: &Registry,
77 node_config: &NodeConfig,
78 ) -> Self {
79 let observer = OnsiteReconfigObserver::new(
80 reconfig_channel,
81 validator_state.get_object_cache_reader().clone(),
82 validator_state.clone_committee_store(),
83 validators.safe_client_metrics_base.clone(),
84 );
85 TransactionOrchestrator::new(
86 validators,
87 validator_state,
88 parent_path,
89 prometheus_registry,
90 observer,
91 node_config,
92 )
93 }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98 A: AuthorityAPI + Send + Sync + 'static + Clone,
99 OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101 pub fn new(
102 validators: Arc<AuthorityAggregator<A>>,
103 validator_state: Arc<AuthorityState>,
104 parent_path: &Path,
105 prometheus_registry: &Registry,
106 reconfig_observer: OnsiteReconfigObserver,
107 node_config: &NodeConfig,
108 ) -> Self {
109 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111 let client_metrics = Arc::new(
112 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113 );
114 let reconfig_observer = Arc::new(reconfig_observer);
115
116 let transaction_driver = TransactionDriver::new(
117 validators.clone(),
118 reconfig_observer.clone(),
119 td_metrics,
120 Some(node_config),
121 client_metrics,
122 );
123
124 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125 parent_path.join("fullnode_pending_transactions"),
126 ));
127 Inner::start_task_to_recover_txes_in_log(
128 pending_tx_log.clone(),
129 transaction_driver.clone(),
130 );
131
132 let td_allowed_submission_list = node_config
133 .transaction_driver_config
134 .as_ref()
135 .map(|config| config.allowed_submission_validators.clone())
136 .unwrap_or_default();
137
138 let td_blocked_submission_list = node_config
139 .transaction_driver_config
140 .as_ref()
141 .map(|config| config.blocked_submission_validators.clone())
142 .unwrap_or_default();
143
144 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145 panic!(
146 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147 td_allowed_submission_list, td_blocked_submission_list
148 );
149 }
150
151 let enable_early_validation = node_config
152 .transaction_driver_config
153 .as_ref()
154 .map(|config| config.enable_early_validation)
155 .unwrap_or(true);
156
157 let inner = Arc::new(Inner {
158 validator_state,
159 pending_tx_log,
160 metrics,
161 transaction_driver,
162 td_allowed_submission_list,
163 td_blocked_submission_list,
164 enable_early_validation,
165 });
166 Self { inner }
167 }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172 A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175 fields(
176 tx_digest = ?request.transaction.digest(),
177 tx_type = ?request_type,
178 ))]
179 pub async fn execute_transaction_block(
180 &self,
181 request: ExecuteTransactionRequestV3,
182 request_type: ExecuteTransactionRequestType,
183 client_addr: Option<SocketAddr>,
184 ) -> Result<
185 (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186 TransactionSubmissionError,
187 > {
188 let timer = Instant::now();
189 let tx_type = if request.transaction.is_consensus_tx() {
190 TxType::SharedObject
191 } else {
192 TxType::SingleWriter
193 };
194 let tx_digest = *request.transaction.digest();
195
196 let inner = self.inner.clone();
197 let (response, mut executed_locally) = spawn_monitored_task!(
198 Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199 )
200 .await
201 .map_err(|e| TransactionSubmissionError::TransactionFailed {
202 category: ErrorCategory::Internal,
203 details: e.to_string(),
204 })??;
205
206 if !executed_locally {
207 executed_locally = if matches!(
208 request_type,
209 ExecuteTransactionRequestType::WaitForLocalExecution
210 ) {
211 let executed_locally =
212 Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
213 &self.inner.validator_state,
214 tx_digest,
215 tx_type,
216 &self.inner.metrics,
217 )
218 .await
219 .is_ok();
220 add_server_timing("local_execution done");
221 executed_locally
222 } else {
223 false
224 };
225 }
226
227 let QuorumTransactionResponse {
228 effects,
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 } = response;
234
235 let response = ExecuteTransactionResponseV3 {
236 effects,
237 events,
238 input_objects,
239 output_objects,
240 auxiliary_data,
241 };
242
243 self.inner
244 .metrics
245 .request_latency
246 .with_label_values(&[
247 tx_type.as_str(),
248 "execute_transaction_block",
249 executed_locally.to_string().as_str(),
250 ])
251 .observe(timer.elapsed().as_secs_f64());
252
253 Ok((response, executed_locally))
254 }
255
256 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
258 fields(tx_digest = ?request.transaction.digest()))]
259 pub async fn execute_transaction_v3(
260 &self,
261 request: ExecuteTransactionRequestV3,
262 client_addr: Option<SocketAddr>,
263 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
264 let timer = Instant::now();
265 let tx_type = if request.transaction.is_consensus_tx() {
266 TxType::SharedObject
267 } else {
268 TxType::SingleWriter
269 };
270
271 let inner = self.inner.clone();
272 let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
273 inner,
274 request,
275 client_addr
276 ))
277 .await
278 .map_err(|e| TransactionSubmissionError::TransactionFailed {
279 category: ErrorCategory::Internal,
280 details: e.to_string(),
281 })??;
282
283 self.inner
284 .metrics
285 .request_latency
286 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
287 .observe(timer.elapsed().as_secs_f64());
288
289 let QuorumTransactionResponse {
290 effects,
291 events,
292 input_objects,
293 output_objects,
294 auxiliary_data,
295 } = response;
296
297 Ok(ExecuteTransactionResponseV3 {
298 effects,
299 events,
300 input_objects,
301 output_objects,
302 auxiliary_data,
303 })
304 }
305
306 pub fn authority_state(&self) -> &Arc<AuthorityState> {
307 &self.inner.validator_state
308 }
309
310 pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
311 &self.inner.transaction_driver
312 }
313
314 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
315 self.inner
316 .transaction_driver
317 .authority_aggregator()
318 .load_full()
319 }
320
321 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
322 self.inner.pending_tx_log.load_all_pending_transactions()
323 }
324
325 pub fn empty_pending_tx_log_in_test(&self) -> bool {
326 self.inner.pending_tx_log.is_empty()
327 }
328}
329
/// Shared state behind a `TransactionOrchestrator`, cloned (via `Arc`) into
/// spawned execution and background-retry tasks.
struct Inner<A: Clone> {
    // Local authority state: used for signature verification, early validation,
    // and reading locally executed effects.
    validator_state: Arc<AuthorityState>,
    // Log of in-flight transactions; entries left over at startup are re-driven
    // by the recovery task.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    metrics: Arc<TransactionOrchestratorMetrics>,
    transaction_driver: Arc<TransactionDriver<A>>,
    // Passed through as `SubmitTransactionOptions::allowed_validators`; the
    // constructor rejects configs where both this and the blocked list are non-empty.
    td_allowed_submission_list: Vec<String>,
    // Passed through as `SubmitTransactionOptions::blocked_validators`.
    td_blocked_submission_list: Vec<String>,
    // When true, `check_transaction_validity` runs before submission; defaults to
    // true when no transaction driver config is present.
    enable_early_validation: bool,
}
339
340impl<A> Inner<A>
341where
342 A: AuthorityAPI + Send + Sync + 'static + Clone,
343{
344 async fn execute_transaction_with_retry(
345 inner: Arc<Inner<A>>,
346 request: ExecuteTransactionRequestV3,
347 client_addr: Option<SocketAddr>,
348 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
349 {
350 let result = inner
351 .execute_transaction_with_effects_waiting(
352 request.clone(),
353 client_addr,
354 false,
355 )
356 .await;
357
358 if let Err(e) = &result
360 && e.is_retriable()
361 {
362 spawn_monitored_task!(async move {
363 inner.metrics.background_retry_started.inc();
364 let backoff = backoff::ExponentialBackoff::new(
365 Duration::from_secs(1),
366 Duration::from_secs(300),
367 );
368 const MAX_RETRIES: usize = 10;
369 for (i, delay) in backoff.enumerate() {
370 if i == MAX_RETRIES {
371 break;
372 }
373 let result = inner
376 .execute_transaction_with_effects_waiting(
377 request.clone(),
378 client_addr,
379 i > 3,
380 )
381 .await;
382 match result {
383 Ok(_) => {
384 inner
385 .metrics
386 .background_retry_attempts
387 .with_label_values(&["success"])
388 .inc();
389 debug!(
390 "Background retry {i} for transaction {} succeeded",
391 request.transaction.digest()
392 );
393 break;
394 }
395 Err(e) => {
396 if !e.is_retriable() {
397 inner
398 .metrics
399 .background_retry_attempts
400 .with_label_values(&["non-retriable"])
401 .inc();
402 debug!(
403 "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
404 request.transaction.digest()
405 );
406 break;
407 }
408 inner
409 .metrics
410 .background_retry_attempts
411 .with_label_values(&["retriable"])
412 .inc();
413 debug!(
414 "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
415 request.transaction.digest()
416 );
417 }
418 };
419 sleep(delay).await;
420 }
421 });
422 }
423
424 result
425 }
426
427 async fn execute_transaction_with_effects_waiting(
429 &self,
430 request: ExecuteTransactionRequestV3,
431 client_addr: Option<SocketAddr>,
432 enforce_live_input_objects: bool,
433 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
434 {
435 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
436 let verified_transaction = epoch_store
437 .verify_transaction_with_current_aliases(request.transaction.clone())
438 .map_err(TransactionSubmissionError::InvalidUserSignature)?
439 .into_tx();
440 let tx_digest = *verified_transaction.digest();
441
442 if self.enable_early_validation
445 && let Err(e) = self.validator_state.check_transaction_validity(
446 &epoch_store,
447 &verified_transaction,
448 enforce_live_input_objects,
449 )
450 {
451 let error_category = e.categorize();
452 if !error_category.is_submission_retriable() {
453 if !self.validator_state.is_tx_already_executed(&tx_digest) {
455 self.metrics
456 .early_validation_rejections
457 .with_label_values(&[e.to_variant_name()])
458 .inc();
459 debug!(
460 error = ?e,
461 "Transaction rejected during early validation"
462 );
463
464 return Err(TransactionSubmissionError::TransactionFailed {
465 category: error_category,
466 details: e.to_string(),
467 });
468 }
469 }
470 }
471
472 let guard =
474 TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
475 let is_new_transaction = guard.is_new_transaction();
476
477 let include_events = request.include_events;
478 let include_input_objects = request.include_input_objects;
479 let include_output_objects = request.include_output_objects;
480 let include_auxiliary_data = request.include_auxiliary_data;
481
482 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
483 .ok()
484 .and_then(|v| v.parse().ok())
485 .map(Duration::from_secs)
486 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
487
488 let num_submissions = if !is_new_transaction {
489 0
491 } else if cfg!(msim) || in_antithesis() {
492 let r = rand::thread_rng().gen_range(1..=100);
494 let n = if r <= 10 {
495 3
496 } else if r <= 30 {
497 2
498 } else {
499 1
500 };
501 if n > 1 {
502 debug!("Making {n} execution calls");
503 }
504 n
505 } else {
506 1
507 };
508
509 let mut execution_futures = FuturesUnordered::new();
511 for i in 0..num_submissions {
512 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
514 let delay_ms = if should_delay {
515 rand::thread_rng().gen_range(100..=500)
516 } else {
517 0
518 };
519
520 let request = request.clone();
521 let verified_transaction = verified_transaction.clone();
522
523 let future = async move {
524 if delay_ms > 0 {
525 sleep(Duration::from_millis(delay_ms)).await;
527 }
528 self.execute_transaction_impl(
529 request,
530 verified_transaction,
531 client_addr,
532 Some(finality_timeout),
533 )
534 .await
535 }
536 .boxed();
537 execution_futures.push(future);
538 }
539
540 let mut last_execution_error: Option<TransactionSubmissionError> = None;
542
543 let digests = [tx_digest];
545 let mut local_effects_future = self
546 .validator_state
547 .get_transaction_cache_reader()
548 .notify_read_executed_effects(
549 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
550 &digests,
551 )
552 .boxed();
553
554 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
556
557 loop {
558 tokio::select! {
559 biased;
560
561 all_effects = &mut local_effects_future => {
563 debug!(
564 "Effects became available while execution was running"
565 );
566 if all_effects.len() != 1 {
567 break Err(TransactionSubmissionError::TransactionDriverInternalError(SuiErrorKind::Unknown(format!("Unexpected number of effects found: {}", all_effects.len())).into()));
568 }
569 self.metrics.concurrent_execution.inc();
570
571 let effects = all_effects.into_iter().next().unwrap();
572 let epoch = effects.executed_epoch();
573 let events = if include_events {
574 if effects.events_digest().is_some() {
575 Some(self.validator_state.get_transaction_events(effects.transaction_digest())
576 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?)
577 } else {
578 None
579 }
580 } else {
581 None
582 };
583 let input_objects = include_input_objects
584 .then(|| self.validator_state.get_transaction_input_objects(&effects))
585 .transpose()
586 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
587 let output_objects = include_output_objects
588 .then(|| self.validator_state.get_transaction_output_objects(&effects))
589 .transpose()
590 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
591 let response = QuorumTransactionResponse {
592 effects: FinalizedEffects {
593 effects,
594 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
595 },
596 events,
597 input_objects,
598 output_objects,
599 auxiliary_data: None,
600 };
601 break Ok((response, true));
602 }
603
604 Some(result) = execution_futures.next() => {
606 match result {
607 Ok(resp) => {
608 debug!("Execution succeeded, returning response");
610 let QuorumTransactionResponse {
611 effects,
612 events,
613 input_objects,
614 output_objects,
615 auxiliary_data,
616 } = resp;
617 let resp = QuorumTransactionResponse {
619 effects,
620 events: if include_events { events } else { None },
621 input_objects: if include_input_objects { input_objects } else { None },
622 output_objects: if include_output_objects { output_objects } else { None },
623 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
624 };
625 break Ok((resp, false));
626 }
627 Err(e) => {
628 debug!(?e, "Execution attempt failed, wait for other attempts");
629 last_execution_error = Some(e);
630 }
631 };
632
633 if execution_futures.is_empty() {
635 break Err(last_execution_error.unwrap());
636 }
637 }
638
639 _ = &mut timeout_future => {
641 if let Some(e) = last_execution_error {
642 debug!("Timeout waiting for transaction finality. Last execution error: {e}");
643 } else {
644 debug!("Timeout waiting for transaction finality.");
645 }
646 self.metrics.wait_for_finality_timeout.inc();
647
648 break Err(TransactionSubmissionError::TimeoutBeforeFinality);
650 }
651 }
652 }
653 }
654
655 #[instrument(level = "error", skip_all)]
656 async fn execute_transaction_impl(
657 &self,
658 request: ExecuteTransactionRequestV3,
659 verified_transaction: VerifiedTransaction,
660 client_addr: Option<SocketAddr>,
661 finality_timeout: Option<Duration>,
662 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
663 debug!("TO Received transaction execution request.");
664
665 let timer = Instant::now();
666 let tx_type = if verified_transaction.is_consensus_tx() {
667 TxType::SharedObject
668 } else {
669 TxType::SingleWriter
670 };
671
672 let (_in_flight_metrics_guards, good_response_metrics) =
673 self.update_metrics(&request.transaction);
674
675 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
677 wait_for_finality_gauge.inc();
678 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
679 in_flight.dec();
680 });
681
682 let response = self
683 .submit_with_transaction_driver(
684 &self.transaction_driver,
685 &request,
686 client_addr,
687 &verified_transaction,
688 good_response_metrics,
689 finality_timeout,
690 )
691 .await?;
692 let driver_type = "transaction_driver";
693
694 add_server_timing("wait_for_finality done");
695
696 self.metrics.wait_for_finality_finished.inc();
697
698 let elapsed = timer.elapsed().as_secs_f64();
699 self.metrics
700 .settlement_finality_latency
701 .with_label_values(&[tx_type.as_str(), driver_type])
702 .observe(elapsed);
703 good_response_metrics.inc();
704
705 Ok(response)
706 }
707
708 #[instrument(level = "error", skip_all, err(level = "info"))]
709 async fn submit_with_transaction_driver(
710 &self,
711 td: &Arc<TransactionDriver<A>>,
712 request: &ExecuteTransactionRequestV3,
713 client_addr: Option<SocketAddr>,
714 verified_transaction: &VerifiedTransaction,
715 good_response_metrics: &GenericCounter<AtomicU64>,
716 timeout_duration: Option<Duration>,
717 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
718 let tx_digest = *verified_transaction.digest();
719 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
720
721 let td_response = td
722 .drive_transaction(
723 SubmitTxRequest::new_transaction(request.transaction.clone()),
724 SubmitTransactionOptions {
725 forwarded_client_addr: client_addr,
726 allowed_validators: self.td_allowed_submission_list.clone(),
727 blocked_validators: self.td_blocked_submission_list.clone(),
728 },
729 timeout_duration,
730 )
731 .await
732 .map_err(|e| match e {
733 TransactionDriverError::TimeoutWithLastRetriableError {
734 last_error,
735 attempts,
736 timeout,
737 } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
738 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
739 attempts,
740 timeout,
741 },
742 other => TransactionSubmissionError::TransactionFailed {
743 category: other.categorize(),
744 details: other.to_string(),
745 },
746 });
747
748 match td_response {
749 Err(e) => {
750 warn!("TransactionDriver error: {e:?}");
751 Err(e)
752 }
753 Ok(quorum_transaction_response) => {
754 good_response_metrics.inc();
755 Ok(quorum_transaction_response)
756 }
757 }
758 }
759
760 #[instrument(
761 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
762 level = "debug",
763 skip_all,
764 err(level = "info")
765 )]
766 async fn wait_for_finalized_tx_executed_locally_with_timeout(
767 validator_state: &Arc<AuthorityState>,
768 tx_digest: TransactionDigest,
769 tx_type: TxType,
770 metrics: &TransactionOrchestratorMetrics,
771 ) -> SuiResult {
772 metrics.local_execution_in_flight.inc();
773 let _metrics_guard =
774 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
775 in_flight.dec();
776 });
777
778 let _latency_guard = metrics
779 .local_execution_latency
780 .with_label_values(&[tx_type.as_str()])
781 .start_timer();
782 debug!("Waiting for finalized tx to be executed locally.");
783 match timeout(
784 LOCAL_EXECUTION_TIMEOUT,
785 validator_state
786 .get_transaction_cache_reader()
787 .notify_read_executed_effects_digests(
788 "TransactionOrchestrator::notify_read_wait_for_local_execution",
789 &[tx_digest],
790 ),
791 )
792 .instrument(error_span!(
793 "transaction_orchestrator::local_execution",
794 ?tx_digest
795 ))
796 .await
797 {
798 Err(_elapsed) => {
799 debug!(
800 "Waiting for finalized tx to be executed locally timed out within {:?}.",
801 LOCAL_EXECUTION_TIMEOUT
802 );
803 metrics.local_execution_timeout.inc();
804 Err(SuiErrorKind::TimeoutError.into())
805 }
806 Ok(_) => {
807 metrics.local_execution_success.inc();
808 Ok(())
809 }
810 }
811 }
812
813 fn update_metrics<'a>(
814 &'a self,
815 transaction: &Transaction,
816 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
817 let (in_flight, good_response) = if transaction.is_consensus_tx() {
818 self.metrics.total_req_received_shared_object.inc();
819 (
820 self.metrics.req_in_flight_shared_object.clone(),
821 &self.metrics.good_response_shared_object,
822 )
823 } else {
824 self.metrics.total_req_received_single_writer.inc();
825 (
826 self.metrics.req_in_flight_single_writer.clone(),
827 &self.metrics.good_response_single_writer,
828 )
829 };
830 in_flight.inc();
831 (
832 scopeguard::guard(in_flight, |in_flight| {
833 in_flight.dec();
834 }),
835 good_response,
836 )
837 }
838
839 fn start_task_to_recover_txes_in_log(
840 pending_tx_log: Arc<WritePathPendingTransactionLog>,
841 transaction_driver: Arc<TransactionDriver<A>>,
842 ) {
843 spawn_logged_monitored_task!(async move {
844 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
845 info!("Skipping loading pending transactions from pending_tx_log.");
846 return;
847 }
848 let pending_txes = pending_tx_log
849 .load_all_pending_transactions()
850 .expect("failed to load all pending transactions");
851 let num_pending_txes = pending_txes.len();
852 info!(
853 "Recovering {} pending transactions from pending_tx_log.",
854 num_pending_txes
855 );
856 let mut recovery = pending_txes
857 .into_iter()
858 .map(|tx| {
859 let pending_tx_log = pending_tx_log.clone();
860 let transaction_driver = transaction_driver.clone();
861 async move {
862 let tx = tx.into_inner();
865 let tx_digest = *tx.digest();
866 if let Err(err) = transaction_driver
869 .drive_transaction(
870 SubmitTxRequest::new_transaction(tx),
871 SubmitTransactionOptions::default(),
872 Some(Duration::from_secs(60)),
873 )
874 .await
875 {
876 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
877 } else {
878 debug!(?tx_digest, "Executed recovered transaction");
879 }
880 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
881 warn!(
882 ?tx_digest,
883 "Failed to clean up transaction in pending log: {err}"
884 );
885 } else {
886 debug!(?tx_digest, "Cleaned up transaction in pending log");
887 }
888 }
889 })
890 .collect::<FuturesUnordered<_>>();
891
892 let mut num_recovered = 0;
893 while recovery.next().await.is_some() {
894 num_recovered += 1;
895 if num_recovered % 1000 == 0 {
896 info!(
897 "Recovered {} out of {} transactions from pending_tx_log.",
898 num_recovered, num_pending_txes
899 );
900 }
901 }
902 info!(
903 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
904 num_recovered, num_pending_txes
905 );
906 });
907 }
908}
909#[derive(Clone)]
911pub struct TransactionOrchestratorMetrics {
912 total_req_received_single_writer: GenericCounter<AtomicU64>,
913 total_req_received_shared_object: GenericCounter<AtomicU64>,
914
915 good_response_single_writer: GenericCounter<AtomicU64>,
916 good_response_shared_object: GenericCounter<AtomicU64>,
917
918 req_in_flight_single_writer: GenericGauge<AtomicI64>,
919 req_in_flight_shared_object: GenericGauge<AtomicI64>,
920
921 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
922 wait_for_finality_finished: GenericCounter<AtomicU64>,
923 wait_for_finality_timeout: GenericCounter<AtomicU64>,
924
925 local_execution_in_flight: GenericGauge<AtomicI64>,
926 local_execution_success: GenericCounter<AtomicU64>,
927 local_execution_timeout: GenericCounter<AtomicU64>,
928
929 concurrent_execution: IntCounter,
930
931 early_validation_rejections: IntCounterVec,
932
933 background_retry_started: IntGauge,
934 background_retry_attempts: IntCounterVec,
935
936 request_latency: HistogramVec,
937 local_execution_latency: HistogramVec,
938 settlement_finality_latency: HistogramVec,
939}
940
941impl TransactionOrchestratorMetrics {
945 pub fn new(registry: &Registry) -> Self {
946 let total_req_received = register_int_counter_vec_with_registry!(
947 "tx_orchestrator_total_req_received",
948 "Total number of executions request Transaction Orchestrator receives, group by tx type",
949 &["tx_type"],
950 registry
951 )
952 .unwrap();
953
954 let total_req_received_single_writer =
955 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
956 let total_req_received_shared_object =
957 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
958
959 let good_response = register_int_counter_vec_with_registry!(
960 "tx_orchestrator_good_response",
961 "Total number of good responses Transaction Orchestrator generates, group by tx type",
962 &["tx_type"],
963 registry
964 )
965 .unwrap();
966
967 let good_response_single_writer =
968 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
969 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
970
971 let req_in_flight = register_int_gauge_vec_with_registry!(
972 "tx_orchestrator_req_in_flight",
973 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
974 &["tx_type"],
975 registry
976 )
977 .unwrap();
978
979 let req_in_flight_single_writer =
980 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
981 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
982
983 Self {
984 total_req_received_single_writer,
985 total_req_received_shared_object,
986 good_response_single_writer,
987 good_response_shared_object,
988 req_in_flight_single_writer,
989 req_in_flight_shared_object,
990 wait_for_finality_in_flight: register_int_gauge_with_registry!(
991 "tx_orchestrator_wait_for_finality_in_flight",
992 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
993 registry,
994 )
995 .unwrap(),
996 wait_for_finality_finished: register_int_counter_with_registry!(
997 "tx_orchestrator_wait_for_finality_fnished",
998 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
999 registry,
1000 )
1001 .unwrap(),
1002 wait_for_finality_timeout: register_int_counter_with_registry!(
1003 "tx_orchestrator_wait_for_finality_timeout",
1004 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1005 registry,
1006 )
1007 .unwrap(),
1008 local_execution_in_flight: register_int_gauge_with_registry!(
1009 "tx_orchestrator_local_execution_in_flight",
1010 "Number of local execution txns in flights Transaction Orchestrator handles",
1011 registry,
1012 )
1013 .unwrap(),
1014 local_execution_success: register_int_counter_with_registry!(
1015 "tx_orchestrator_local_execution_success",
1016 "Total number of successful local execution txns Transaction Orchestrator handles",
1017 registry,
1018 )
1019 .unwrap(),
1020 local_execution_timeout: register_int_counter_with_registry!(
1021 "tx_orchestrator_local_execution_timeout",
1022 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1023 registry,
1024 )
1025 .unwrap(),
1026 concurrent_execution: register_int_counter_with_registry!(
1027 "tx_orchestrator_concurrent_execution",
1028 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1029 registry,
1030 )
1031 .unwrap(),
1032 early_validation_rejections: register_int_counter_vec_with_registry!(
1033 "tx_orchestrator_early_validation_rejections",
1034 "Total number of transactions rejected during early validation before submission, by reason",
1035 &["reason"],
1036 registry,
1037 )
1038 .unwrap(),
1039 background_retry_started: register_int_gauge_with_registry!(
1040 "tx_orchestrator_background_retry_started",
1041 "Number of background retry tasks kicked off for transactions with retriable errors",
1042 registry,
1043 )
1044 .unwrap(),
1045 background_retry_attempts: register_int_counter_vec_with_registry!(
1046 "tx_orchestrator_background_retry_attempts",
1047 "Total number of background retry attempts, by status",
1048 &["status"],
1049 registry,
1050 )
1051 .unwrap(),
1052 request_latency: register_histogram_vec_with_registry!(
1053 "tx_orchestrator_request_latency",
1054 "Time spent in processing one Transaction Orchestrator request",
1055 &["tx_type", "route", "wait_for_local_execution"],
1056 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1057 registry,
1058 )
1059 .unwrap(),
1060 local_execution_latency: register_histogram_vec_with_registry!(
1061 "tx_orchestrator_local_execution_latency",
1062 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1063 &["tx_type"],
1064 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1065 registry,
1066 )
1067 .unwrap(),
1068 settlement_finality_latency: register_histogram_vec_with_registry!(
1069 "tx_orchestrator_settlement_finality_latency",
1070 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1071 &["tx_type", "driver_type"],
1072 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1073 registry,
1074 )
1075 .unwrap(),
1076 }
1077 }
1078
1079 pub fn new_for_tests() -> Self {
1080 let registry = Registry::new();
1081 Self::new(®istry)
1082 }
1083}
1084
1085#[async_trait::async_trait]
1086impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1087where
1088 A: AuthorityAPI + Send + Sync + 'static + Clone,
1089{
1090 async fn execute_transaction(
1091 &self,
1092 request: ExecuteTransactionRequestV3,
1093 client_addr: Option<std::net::SocketAddr>,
1094 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1095 self.execute_transaction_v3(request, client_addr).await
1096 }
1097
1098 fn simulate_transaction(
1099 &self,
1100 transaction: TransactionData,
1101 checks: TransactionChecks,
1102 allow_mock_gas_coin: bool,
1103 ) -> Result<SimulateTransactionResult, SuiError> {
1104 self.inner
1105 .validator_state
1106 .simulate_transaction(transaction, checks, allow_mock_gas_coin)
1107 }
1108}
1109
1110struct TransactionSubmissionGuard {
1113 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1114 tx_digest: TransactionDigest,
1115 is_new_transaction: bool,
1116}
1117
1118impl TransactionSubmissionGuard {
1119 pub fn new(
1120 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1121 tx: &VerifiedTransaction,
1122 ) -> Self {
1123 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1124 let tx_digest = *tx.digest();
1125 if is_new_transaction {
1126 debug!(?tx_digest, "Added transaction to inflight set");
1127 } else {
1128 debug!(
1129 ?tx_digest,
1130 "Transaction already being processed, no new submission will be made"
1131 );
1132 };
1133 Self {
1134 pending_tx_log,
1135 tx_digest,
1136 is_new_transaction,
1137 }
1138 }
1139
1140 fn is_new_transaction(&self) -> bool {
1141 self.is_new_transaction
1142 }
1143}
1144
1145impl Drop for TransactionSubmissionGuard {
1146 fn drop(&mut self) {
1147 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1148 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1149 } else {
1150 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1151 }
1152 }
1153}