1use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_integration_test};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16 HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19 register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33 TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46 TransactionDriverMetrics,
47};
48
/// Upper bound on how long a `WaitForLocalExecution` request waits for the
/// transaction's effects to become readable from the local cache.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default time budget for driving a transaction to finality. It is used
/// unless the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable holds a
/// parseable number of seconds.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);

/// Result pairing a transaction with its quorum response on success, or the
/// transaction digest with the submission error on failure.
pub type QuorumTransactionEffectsResult = Result<
    (Transaction, QuorumTransactionResponse),
    (TransactionDigest, TransactionSubmissionError),
>;
60
/// Entry point for executing transactions: it submits them through a
/// `TransactionDriver` and can additionally wait for local execution.
///
/// `A` is the authority client type used to talk to validators.
pub struct TransactionOrchestrator<A: Clone> {
    // Shared state behind an `Arc` so it can be cloned into spawned tasks.
    inner: Arc<Inner<A>>,
}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71 pub fn new_with_auth_aggregator(
72 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73 validator_state: Arc<AuthorityState>,
74 reconfig_channel: Receiver<SuiSystemState>,
75 parent_path: &Path,
76 prometheus_registry: &Registry,
77 node_config: &NodeConfig,
78 ) -> Self {
79 let observer = OnsiteReconfigObserver::new(
80 reconfig_channel,
81 validator_state.get_object_cache_reader().clone(),
82 validator_state.clone_committee_store(),
83 validators.safe_client_metrics_base.clone(),
84 );
85 TransactionOrchestrator::new(
86 validators,
87 validator_state,
88 parent_path,
89 prometheus_registry,
90 observer,
91 node_config,
92 )
93 }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98 A: AuthorityAPI + Send + Sync + 'static + Clone,
99 OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101 pub fn new(
102 validators: Arc<AuthorityAggregator<A>>,
103 validator_state: Arc<AuthorityState>,
104 parent_path: &Path,
105 prometheus_registry: &Registry,
106 reconfig_observer: OnsiteReconfigObserver,
107 node_config: &NodeConfig,
108 ) -> Self {
109 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111 let client_metrics = Arc::new(
112 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113 );
114 let reconfig_observer = Arc::new(reconfig_observer);
115
116 let transaction_driver = TransactionDriver::new(
117 validators.clone(),
118 reconfig_observer.clone(),
119 td_metrics,
120 Some(node_config),
121 client_metrics,
122 );
123
124 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125 parent_path.join("fullnode_pending_transactions"),
126 ));
127 Inner::start_task_to_recover_txes_in_log(
128 pending_tx_log.clone(),
129 transaction_driver.clone(),
130 );
131
132 let td_allowed_submission_list = node_config
133 .transaction_driver_config
134 .as_ref()
135 .map(|config| config.allowed_submission_validators.clone())
136 .unwrap_or_default();
137
138 let td_blocked_submission_list = node_config
139 .transaction_driver_config
140 .as_ref()
141 .map(|config| config.blocked_submission_validators.clone())
142 .unwrap_or_default();
143
144 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145 panic!(
146 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147 td_allowed_submission_list, td_blocked_submission_list
148 );
149 }
150
151 let enable_early_validation = node_config
152 .transaction_driver_config
153 .as_ref()
154 .map(|config| config.enable_early_validation)
155 .unwrap_or(true);
156
157 let inner = Arc::new(Inner {
158 validator_state,
159 pending_tx_log,
160 metrics,
161 transaction_driver,
162 td_allowed_submission_list,
163 td_blocked_submission_list,
164 enable_early_validation,
165 });
166 Self { inner }
167 }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172 A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175 fields(
176 tx_digest = ?request.transaction.digest(),
177 tx_type = ?request_type,
178 ))]
179 pub async fn execute_transaction_block(
180 &self,
181 request: ExecuteTransactionRequestV3,
182 request_type: ExecuteTransactionRequestType,
183 client_addr: Option<SocketAddr>,
184 ) -> Result<
185 (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186 TransactionSubmissionError,
187 > {
188 let timer = Instant::now();
189 let tx_type = if request.transaction.is_consensus_tx() {
190 TxType::SharedObject
191 } else {
192 TxType::SingleWriter
193 };
194 let tx_digest = *request.transaction.digest();
195
196 let inner = self.inner.clone();
197 let (response, mut executed_locally) = spawn_monitored_task!(
198 Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199 )
200 .await
201 .map_err(|e| TransactionSubmissionError::TransactionFailed {
202 category: ErrorCategory::Internal,
203 details: e.to_string(),
204 })??;
205
206 if !executed_locally {
207 executed_locally = if matches!(
208 request_type,
209 ExecuteTransactionRequestType::WaitForLocalExecution
210 ) {
211 let executed_locally =
212 Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
213 &self.inner.validator_state,
214 tx_digest,
215 tx_type,
216 &self.inner.metrics,
217 )
218 .await
219 .is_ok();
220 add_server_timing("local_execution done");
221 executed_locally
222 } else {
223 false
224 };
225 }
226
227 let QuorumTransactionResponse {
228 effects,
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 } = response;
234
235 let response = ExecuteTransactionResponseV3 {
236 effects,
237 events,
238 input_objects,
239 output_objects,
240 auxiliary_data,
241 };
242
243 let request_latency = timer.elapsed().as_secs_f64();
244 if request_latency > 10.0 {
245 warn!(
246 ?tx_digest,
247 "Request latency {} is too high", request_latency,
248 );
249 }
250 self.inner
251 .metrics
252 .request_latency
253 .with_label_values(&[
254 tx_type.as_str(),
255 "execute_transaction_block",
256 executed_locally.to_string().as_str(),
257 ])
258 .observe(request_latency);
259
260 Ok((response, executed_locally))
261 }
262
263 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
265 fields(tx_digest = ?request.transaction.digest()))]
266 pub async fn execute_transaction_v3(
267 &self,
268 request: ExecuteTransactionRequestV3,
269 client_addr: Option<SocketAddr>,
270 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
271 let timer = Instant::now();
272 let tx_type = if request.transaction.is_consensus_tx() {
273 TxType::SharedObject
274 } else {
275 TxType::SingleWriter
276 };
277
278 let inner = self.inner.clone();
279 let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
280 inner,
281 request,
282 client_addr
283 ))
284 .await
285 .map_err(|e| TransactionSubmissionError::TransactionFailed {
286 category: ErrorCategory::Internal,
287 details: e.to_string(),
288 })??;
289
290 self.inner
291 .metrics
292 .request_latency
293 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
294 .observe(timer.elapsed().as_secs_f64());
295
296 let QuorumTransactionResponse {
297 effects,
298 events,
299 input_objects,
300 output_objects,
301 auxiliary_data,
302 } = response;
303
304 Ok(ExecuteTransactionResponseV3 {
305 effects,
306 events,
307 input_objects,
308 output_objects,
309 auxiliary_data,
310 })
311 }
312
313 pub fn authority_state(&self) -> &Arc<AuthorityState> {
314 &self.inner.validator_state
315 }
316
317 pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
318 &self.inner.transaction_driver
319 }
320
321 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
322 self.inner
323 .transaction_driver
324 .authority_aggregator()
325 .load_full()
326 }
327
328 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
329 self.inner.pending_tx_log.load_all_pending_transactions()
330 }
331
332 pub fn empty_pending_tx_log_in_test(&self) -> bool {
333 self.inner.pending_tx_log.is_empty()
334 }
335}
336
/// State shared by all orchestrator requests and background tasks.
struct Inner<A: Clone> {
    // Local authority state: used for signature checks, early validation and
    // reading executed effects from the local cache.
    validator_state: Arc<AuthorityState>,
    // Log of transactions currently being submitted; entries are added and
    // removed by `TransactionSubmissionGuard`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    metrics: Arc<TransactionOrchestratorMetrics>,
    // Drives submitted transactions to finality.
    transaction_driver: Arc<TransactionDriver<A>>,
    // Passed to the driver as `allowed_validators` on every submission.
    td_allowed_submission_list: Vec<String>,
    // Passed to the driver as `blocked_validators` on every submission.
    td_blocked_submission_list: Vec<String>,
    // When true, transactions are validated locally before being submitted.
    enable_early_validation: bool,
}
346
347impl<A> Inner<A>
348where
349 A: AuthorityAPI + Send + Sync + 'static + Clone,
350{
351 async fn execute_transaction_with_retry(
352 inner: Arc<Inner<A>>,
353 request: ExecuteTransactionRequestV3,
354 client_addr: Option<SocketAddr>,
355 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
356 {
357 let result = inner
358 .execute_transaction_with_effects_waiting(
359 request.clone(),
360 client_addr,
361 false,
362 )
363 .await;
364
365 if let Err(e) = &result
367 && e.is_retriable()
368 {
369 spawn_monitored_task!(async move {
370 inner.metrics.background_retry_started.inc();
371 let backoff = backoff::ExponentialBackoff::new(
372 Duration::from_secs(1),
373 Duration::from_secs(300),
374 );
375 const MAX_RETRIES: usize = 10;
376 for (i, delay) in backoff.enumerate() {
377 if i == MAX_RETRIES {
378 break;
379 }
380 let result = inner
383 .execute_transaction_with_effects_waiting(
384 request.clone(),
385 client_addr,
386 i > 3,
387 )
388 .await;
389 match result {
390 Ok(_) => {
391 inner
392 .metrics
393 .background_retry_attempts
394 .with_label_values(&["success"])
395 .inc();
396 debug!(
397 "Background retry {i} for transaction {} succeeded",
398 request.transaction.digest()
399 );
400 break;
401 }
402 Err(e) => {
403 if !e.is_retriable() {
404 inner
405 .metrics
406 .background_retry_attempts
407 .with_label_values(&["non-retriable"])
408 .inc();
409 debug!(
410 "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
411 request.transaction.digest()
412 );
413 break;
414 }
415 inner
416 .metrics
417 .background_retry_attempts
418 .with_label_values(&["retriable"])
419 .inc();
420 debug!(
421 "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
422 request.transaction.digest()
423 );
424 }
425 };
426 tracing::debug!("Wait for {:.3}s before next retry", delay.as_secs_f32());
427 sleep(delay).await;
428 }
429 });
430 }
431
432 result
433 }
434
435 fn build_response_from_local_effects(
436 &self,
437 effects: sui_types::effects::TransactionEffects,
438 include_events: bool,
439 include_input_objects: bool,
440 include_output_objects: bool,
441 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
442 let epoch = effects.executed_epoch();
443 let events = if include_events {
444 if effects.events_digest().is_some() {
445 Some(
446 self.validator_state
447 .get_transaction_events(effects.transaction_digest())
448 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?,
449 )
450 } else {
451 None
452 }
453 } else {
454 None
455 };
456 let input_objects = include_input_objects
457 .then(|| self.validator_state.get_transaction_input_objects(&effects))
458 .transpose()
459 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
460 let output_objects = include_output_objects
461 .then(|| {
462 self.validator_state
463 .get_transaction_output_objects(&effects)
464 })
465 .transpose()
466 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
467
468 Ok(QuorumTransactionResponse {
469 effects: FinalizedEffects {
470 effects,
471 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
472 },
473 events,
474 input_objects,
475 output_objects,
476 auxiliary_data: None,
477 })
478 }
479
480 async fn execute_transaction_with_effects_waiting(
482 &self,
483 request: ExecuteTransactionRequestV3,
484 client_addr: Option<SocketAddr>,
485 enforce_live_input_objects: bool,
486 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
487 {
488 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
489 let verified_transaction = epoch_store
490 .verify_transaction_with_current_aliases(request.transaction.clone())
491 .map_err(TransactionSubmissionError::InvalidUserSignature)?
492 .into_tx();
493 let tx_digest = *verified_transaction.digest();
494
495 if self.enable_early_validation
498 && let Err(e) = self.validator_state.check_transaction_validity(
499 &epoch_store,
500 &verified_transaction,
501 enforce_live_input_objects,
502 )
503 {
504 let error_category = e.categorize();
505 if !error_category.is_submission_retriable() {
506 if !self.validator_state.is_tx_already_executed(&tx_digest) {
508 self.metrics
509 .early_validation_rejections
510 .with_label_values(&[e.to_variant_name()])
511 .inc();
512 debug!(
513 error = ?e,
514 "Transaction rejected during early validation"
515 );
516
517 return Err(TransactionSubmissionError::TransactionFailed {
518 category: error_category,
519 details: e.to_string(),
520 });
521 }
522 }
523 }
524
525 let guard =
527 TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
528 let is_new_transaction = guard.is_new_transaction();
529
530 let include_events = request.include_events;
531 let include_input_objects = request.include_input_objects;
532 let include_output_objects = request.include_output_objects;
533 let include_auxiliary_data = request.include_auxiliary_data;
534
535 if let Some(effects) = self
537 .validator_state
538 .get_transaction_cache_reader()
539 .get_executed_effects(&tx_digest)
540 {
541 self.metrics.early_cached_response.inc();
542 debug!(
543 ?tx_digest,
544 "Returning cached results for already-executed transaction"
545 );
546 let response = self.build_response_from_local_effects(
547 effects,
548 include_events,
549 include_input_objects,
550 include_output_objects,
551 )?;
552 return Ok((response, true));
553 }
554
555 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
556 .ok()
557 .and_then(|v| v.parse().ok())
558 .map(Duration::from_secs)
559 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
560
561 let num_submissions = if !is_new_transaction {
562 0
564 } else if in_integration_test() {
565 let r = rand::thread_rng().gen_range(1..=100);
567 let n = if r <= 10 {
568 3
569 } else if r <= 30 {
570 2
571 } else {
572 1
573 };
574 if n > 1 {
575 debug!("Making {n} execution calls");
576 }
577 n
578 } else {
579 1
580 };
581
582 let mut execution_futures = FuturesUnordered::new();
584 for i in 0..num_submissions {
585 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
587 let delay_ms = if should_delay {
588 rand::thread_rng().gen_range(100..=500)
589 } else {
590 0
591 };
592
593 let request = request.clone();
594 let verified_transaction = verified_transaction.clone();
595
596 let future = async move {
597 if delay_ms > 0 {
598 sleep(Duration::from_millis(delay_ms)).await;
600 }
601 self.execute_transaction_impl(
602 request,
603 verified_transaction,
604 client_addr,
605 Some(finality_timeout),
606 )
607 .await
608 }
609 .boxed();
610 execution_futures.push(future);
611 }
612
613 let mut last_execution_error: Option<TransactionSubmissionError> = None;
615
616 let digests = [tx_digest];
618 let mut local_effects_future = self
619 .validator_state
620 .get_transaction_cache_reader()
621 .notify_read_executed_effects_may_fail(
622 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
623 &digests,
624 )
625 .boxed();
626
627 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
629
630 loop {
631 tokio::select! {
632 all_effects_result = &mut local_effects_future => {
634 let all_effects = all_effects_result
635 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
636 if all_effects.len() != 1 {
637 break Err(TransactionSubmissionError::TransactionDriverInternalError(
638 SuiErrorKind::Unknown(format!("Unexpected number of effects found: {}", all_effects.len())).into()
639 ));
640 }
641 debug!(
642 "Effects became available while execution was running"
643 );
644 self.metrics.concurrent_execution.inc();
645
646 let effects = all_effects.into_iter().next().unwrap();
647 let response = self.build_response_from_local_effects(
648 effects,
649 include_events,
650 include_input_objects,
651 include_output_objects,
652 )?;
653 break Ok((response, true));
654 }
655
656 Some(result) = execution_futures.next() => {
658 match result {
659 Ok(resp) => {
660 debug!("Execution succeeded, returning response");
662 let QuorumTransactionResponse {
663 effects,
664 events,
665 input_objects,
666 output_objects,
667 auxiliary_data,
668 } = resp;
669 let resp = QuorumTransactionResponse {
671 effects,
672 events: if include_events { events } else { None },
673 input_objects: if include_input_objects { input_objects } else { None },
674 output_objects: if include_output_objects { output_objects } else { None },
675 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
676 };
677 break Ok((resp, false));
678 }
679 Err(e) => {
680 debug!(?e, "Execution attempt failed, wait for other attempts");
681 last_execution_error = Some(e);
682 }
683 };
684
685 if execution_futures.is_empty() {
687 break Err(last_execution_error.unwrap());
688 }
689 }
690
691 _ = &mut timeout_future => {
693 if let Some(e) = last_execution_error {
694 debug!("Timeout waiting for transaction finality. Last execution error: {e}");
695 } else {
696 debug!("Timeout waiting for transaction finality.");
697 }
698 self.metrics.wait_for_finality_timeout.inc();
699
700 break Err(TransactionSubmissionError::TimeoutBeforeFinality);
702 }
703 }
704 }
705 }
706
707 #[instrument(level = "error", skip_all)]
708 async fn execute_transaction_impl(
709 &self,
710 request: ExecuteTransactionRequestV3,
711 verified_transaction: VerifiedTransaction,
712 client_addr: Option<SocketAddr>,
713 finality_timeout: Option<Duration>,
714 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
715 let timer = Instant::now();
716 let tx_digest = *verified_transaction.digest();
717 let tx_type = if verified_transaction.is_consensus_tx() {
718 TxType::SharedObject
719 } else {
720 TxType::SingleWriter
721 };
722
723 let (_in_flight_metrics_guards, good_response_metrics) =
724 self.update_metrics(&request.transaction);
725
726 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
728 wait_for_finality_gauge.inc();
729 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
730 in_flight.dec();
731 });
732
733 let response = self
734 .submit_with_transaction_driver(
735 &self.transaction_driver,
736 &request,
737 client_addr,
738 &verified_transaction,
739 good_response_metrics,
740 finality_timeout,
741 )
742 .await?;
743 let driver_type = "transaction_driver";
744
745 add_server_timing("wait_for_finality done");
746
747 self.metrics.wait_for_finality_finished.inc();
748
749 let elapsed = timer.elapsed().as_secs_f64();
750 if elapsed > 10.0 {
751 warn!(
752 ?tx_digest,
753 "Settlement finality latency {} is too high", elapsed,
754 );
755 }
756 self.metrics
757 .settlement_finality_latency
758 .with_label_values(&[tx_type.as_str(), driver_type])
759 .observe(elapsed);
760 good_response_metrics.inc();
761
762 Ok(response)
763 }
764
765 #[instrument(level = "error", skip_all, err(level = "info"))]
766 async fn submit_with_transaction_driver(
767 &self,
768 td: &Arc<TransactionDriver<A>>,
769 request: &ExecuteTransactionRequestV3,
770 client_addr: Option<SocketAddr>,
771 verified_transaction: &VerifiedTransaction,
772 good_response_metrics: &GenericCounter<AtomicU64>,
773 timeout_duration: Option<Duration>,
774 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
775 let tx_digest = *verified_transaction.digest();
776 let td_response = td
777 .drive_transaction(
778 SubmitTxRequest::new_transaction(request.transaction.clone()),
779 SubmitTransactionOptions {
780 forwarded_client_addr: client_addr,
781 allowed_validators: self.td_allowed_submission_list.clone(),
782 blocked_validators: self.td_blocked_submission_list.clone(),
783 },
784 timeout_duration,
785 )
786 .await
787 .map_err(|e| match e {
788 TransactionDriverError::TimeoutWithLastRetriableError {
789 last_error,
790 attempts,
791 timeout,
792 } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
793 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
794 attempts,
795 timeout,
796 },
797 other => TransactionSubmissionError::TransactionFailed {
798 category: other.categorize(),
799 details: other.to_string(),
800 },
801 });
802
803 match td_response {
804 Err(e) => {
805 warn!(?tx_digest, "TransactionDriver error: {e:?}");
806 Err(e)
807 }
808 Ok(quorum_transaction_response) => {
809 good_response_metrics.inc();
810 Ok(quorum_transaction_response)
811 }
812 }
813 }
814
815 #[instrument(
816 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
817 level = "debug",
818 skip_all,
819 err(level = "info")
820 )]
821 async fn wait_for_finalized_tx_executed_locally_with_timeout(
822 validator_state: &Arc<AuthorityState>,
823 tx_digest: TransactionDigest,
824 tx_type: TxType,
825 metrics: &TransactionOrchestratorMetrics,
826 ) -> SuiResult {
827 metrics.local_execution_in_flight.inc();
828 let _metrics_guard =
829 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
830 in_flight.dec();
831 });
832
833 let _latency_guard = metrics
834 .local_execution_latency
835 .with_label_values(&[tx_type.as_str()])
836 .start_timer();
837 debug!("Waiting for finalized tx to be executed locally.");
838 match timeout(
839 LOCAL_EXECUTION_TIMEOUT,
840 validator_state
841 .get_transaction_cache_reader()
842 .notify_read_executed_effects_digests(
843 "TransactionOrchestrator::notify_read_wait_for_local_execution",
844 &[tx_digest],
845 ),
846 )
847 .instrument(error_span!(
848 "transaction_orchestrator::local_execution",
849 ?tx_digest
850 ))
851 .await
852 {
853 Err(_elapsed) => {
854 debug!(
855 "Waiting for finalized tx to be executed locally timed out within {:?}.",
856 LOCAL_EXECUTION_TIMEOUT
857 );
858 metrics.local_execution_timeout.inc();
859 Err(SuiErrorKind::TimeoutError.into())
860 }
861 Ok(_) => {
862 metrics.local_execution_success.inc();
863 Ok(())
864 }
865 }
866 }
867
868 fn update_metrics<'a>(
869 &'a self,
870 transaction: &Transaction,
871 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
872 let (in_flight, good_response) = if transaction.is_consensus_tx() {
873 self.metrics.total_req_received_shared_object.inc();
874 (
875 self.metrics.req_in_flight_shared_object.clone(),
876 &self.metrics.good_response_shared_object,
877 )
878 } else {
879 self.metrics.total_req_received_single_writer.inc();
880 (
881 self.metrics.req_in_flight_single_writer.clone(),
882 &self.metrics.good_response_single_writer,
883 )
884 };
885 in_flight.inc();
886 (
887 scopeguard::guard(in_flight, |in_flight| {
888 in_flight.dec();
889 }),
890 good_response,
891 )
892 }
893
894 fn start_task_to_recover_txes_in_log(
895 pending_tx_log: Arc<WritePathPendingTransactionLog>,
896 transaction_driver: Arc<TransactionDriver<A>>,
897 ) {
898 spawn_logged_monitored_task!(async move {
899 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
900 info!("Skipping loading pending transactions from pending_tx_log.");
901 return;
902 }
903 let pending_txes = pending_tx_log
904 .load_all_pending_transactions()
905 .expect("failed to load all pending transactions");
906 let num_pending_txes = pending_txes.len();
907 info!(
908 "Recovering {} pending transactions from pending_tx_log.",
909 num_pending_txes
910 );
911 let mut recovery = pending_txes
912 .into_iter()
913 .map(|tx| {
914 let pending_tx_log = pending_tx_log.clone();
915 let transaction_driver = transaction_driver.clone();
916 async move {
917 let tx = tx.into_inner();
920 let tx_digest = *tx.digest();
921 if let Err(err) = transaction_driver
924 .drive_transaction(
925 SubmitTxRequest::new_transaction(tx),
926 SubmitTransactionOptions::default(),
927 Some(Duration::from_secs(60)),
928 )
929 .await
930 {
931 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
932 } else {
933 debug!(?tx_digest, "Executed recovered transaction");
934 }
935 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
936 warn!(
937 ?tx_digest,
938 "Failed to clean up transaction in pending log: {err}"
939 );
940 } else {
941 debug!(?tx_digest, "Cleaned up transaction in pending log");
942 }
943 }
944 })
945 .collect::<FuturesUnordered<_>>();
946
947 let mut num_recovered = 0;
948 while recovery.next().await.is_some() {
949 num_recovered += 1;
950 if num_recovered % 1000 == 0 {
951 info!(
952 "Recovered {} out of {} transactions from pending_tx_log.",
953 num_recovered, num_pending_txes
954 );
955 }
956 }
957 info!(
958 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
959 num_recovered, num_pending_txes
960 );
961 });
962 }
963}
/// Prometheus metrics recorded by the transaction orchestrator.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, split by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful driver responses, split by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, split by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Progress of waiting for finality.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Progress of waiting for local execution.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Requests answered from effects that were already cached locally.
    early_cached_response: IntCounter,
    // Requests answered because effects appeared locally during submission.
    concurrent_execution: IntCounter,

    // Transactions rejected by early validation, labelled by reason.
    early_validation_rejections: IntCounterVec,

    // Background retry tasks started. Registered as a gauge, but the code in
    // this file only ever increments it.
    background_retry_started: IntGauge,
    // Background retry attempts, labelled by status.
    background_retry_attempts: IntCounterVec,

    // Latency histograms.
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
996
997impl TransactionOrchestratorMetrics {
1001 pub fn new(registry: &Registry) -> Self {
1002 let total_req_received = register_int_counter_vec_with_registry!(
1003 "tx_orchestrator_total_req_received",
1004 "Total number of executions request Transaction Orchestrator receives, group by tx type",
1005 &["tx_type"],
1006 registry
1007 )
1008 .unwrap();
1009
1010 let total_req_received_single_writer =
1011 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1012 let total_req_received_shared_object =
1013 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1014
1015 let good_response = register_int_counter_vec_with_registry!(
1016 "tx_orchestrator_good_response",
1017 "Total number of good responses Transaction Orchestrator generates, group by tx type",
1018 &["tx_type"],
1019 registry
1020 )
1021 .unwrap();
1022
1023 let good_response_single_writer =
1024 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1025 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1026
1027 let req_in_flight = register_int_gauge_vec_with_registry!(
1028 "tx_orchestrator_req_in_flight",
1029 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1030 &["tx_type"],
1031 registry
1032 )
1033 .unwrap();
1034
1035 let req_in_flight_single_writer =
1036 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1037 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1038
1039 Self {
1040 total_req_received_single_writer,
1041 total_req_received_shared_object,
1042 good_response_single_writer,
1043 good_response_shared_object,
1044 req_in_flight_single_writer,
1045 req_in_flight_shared_object,
1046 wait_for_finality_in_flight: register_int_gauge_with_registry!(
1047 "tx_orchestrator_wait_for_finality_in_flight",
1048 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1049 registry,
1050 )
1051 .unwrap(),
1052 wait_for_finality_finished: register_int_counter_with_registry!(
1053 "tx_orchestrator_wait_for_finality_fnished",
1054 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1055 registry,
1056 )
1057 .unwrap(),
1058 wait_for_finality_timeout: register_int_counter_with_registry!(
1059 "tx_orchestrator_wait_for_finality_timeout",
1060 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1061 registry,
1062 )
1063 .unwrap(),
1064 local_execution_in_flight: register_int_gauge_with_registry!(
1065 "tx_orchestrator_local_execution_in_flight",
1066 "Number of local execution txns in flights Transaction Orchestrator handles",
1067 registry,
1068 )
1069 .unwrap(),
1070 local_execution_success: register_int_counter_with_registry!(
1071 "tx_orchestrator_local_execution_success",
1072 "Total number of successful local execution txns Transaction Orchestrator handles",
1073 registry,
1074 )
1075 .unwrap(),
1076 local_execution_timeout: register_int_counter_with_registry!(
1077 "tx_orchestrator_local_execution_timeout",
1078 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1079 registry,
1080 )
1081 .unwrap(),
1082 early_cached_response: register_int_counter_with_registry!(
1083 "tx_orchestrator_early_cached_response",
1084 "Total number of requests returning cached results for already-executed transactions",
1085 registry,
1086 )
1087 .unwrap(),
1088 concurrent_execution: register_int_counter_with_registry!(
1089 "tx_orchestrator_concurrent_execution",
1090 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1091 registry,
1092 )
1093 .unwrap(),
1094 early_validation_rejections: register_int_counter_vec_with_registry!(
1095 "tx_orchestrator_early_validation_rejections",
1096 "Total number of transactions rejected during early validation before submission, by reason",
1097 &["reason"],
1098 registry,
1099 )
1100 .unwrap(),
1101 background_retry_started: register_int_gauge_with_registry!(
1102 "tx_orchestrator_background_retry_started",
1103 "Number of background retry tasks kicked off for transactions with retriable errors",
1104 registry,
1105 )
1106 .unwrap(),
1107 background_retry_attempts: register_int_counter_vec_with_registry!(
1108 "tx_orchestrator_background_retry_attempts",
1109 "Total number of background retry attempts, by status",
1110 &["status"],
1111 registry,
1112 )
1113 .unwrap(),
1114 request_latency: register_histogram_vec_with_registry!(
1115 "tx_orchestrator_request_latency",
1116 "Time spent in processing one Transaction Orchestrator request",
1117 &["tx_type", "route", "wait_for_local_execution"],
1118 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1119 registry,
1120 )
1121 .unwrap(),
1122 local_execution_latency: register_histogram_vec_with_registry!(
1123 "tx_orchestrator_local_execution_latency",
1124 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1125 &["tx_type"],
1126 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1127 registry,
1128 )
1129 .unwrap(),
1130 settlement_finality_latency: register_histogram_vec_with_registry!(
1131 "tx_orchestrator_settlement_finality_latency",
1132 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1133 &["tx_type", "driver_type"],
1134 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1135 registry,
1136 )
1137 .unwrap(),
1138 }
1139 }
1140
1141 pub fn new_for_tests() -> Self {
1142 let registry = Registry::new();
1143 Self::new(®istry)
1144 }
1145}
1146
/// Exposes the orchestrator through the `TransactionExecutor` trait; both
/// methods are plain delegations.
#[async_trait::async_trait]
impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Delegates to `execute_transaction_v3`.
    async fn execute_transaction(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<std::net::SocketAddr>,
    ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
        self.execute_transaction_v3(request, client_addr).await
    }

    /// Delegates to `simulate_transaction` on the held authority state.
    fn simulate_transaction(
        &self,
        transaction: TransactionData,
        checks: TransactionChecks,
        allow_mock_gas_coin: bool,
    ) -> Result<SimulateTransactionResult, SuiError> {
        self.inner
            .validator_state
            .simulate_transaction(transaction, checks, allow_mock_gas_coin)
    }
}
1171
/// Scope guard tying a transaction's entry in the pending log to the lifetime
/// of one submission attempt: created when the attempt starts, and its `Drop`
/// impl removes the entry.
struct TransactionSubmissionGuard {
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    tx_digest: TransactionDigest,
    // Value returned by `write_pending_transaction_maybe` when the guard was
    // created; the caller only submits the transaction when this is true.
    is_new_transaction: bool,
}
1179
1180impl TransactionSubmissionGuard {
1181 pub fn new(
1182 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1183 tx: &VerifiedTransaction,
1184 ) -> Self {
1185 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1186 let tx_digest = *tx.digest();
1187 if is_new_transaction {
1188 debug!(?tx_digest, "Added transaction to inflight set");
1189 } else {
1190 debug!(
1191 ?tx_digest,
1192 "Transaction already being processed, no new submission will be made"
1193 );
1194 };
1195 Self {
1196 pending_tx_log,
1197 tx_digest,
1198 is_new_transaction,
1199 }
1200 }
1201
1202 fn is_new_transaction(&self) -> bool {
1203 self.is_new_transaction
1204 }
1205}
1206
1207impl Drop for TransactionSubmissionGuard {
1208 fn drop(&mut self) {
1209 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1210 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1211 } else {
1212 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1213 }
1214 }
1215}