1use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_integration_test};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16 HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19 register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33 TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46 TransactionDriverMetrics,
47};
48
/// Upper bound on how long `wait_for_finalized_tx_executed_locally_with_timeout` waits for a
/// finalized transaction to be executed and checkpointed locally.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
52
/// Default time budget for driving a transaction to finality. It is used unless the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable holds a parseable number of seconds.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
55
/// Outcome of driving a transaction through quorum: on success the transaction paired with its
/// quorum response, on failure the transaction digest paired with the submission error.
pub type QuorumTransactionEffectsResult = Result<
    (Transaction, QuorumTransactionResponse),
    (TransactionDigest, TransactionSubmissionError),
>;
60
/// Entry point for submitting transactions and waiting for their effects.
///
/// All state lives in a shared `Inner` behind an `Arc`, so request handling can be handed to
/// spawned tasks by cloning the pointer.
pub struct TransactionOrchestrator<A: Clone> {
    /// Shared orchestrator state.
    inner: Arc<Inner<A>>,
}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71 pub fn new_with_auth_aggregator(
72 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73 validator_state: Arc<AuthorityState>,
74 reconfig_channel: Receiver<SuiSystemState>,
75 parent_path: &Path,
76 prometheus_registry: &Registry,
77 node_config: &NodeConfig,
78 ) -> Self {
79 let observer = OnsiteReconfigObserver::new(
80 reconfig_channel,
81 validator_state.get_object_cache_reader().clone(),
82 validator_state.clone_committee_store(),
83 validators.safe_client_metrics_base.clone(),
84 );
85 TransactionOrchestrator::new(
86 validators,
87 validator_state,
88 parent_path,
89 prometheus_registry,
90 observer,
91 node_config,
92 )
93 }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98 A: AuthorityAPI + Send + Sync + 'static + Clone,
99 OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101 pub fn new(
102 validators: Arc<AuthorityAggregator<A>>,
103 validator_state: Arc<AuthorityState>,
104 parent_path: &Path,
105 prometheus_registry: &Registry,
106 reconfig_observer: OnsiteReconfigObserver,
107 node_config: &NodeConfig,
108 ) -> Self {
109 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111 let client_metrics = Arc::new(
112 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113 );
114 let reconfig_observer = Arc::new(reconfig_observer);
115
116 let transaction_driver = TransactionDriver::new(
117 validators.clone(),
118 reconfig_observer.clone(),
119 td_metrics,
120 Some(node_config),
121 client_metrics,
122 );
123
124 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125 parent_path.join("fullnode_pending_transactions"),
126 ));
127 Inner::start_task_to_recover_txes_in_log(
128 pending_tx_log.clone(),
129 transaction_driver.clone(),
130 );
131
132 let td_allowed_submission_list = node_config
133 .transaction_driver_config
134 .as_ref()
135 .map(|config| config.allowed_submission_validators.clone())
136 .unwrap_or_default();
137
138 let td_blocked_submission_list = node_config
139 .transaction_driver_config
140 .as_ref()
141 .map(|config| config.blocked_submission_validators.clone())
142 .unwrap_or_default();
143
144 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145 panic!(
146 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147 td_allowed_submission_list, td_blocked_submission_list
148 );
149 }
150
151 let enable_early_validation = node_config
152 .transaction_driver_config
153 .as_ref()
154 .map(|config| config.enable_early_validation)
155 .unwrap_or(true);
156
157 let inner = Arc::new(Inner {
158 validator_state,
159 pending_tx_log,
160 metrics,
161 transaction_driver,
162 td_allowed_submission_list,
163 td_blocked_submission_list,
164 enable_early_validation,
165 });
166 Self { inner }
167 }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172 A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175 fields(
176 tx_digest = ?request.transaction.digest(),
177 tx_type = ?request_type,
178 ))]
179 pub async fn execute_transaction_block(
180 &self,
181 request: ExecuteTransactionRequestV3,
182 request_type: ExecuteTransactionRequestType,
183 client_addr: Option<SocketAddr>,
184 ) -> Result<
185 (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186 TransactionSubmissionError,
187 > {
188 let timer = Instant::now();
189 let tx_type = if request.transaction.is_consensus_tx() {
190 TxType::SharedObject
191 } else {
192 TxType::SingleWriter
193 };
194 let tx_digest = *request.transaction.digest();
195
196 let inner = self.inner.clone();
197 let (response, mut executed_locally) = spawn_monitored_task!(
198 Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199 )
200 .await
201 .map_err(|e| TransactionSubmissionError::TransactionFailed {
202 category: ErrorCategory::Internal,
203 details: e.to_string(),
204 })??;
205
206 if matches!(
207 request_type,
208 ExecuteTransactionRequestType::WaitForLocalExecution
209 ) {
210 executed_locally = Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
216 &self.inner.validator_state,
217 tx_digest,
218 tx_type,
219 &self.inner.metrics,
220 )
221 .await
222 .is_ok();
223 add_server_timing("local_execution done");
224 }
225
226 let QuorumTransactionResponse {
227 effects,
228 events,
229 input_objects,
230 output_objects,
231 auxiliary_data,
232 } = response;
233
234 let response = ExecuteTransactionResponseV3 {
235 effects,
236 events,
237 input_objects,
238 output_objects,
239 auxiliary_data,
240 };
241
242 let request_latency = timer.elapsed().as_secs_f64();
243 if request_latency > 10.0 {
244 warn!(
245 ?tx_digest,
246 "Request latency {} is too high", request_latency,
247 );
248 }
249 self.inner
250 .metrics
251 .request_latency
252 .with_label_values(&[
253 tx_type.as_str(),
254 "execute_transaction_block",
255 executed_locally.to_string().as_str(),
256 ])
257 .observe(request_latency);
258
259 Ok((response, executed_locally))
260 }
261
262 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
264 fields(tx_digest = ?request.transaction.digest()))]
265 pub async fn execute_transaction_v3(
266 &self,
267 request: ExecuteTransactionRequestV3,
268 client_addr: Option<SocketAddr>,
269 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
270 let timer = Instant::now();
271 let tx_type = if request.transaction.is_consensus_tx() {
272 TxType::SharedObject
273 } else {
274 TxType::SingleWriter
275 };
276
277 let inner = self.inner.clone();
278 let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
279 inner,
280 request,
281 client_addr
282 ))
283 .await
284 .map_err(|e| TransactionSubmissionError::TransactionFailed {
285 category: ErrorCategory::Internal,
286 details: e.to_string(),
287 })??;
288
289 self.inner
290 .metrics
291 .request_latency
292 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
293 .observe(timer.elapsed().as_secs_f64());
294
295 let QuorumTransactionResponse {
296 effects,
297 events,
298 input_objects,
299 output_objects,
300 auxiliary_data,
301 } = response;
302
303 Ok(ExecuteTransactionResponseV3 {
304 effects,
305 events,
306 input_objects,
307 output_objects,
308 auxiliary_data,
309 })
310 }
311
    /// Returns the local authority state this orchestrator was constructed with.
    pub fn authority_state(&self) -> &Arc<AuthorityState> {
        &self.inner.validator_state
    }
315
    /// Returns the transaction driver used to submit transactions.
    pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
        &self.inner.transaction_driver
    }
319
    /// Returns the authority aggregator currently held by the transaction driver, obtained via
    /// `load_full()` on the driver's aggregator handle.
    pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
        self.inner
            .transaction_driver
            .authority_aggregator()
            .load_full()
    }
326
    /// Test helper: loads every transaction currently recorded in the pending transaction log.
    pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
        self.inner.pending_tx_log.load_all_pending_transactions()
    }
330
    /// Test helper: reports whether the pending transaction log is empty.
    pub fn empty_pending_tx_log_in_test(&self) -> bool {
        self.inner.pending_tx_log.is_empty()
    }
334}
335
/// Shared state behind `TransactionOrchestrator`.
struct Inner<A: Clone> {
    /// Local authority state, used for early validation, cached effects and local-execution waits.
    validator_state: Arc<AuthorityState>,
    /// Log of transactions currently being submitted; its contents are re-driven on construction.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Orchestrator metrics.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Driver used to submit transactions and wait for their finality.
    transaction_driver: Arc<TransactionDriver<A>>,
    /// Passed as `allowed_validators` on every submission; empty when not configured.
    td_allowed_submission_list: Vec<String>,
    /// Passed as `blocked_validators` on every submission; empty when not configured.
    td_blocked_submission_list: Vec<String>,
    /// When true, transactions are checked locally before being submitted.
    enable_early_validation: bool,
}
345
346impl<A> Inner<A>
347where
348 A: AuthorityAPI + Send + Sync + 'static + Clone,
349{
350 async fn execute_transaction_with_retry(
351 inner: Arc<Inner<A>>,
352 request: ExecuteTransactionRequestV3,
353 client_addr: Option<SocketAddr>,
354 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
355 {
356 let result = inner
357 .execute_transaction_with_effects_waiting(
358 request.clone(),
359 client_addr,
360 false,
361 )
362 .await;
363
364 if let Err(e) = &result
366 && e.is_retriable()
367 {
368 spawn_monitored_task!(async move {
369 inner.metrics.background_retry_started.inc();
370 let backoff = backoff::ExponentialBackoff::new(
371 Duration::from_secs(1),
372 Duration::from_secs(300),
373 );
374 const MAX_RETRIES: usize = 10;
375 for (i, delay) in backoff.enumerate() {
376 if i == MAX_RETRIES {
377 break;
378 }
379 let result = inner
382 .execute_transaction_with_effects_waiting(
383 request.clone(),
384 client_addr,
385 i > 3,
386 )
387 .await;
388 match result {
389 Ok(_) => {
390 inner
391 .metrics
392 .background_retry_attempts
393 .with_label_values(&["success"])
394 .inc();
395 debug!(
396 "Background retry {i} for transaction {} succeeded",
397 request.transaction.digest()
398 );
399 break;
400 }
401 Err(e) => {
402 if !e.is_retriable() {
403 inner
404 .metrics
405 .background_retry_attempts
406 .with_label_values(&["non-retriable"])
407 .inc();
408 debug!(
409 "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
410 request.transaction.digest()
411 );
412 break;
413 }
414 inner
415 .metrics
416 .background_retry_attempts
417 .with_label_values(&["retriable"])
418 .inc();
419 debug!(
420 "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
421 request.transaction.digest()
422 );
423 }
424 };
425 tracing::debug!("Wait for {:.3}s before next retry", delay.as_secs_f32());
426 sleep(delay).await;
427 }
428 });
429 }
430
431 result
432 }
433
434 fn build_response_from_local_effects(
435 &self,
436 effects: sui_types::effects::TransactionEffects,
437 include_events: bool,
438 include_input_objects: bool,
439 include_output_objects: bool,
440 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
441 let epoch = effects.executed_epoch();
442 let events = if include_events {
443 if effects.events_digest().is_some() {
444 Some(
445 self.validator_state
446 .get_transaction_events(effects.transaction_digest())
447 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?,
448 )
449 } else {
450 None
451 }
452 } else {
453 None
454 };
455 let input_objects = include_input_objects
456 .then(|| self.validator_state.get_transaction_input_objects(&effects))
457 .transpose()
458 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
459 let output_objects = include_output_objects
460 .then(|| {
461 self.validator_state
462 .get_transaction_output_objects(&effects)
463 })
464 .transpose()
465 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
466
467 Ok(QuorumTransactionResponse {
468 effects: FinalizedEffects {
469 effects,
470 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
471 },
472 events,
473 input_objects,
474 output_objects,
475 auxiliary_data: None,
476 })
477 }
478
    /// Verifies, optionally pre-validates, and submits a transaction, then waits for whichever
    /// comes first: locally available executed effects, a submission result, or the finality
    /// timeout.
    ///
    /// The returned flag is `true` when the response was built from locally executed effects
    /// and `false` when it came from a submission through the transaction driver.
    ///
    /// # Errors
    /// - `InvalidUserSignature` when transaction verification fails.
    /// - `TransactionFailed` when early validation reports a non-retriable error and the
    ///   transaction has not already been executed.
    /// - `TransactionDriverInternalError` when reading local effects or their data fails.
    /// - `TimeoutBeforeFinality` when the finality timeout elapses first.
    /// - The most recent submission error once every submission attempt has failed.
    async fn execute_transaction_with_effects_waiting(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
        enforce_live_input_objects: bool,
    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
    {
        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
        let verified_transaction = epoch_store
            .verify_transaction_with_current_aliases(request.transaction.clone())
            .map_err(TransactionSubmissionError::InvalidUserSignature)?
            .into_tx();
        let tx_digest = *verified_transaction.digest();

        // Early validation: reject locally only when the error is not retriable for submission
        // and the transaction has not been executed already. Every other outcome falls through
        // to the normal submission path.
        if self.enable_early_validation
            && let Err(e) = self.validator_state.check_transaction_validity(
                &epoch_store,
                &verified_transaction,
                enforce_live_input_objects,
            )
        {
            let error_category = e.categorize();
            if !error_category.is_submission_retriable() {
                if !self.validator_state.is_tx_already_executed(&tx_digest) {
                    self.metrics
                        .early_validation_rejections
                        .with_label_values(&[e.to_variant_name()])
                        .inc();
                    debug!(
                        error = ?e,
                        "Transaction rejected during early validation"
                    );

                    return Err(TransactionSubmissionError::TransactionFailed {
                        category: error_category,
                        details: e.to_string(),
                    });
                }
            }
        }

        // The guard records the transaction in the pending log and removes it again when it is
        // dropped at the end of this function, whichever way the function returns.
        let guard =
            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
        let is_new_transaction = guard.is_new_transaction();

        let include_events = request.include_events;
        let include_input_objects = request.include_input_objects;
        let include_output_objects = request.include_output_objects;
        let include_auxiliary_data = request.include_auxiliary_data;

        // Fast path: the effects are already in the local cache, so nothing is submitted.
        if let Some(effects) = self
            .validator_state
            .get_transaction_cache_reader()
            .get_executed_effects(&tx_digest)
        {
            self.metrics.early_cached_response.inc();
            debug!(
                ?tx_digest,
                "Returning cached results for already-executed transaction"
            );
            let response = self.build_response_from_local_effects(
                effects,
                include_events,
                include_input_objects,
                include_output_objects,
            )?;
            return Ok((response, true));
        }

        // The timeout is read from the environment on every call; a missing or unparsable
        // value falls back to WAIT_FOR_FINALITY_TIMEOUT.
        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .map(Duration::from_secs)
            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);

        // A transaction that is already being processed is not submitted again; this call then
        // only waits for local effects or the timeout. In integration tests a new transaction
        // is submitted 3 times with 10% probability, 2 times with 20%, and once otherwise.
        let num_submissions = if !is_new_transaction {
            0
        } else if in_integration_test() {
            let r = rand::thread_rng().gen_range(1..=100);
            let n = if r <= 10 {
                3
            } else if r <= 30 {
                2
            } else {
                1
            };
            if n > 1 {
                debug!("Making {n} execution calls");
            }
            n
        } else {
            1
        };

        let mut execution_futures = FuturesUnordered::new();
        for i in 0..num_submissions {
            // Every submission after the first is delayed by 100-500ms with 80% probability.
            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
            let delay_ms = if should_delay {
                rand::thread_rng().gen_range(100..=500)
            } else {
                0
            };

            let request = request.clone();
            let verified_transaction = verified_transaction.clone();

            let future = async move {
                if delay_ms > 0 {
                    sleep(Duration::from_millis(delay_ms)).await;
                }
                self.execute_transaction_impl(
                    request,
                    verified_transaction,
                    client_addr,
                    Some(finality_timeout),
                )
                .await
            }
            .boxed();
            execution_futures.push(future);
        }

        // Most recent submission failure; returned once every submission has failed.
        let mut last_execution_error: Option<TransactionSubmissionError> = None;

        // Resolves when effects for this digest are available locally.
        let digests = [tx_digest];
        let mut local_effects_future = self
            .validator_state
            .get_transaction_cache_reader()
            .notify_read_executed_effects_may_fail(
                "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
                &digests,
            )
            .boxed();

        // Overall deadline, created once so that it is not reset by loop iterations.
        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();

        loop {
            tokio::select! {
                // Effects showed up locally: answer from local state.
                all_effects_result = &mut local_effects_future => {
                    let all_effects = all_effects_result
                        .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
                    if all_effects.len() != 1 {
                        break Err(TransactionSubmissionError::TransactionDriverInternalError(
                            SuiErrorKind::Unknown(format!("Unexpected number of effects found: {}", all_effects.len())).into()
                        ));
                    }
                    debug!(
                        "Effects became available while execution was running"
                    );
                    self.metrics.concurrent_execution.inc();

                    let effects = all_effects.into_iter().next().unwrap();
                    let response = self.build_response_from_local_effects(
                        effects,
                        include_events,
                        include_input_objects,
                        include_output_objects,
                    )?;
                    break Ok((response, true));
                }

                // A submission finished. When the stream is empty `next()` yields `None`, the
                // pattern does not match, and only the other two branches can complete.
                Some(result) = execution_futures.next() => {
                    match result {
                        Ok(resp) => {
                            debug!("Execution succeeded, returning response");
                            let QuorumTransactionResponse {
                                effects,
                                events,
                                input_objects,
                                output_objects,
                                auxiliary_data,
                            } = resp;
                            // Drop any part of the response the request did not ask for.
                            let resp = QuorumTransactionResponse {
                                effects,
                                events: if include_events { events } else { None },
                                input_objects: if include_input_objects { input_objects } else { None },
                                output_objects: if include_output_objects { output_objects } else { None },
                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
                            };
                            break Ok((resp, false));
                        }
                        Err(e) => {
                            debug!(?e, "Execution attempt failed, wait for other attempts");
                            last_execution_error = Some(e);
                        }
                    };

                    // This point is only reached after an `Err` arm stored an error, so the
                    // `unwrap` below cannot fail.
                    if execution_futures.is_empty() {
                        break Err(last_execution_error.unwrap());
                    }
                }

                // Deadline reached before effects or a successful submission.
                _ = &mut timeout_future => {
                    if let Some(e) = last_execution_error {
                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
                    } else {
                        debug!("Timeout waiting for transaction finality.");
                    }
                    self.metrics.wait_for_finality_timeout.inc();

                    break Err(TransactionSubmissionError::TimeoutBeforeFinality);
                }
            }
        }
    }
705
706 #[instrument(level = "error", skip_all)]
707 async fn execute_transaction_impl(
708 &self,
709 request: ExecuteTransactionRequestV3,
710 verified_transaction: VerifiedTransaction,
711 client_addr: Option<SocketAddr>,
712 finality_timeout: Option<Duration>,
713 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
714 let timer = Instant::now();
715 let tx_digest = *verified_transaction.digest();
716 let tx_type = if verified_transaction.is_consensus_tx() {
717 TxType::SharedObject
718 } else {
719 TxType::SingleWriter
720 };
721
722 let (_in_flight_metrics_guards, good_response_metrics) =
723 self.update_metrics(&request.transaction);
724
725 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
727 wait_for_finality_gauge.inc();
728 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
729 in_flight.dec();
730 });
731
732 let response = self
733 .submit_with_transaction_driver(
734 &self.transaction_driver,
735 &request,
736 client_addr,
737 &verified_transaction,
738 good_response_metrics,
739 finality_timeout,
740 )
741 .await?;
742 let driver_type = "transaction_driver";
743
744 add_server_timing("wait_for_finality done");
745
746 self.metrics.wait_for_finality_finished.inc();
747
748 let elapsed = timer.elapsed().as_secs_f64();
749 if elapsed > 10.0 {
750 warn!(
751 ?tx_digest,
752 "Settlement finality latency {} is too high", elapsed,
753 );
754 }
755 self.metrics
756 .settlement_finality_latency
757 .with_label_values(&[tx_type.as_str(), driver_type])
758 .observe(elapsed);
759 good_response_metrics.inc();
760
761 Ok(response)
762 }
763
764 #[instrument(level = "error", skip_all, err(level = "info"))]
765 async fn submit_with_transaction_driver(
766 &self,
767 td: &Arc<TransactionDriver<A>>,
768 request: &ExecuteTransactionRequestV3,
769 client_addr: Option<SocketAddr>,
770 verified_transaction: &VerifiedTransaction,
771 good_response_metrics: &GenericCounter<AtomicU64>,
772 timeout_duration: Option<Duration>,
773 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
774 let tx_digest = *verified_transaction.digest();
775 let td_response = td
776 .drive_transaction(
777 SubmitTxRequest::new_transaction(request.transaction.clone()),
778 SubmitTransactionOptions {
779 forwarded_client_addr: client_addr,
780 allowed_validators: self.td_allowed_submission_list.clone(),
781 blocked_validators: self.td_blocked_submission_list.clone(),
782 },
783 timeout_duration,
784 )
785 .await
786 .map_err(|e| match e {
787 TransactionDriverError::TimeoutWithLastRetriableError {
788 last_error,
789 attempts,
790 timeout,
791 } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
792 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
793 attempts,
794 timeout,
795 },
796 other => TransactionSubmissionError::TransactionFailed {
797 category: other.categorize(),
798 details: other.to_string(),
799 },
800 });
801
802 match td_response {
803 Err(e) => {
804 warn!(?tx_digest, "TransactionDriver error: {e:?}");
805 Err(e)
806 }
807 Ok(quorum_transaction_response) => {
808 good_response_metrics.inc();
809 Ok(quorum_transaction_response)
810 }
811 }
812 }
813
814 #[instrument(
815 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
816 level = "debug",
817 skip_all,
818 err(level = "info")
819 )]
820 async fn wait_for_finalized_tx_executed_locally_with_timeout(
821 validator_state: &Arc<AuthorityState>,
822 tx_digest: TransactionDigest,
823 tx_type: TxType,
824 metrics: &TransactionOrchestratorMetrics,
825 ) -> SuiResult {
826 metrics.local_execution_in_flight.inc();
827 let _metrics_guard =
828 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
829 in_flight.dec();
830 });
831
832 let _latency_guard = metrics
833 .local_execution_latency
834 .with_label_values(&[tx_type.as_str()])
835 .start_timer();
836 debug!("Waiting for finalized tx to be executed locally.");
837 match timeout(LOCAL_EXECUTION_TIMEOUT, async move {
838 validator_state
839 .get_transaction_cache_reader()
840 .notify_read_executed_effects_digests(
841 "TransactionOrchestrator::notify_read_wait_for_local_execution",
842 &[tx_digest],
843 )
844 .await;
845 let epoch_store = validator_state.load_epoch_store_one_call_per_task();
849 epoch_store
850 .transactions_executed_in_checkpoint_notify(vec![tx_digest])
851 .await
852 .expect("db error waiting for transaction checkpointing");
853 })
854 .instrument(error_span!(
855 "transaction_orchestrator::local_execution",
856 ?tx_digest
857 ))
858 .await
859 {
860 Err(_elapsed) => {
861 debug!(
862 "Waiting for finalized tx to be executed locally timed out within {:?}.",
863 LOCAL_EXECUTION_TIMEOUT
864 );
865 metrics.local_execution_timeout.inc();
866 Err(SuiErrorKind::TimeoutError.into())
867 }
868 Ok(_) => {
869 metrics.local_execution_success.inc();
870 Ok(())
871 }
872 }
873 }
874
875 fn update_metrics<'a>(
876 &'a self,
877 transaction: &Transaction,
878 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
879 let (in_flight, good_response) = if transaction.is_consensus_tx() {
880 self.metrics.total_req_received_shared_object.inc();
881 (
882 self.metrics.req_in_flight_shared_object.clone(),
883 &self.metrics.good_response_shared_object,
884 )
885 } else {
886 self.metrics.total_req_received_single_writer.inc();
887 (
888 self.metrics.req_in_flight_single_writer.clone(),
889 &self.metrics.good_response_single_writer,
890 )
891 };
892 in_flight.inc();
893 (
894 scopeguard::guard(in_flight, |in_flight| {
895 in_flight.dec();
896 }),
897 good_response,
898 )
899 }
900
901 fn start_task_to_recover_txes_in_log(
902 pending_tx_log: Arc<WritePathPendingTransactionLog>,
903 transaction_driver: Arc<TransactionDriver<A>>,
904 ) {
905 spawn_logged_monitored_task!(async move {
906 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
907 info!("Skipping loading pending transactions from pending_tx_log.");
908 return;
909 }
910 let pending_txes = pending_tx_log
911 .load_all_pending_transactions()
912 .expect("failed to load all pending transactions");
913 let num_pending_txes = pending_txes.len();
914 info!(
915 "Recovering {} pending transactions from pending_tx_log.",
916 num_pending_txes
917 );
918 let mut recovery = pending_txes
919 .into_iter()
920 .map(|tx| {
921 let pending_tx_log = pending_tx_log.clone();
922 let transaction_driver = transaction_driver.clone();
923 async move {
924 let tx = tx.into_inner();
927 let tx_digest = *tx.digest();
928 if let Err(err) = transaction_driver
931 .drive_transaction(
932 SubmitTxRequest::new_transaction(tx),
933 SubmitTransactionOptions::default(),
934 Some(Duration::from_secs(60)),
935 )
936 .await
937 {
938 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
939 } else {
940 debug!(?tx_digest, "Executed recovered transaction");
941 }
942 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
943 warn!(
944 ?tx_digest,
945 "Failed to clean up transaction in pending log: {err}"
946 );
947 } else {
948 debug!(?tx_digest, "Cleaned up transaction in pending log");
949 }
950 }
951 })
952 .collect::<FuturesUnordered<_>>();
953
954 let mut num_recovered = 0;
955 while recovery.next().await.is_some() {
956 num_recovered += 1;
957 if num_recovered % 1000 == 0 {
958 info!(
959 "Recovered {} out of {} transactions from pending_tx_log.",
960 num_recovered, num_pending_txes
961 );
962 }
963 }
964 info!(
965 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
966 num_recovered, num_pending_txes
967 );
968 });
969 }
970}
/// Prometheus metrics recorded by the transaction orchestrator.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, split by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful responses, split by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, split by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Waiting for finality: currently waiting, finished, and timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Waiting for local execution: currently waiting, succeeded, and timed out.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Responses served from already-cached effects, and responses served from effects that
    // became available locally while a submission was still running.
    early_cached_response: IntCounter,
    concurrent_execution: IntCounter,

    // Transactions rejected by early validation, labelled by reason.
    early_validation_rejections: IntCounterVec,

    // Background retries: tasks started, and attempts labelled by status.
    background_retry_started: IntGauge,
    background_retry_attempts: IntCounterVec,

    // Latency histograms.
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
1003
1004impl TransactionOrchestratorMetrics {
    /// Registers every orchestrator metric on `registry` and returns the handles.
    ///
    /// The per-transaction-type counters and gauges are obtained from labelled vectors with
    /// the `tx_type` label fixed to the single-writer or shared-object value.
    ///
    /// # Panics
    /// Panics if any registration fails, since every registration result is unwrapped.
    pub fn new(registry: &Registry) -> Self {
        let total_req_received = register_int_counter_vec_with_registry!(
            "tx_orchestrator_total_req_received",
            "Total number of executions request Transaction Orchestrator receives, group by tx type",
            &["tx_type"],
            registry
        )
        .unwrap();

        let total_req_received_single_writer =
            total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
        let total_req_received_shared_object =
            total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);

        let good_response = register_int_counter_vec_with_registry!(
            "tx_orchestrator_good_response",
            "Total number of good responses Transaction Orchestrator generates, group by tx type",
            &["tx_type"],
            registry
        )
        .unwrap();

        let good_response_single_writer =
            good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
        let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);

        let req_in_flight = register_int_gauge_vec_with_registry!(
            "tx_orchestrator_req_in_flight",
            "Number of requests in flights Transaction Orchestrator processes, group by tx type",
            &["tx_type"],
            registry
        )
        .unwrap();

        let req_in_flight_single_writer =
            req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
        let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);

        Self {
            total_req_received_single_writer,
            total_req_received_shared_object,
            good_response_single_writer,
            good_response_shared_object,
            req_in_flight_single_writer,
            req_in_flight_shared_object,
            wait_for_finality_in_flight: register_int_gauge_with_registry!(
                "tx_orchestrator_wait_for_finality_in_flight",
                "Number of in flight txns Transaction Orchestrator are waiting for finality for",
                registry,
            )
            .unwrap(),
            // NOTE(review): the registered name is spelled `fnished`. Renaming it would change
            // the exported metric name, so it is left as is.
            wait_for_finality_finished: register_int_counter_with_registry!(
                "tx_orchestrator_wait_for_finality_fnished",
                "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
                registry,
            )
            .unwrap(),
            wait_for_finality_timeout: register_int_counter_with_registry!(
                "tx_orchestrator_wait_for_finality_timeout",
                "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
                registry,
            )
            .unwrap(),
            local_execution_in_flight: register_int_gauge_with_registry!(
                "tx_orchestrator_local_execution_in_flight",
                "Number of local execution txns in flights Transaction Orchestrator handles",
                registry,
            )
            .unwrap(),
            local_execution_success: register_int_counter_with_registry!(
                "tx_orchestrator_local_execution_success",
                "Total number of successful local execution txns Transaction Orchestrator handles",
                registry,
            )
            .unwrap(),
            local_execution_timeout: register_int_counter_with_registry!(
                "tx_orchestrator_local_execution_timeout",
                "Total number of timed-out local execution txns Transaction Orchestrator handles",
                registry,
            )
            .unwrap(),
            early_cached_response: register_int_counter_with_registry!(
                "tx_orchestrator_early_cached_response",
                "Total number of requests returning cached results for already-executed transactions",
                registry,
            )
            .unwrap(),
            concurrent_execution: register_int_counter_with_registry!(
                "tx_orchestrator_concurrent_execution",
                "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
                registry,
            )
            .unwrap(),
            early_validation_rejections: register_int_counter_vec_with_registry!(
                "tx_orchestrator_early_validation_rejections",
                "Total number of transactions rejected during early validation before submission, by reason",
                &["reason"],
                registry,
            )
            .unwrap(),
            background_retry_started: register_int_gauge_with_registry!(
                "tx_orchestrator_background_retry_started",
                "Number of background retry tasks kicked off for transactions with retriable errors",
                registry,
            )
            .unwrap(),
            background_retry_attempts: register_int_counter_vec_with_registry!(
                "tx_orchestrator_background_retry_attempts",
                "Total number of background retry attempts, by status",
                &["status"],
                registry,
            )
            .unwrap(),
            request_latency: register_histogram_vec_with_registry!(
                "tx_orchestrator_request_latency",
                "Time spent in processing one Transaction Orchestrator request",
                &["tx_type", "route", "wait_for_local_execution"],
                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            local_execution_latency: register_histogram_vec_with_registry!(
                "tx_orchestrator_local_execution_latency",
                "Time spent in waiting for one Transaction Orchestrator gets locally executed",
                &["tx_type"],
                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            settlement_finality_latency: register_histogram_vec_with_registry!(
                "tx_orchestrator_settlement_finality_latency",
                "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
                &["tx_type", "driver_type"],
                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
        }
    }
1147
1148 pub fn new_for_tests() -> Self {
1149 let registry = Registry::new();
1150 Self::new(®istry)
1151 }
1152}
1153
1154#[async_trait::async_trait]
1155impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1156where
1157 A: AuthorityAPI + Send + Sync + 'static + Clone,
1158{
1159 async fn execute_transaction(
1160 &self,
1161 request: ExecuteTransactionRequestV3,
1162 client_addr: Option<std::net::SocketAddr>,
1163 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1164 self.execute_transaction_v3(request, client_addr).await
1165 }
1166
1167 fn simulate_transaction(
1168 &self,
1169 transaction: TransactionData,
1170 checks: TransactionChecks,
1171 allow_mock_gas_coin: bool,
1172 ) -> Result<SimulateTransactionResult, SuiError> {
1173 self.inner
1174 .validator_state
1175 .simulate_transaction(transaction, checks, allow_mock_gas_coin)
1176 }
1177}
1178
/// Tracks one transaction in the pending transaction log for the lifetime of the guard.
///
/// Creating the guard records the transaction in the log (if it is not already there) and
/// dropping it calls `finish_transaction` for the same digest.
struct TransactionSubmissionGuard {
    /// Log the transaction was recorded in.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Digest of the tracked transaction.
    tx_digest: TransactionDigest,
    /// Value returned by `write_pending_transaction_maybe` when the guard was created.
    is_new_transaction: bool,
}
1186
1187impl TransactionSubmissionGuard {
1188 pub fn new(
1189 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1190 tx: &VerifiedTransaction,
1191 ) -> Self {
1192 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1193 let tx_digest = *tx.digest();
1194 if is_new_transaction {
1195 debug!(?tx_digest, "Added transaction to inflight set");
1196 } else {
1197 debug!(
1198 ?tx_digest,
1199 "Transaction already being processed, no new submission will be made"
1200 );
1201 };
1202 Self {
1203 pending_tx_log,
1204 tx_digest,
1205 is_new_transaction,
1206 }
1207 }
1208
1209 fn is_new_transaction(&self) -> bool {
1210 self.is_new_transaction
1211 }
1212}
1213
1214impl Drop for TransactionSubmissionGuard {
1215 fn drop(&mut self) {
1216 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1217 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1218 } else {
1219 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1220 }
1221 }
1222}