1use std::net::SocketAddr;
5use std::path::Path;
6use std::sync::Arc;
7use std::time::Duration;
8
9use futures::FutureExt;
10use futures::stream::{FuturesUnordered, StreamExt};
11use mysten_common::{backoff, in_antithesis};
12use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
13use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
14use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
15use prometheus::{
16 HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
17 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
18 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
19 register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::sui_system_state::SuiSystemState;
29use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
30use sui_types::transaction_driver_types::{
31 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
32 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
33 TransactionSubmissionError,
34};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::transaction_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46 TransactionDriverMetrics,
47};
48
/// Upper bound on how long a `WaitForLocalExecution` request waits for the
/// finalized transaction to be executed by this node.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default upper bound on waiting for a transaction to reach finality. The
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable overrides it per request.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_millis(90_000);
55
56pub type QuorumTransactionEffectsResult = Result<
57 (Transaction, QuorumTransactionResponse),
58 (TransactionDigest, TransactionSubmissionError),
59>;
60
61pub struct TransactionOrchestrator<A: Clone> {
67 inner: Arc<Inner<A>>,
68}
69
70impl TransactionOrchestrator<NetworkAuthorityClient> {
71 pub fn new_with_auth_aggregator(
72 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
73 validator_state: Arc<AuthorityState>,
74 reconfig_channel: Receiver<SuiSystemState>,
75 parent_path: &Path,
76 prometheus_registry: &Registry,
77 node_config: &NodeConfig,
78 ) -> Self {
79 let observer = OnsiteReconfigObserver::new(
80 reconfig_channel,
81 validator_state.get_object_cache_reader().clone(),
82 validator_state.clone_committee_store(),
83 validators.safe_client_metrics_base.clone(),
84 );
85 TransactionOrchestrator::new(
86 validators,
87 validator_state,
88 parent_path,
89 prometheus_registry,
90 observer,
91 node_config,
92 )
93 }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98 A: AuthorityAPI + Send + Sync + 'static + Clone,
99 OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101 pub fn new(
102 validators: Arc<AuthorityAggregator<A>>,
103 validator_state: Arc<AuthorityState>,
104 parent_path: &Path,
105 prometheus_registry: &Registry,
106 reconfig_observer: OnsiteReconfigObserver,
107 node_config: &NodeConfig,
108 ) -> Self {
109 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111 let client_metrics = Arc::new(
112 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113 );
114 let reconfig_observer = Arc::new(reconfig_observer);
115
116 let transaction_driver = TransactionDriver::new(
117 validators.clone(),
118 reconfig_observer.clone(),
119 td_metrics,
120 Some(node_config),
121 client_metrics,
122 );
123
124 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125 parent_path.join("fullnode_pending_transactions"),
126 ));
127 Inner::start_task_to_recover_txes_in_log(
128 pending_tx_log.clone(),
129 transaction_driver.clone(),
130 );
131
132 let td_allowed_submission_list = node_config
133 .transaction_driver_config
134 .as_ref()
135 .map(|config| config.allowed_submission_validators.clone())
136 .unwrap_or_default();
137
138 let td_blocked_submission_list = node_config
139 .transaction_driver_config
140 .as_ref()
141 .map(|config| config.blocked_submission_validators.clone())
142 .unwrap_or_default();
143
144 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145 panic!(
146 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147 td_allowed_submission_list, td_blocked_submission_list
148 );
149 }
150
151 let enable_early_validation = node_config
152 .transaction_driver_config
153 .as_ref()
154 .map(|config| config.enable_early_validation)
155 .unwrap_or(true);
156
157 let inner = Arc::new(Inner {
158 validator_state,
159 pending_tx_log,
160 metrics,
161 transaction_driver,
162 td_allowed_submission_list,
163 td_blocked_submission_list,
164 enable_early_validation,
165 });
166 Self { inner }
167 }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172 A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175 fields(
176 tx_digest = ?request.transaction.digest(),
177 tx_type = ?request_type,
178 ))]
179 pub async fn execute_transaction_block(
180 &self,
181 request: ExecuteTransactionRequestV3,
182 request_type: ExecuteTransactionRequestType,
183 client_addr: Option<SocketAddr>,
184 ) -> Result<
185 (ExecuteTransactionResponseV3, IsTransactionExecutedLocally),
186 TransactionSubmissionError,
187 > {
188 let timer = Instant::now();
189 let tx_type = if request.transaction.is_consensus_tx() {
190 TxType::SharedObject
191 } else {
192 TxType::SingleWriter
193 };
194 let tx_digest = *request.transaction.digest();
195
196 let inner = self.inner.clone();
197 let (response, mut executed_locally) = spawn_monitored_task!(
198 Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
199 )
200 .await
201 .map_err(|e| TransactionSubmissionError::TransactionFailed {
202 category: ErrorCategory::Internal,
203 details: e.to_string(),
204 })??;
205
206 if !executed_locally {
207 executed_locally = if matches!(
208 request_type,
209 ExecuteTransactionRequestType::WaitForLocalExecution
210 ) {
211 let executed_locally =
212 Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
213 &self.inner.validator_state,
214 tx_digest,
215 tx_type,
216 &self.inner.metrics,
217 )
218 .await
219 .is_ok();
220 add_server_timing("local_execution done");
221 executed_locally
222 } else {
223 false
224 };
225 }
226
227 let QuorumTransactionResponse {
228 effects,
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 } = response;
234
235 let response = ExecuteTransactionResponseV3 {
236 effects,
237 events,
238 input_objects,
239 output_objects,
240 auxiliary_data,
241 };
242
243 self.inner
244 .metrics
245 .request_latency
246 .with_label_values(&[
247 tx_type.as_str(),
248 "execute_transaction_block",
249 executed_locally.to_string().as_str(),
250 ])
251 .observe(timer.elapsed().as_secs_f64());
252
253 Ok((response, executed_locally))
254 }
255
256 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
258 fields(tx_digest = ?request.transaction.digest()))]
259 pub async fn execute_transaction_v3(
260 &self,
261 request: ExecuteTransactionRequestV3,
262 client_addr: Option<SocketAddr>,
263 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
264 let timer = Instant::now();
265 let tx_type = if request.transaction.is_consensus_tx() {
266 TxType::SharedObject
267 } else {
268 TxType::SingleWriter
269 };
270
271 let inner = self.inner.clone();
272 let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
273 inner,
274 request,
275 client_addr
276 ))
277 .await
278 .map_err(|e| TransactionSubmissionError::TransactionFailed {
279 category: ErrorCategory::Internal,
280 details: e.to_string(),
281 })??;
282
283 self.inner
284 .metrics
285 .request_latency
286 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
287 .observe(timer.elapsed().as_secs_f64());
288
289 let QuorumTransactionResponse {
290 effects,
291 events,
292 input_objects,
293 output_objects,
294 auxiliary_data,
295 } = response;
296
297 Ok(ExecuteTransactionResponseV3 {
298 effects,
299 events,
300 input_objects,
301 output_objects,
302 auxiliary_data,
303 })
304 }
305
306 pub fn authority_state(&self) -> &Arc<AuthorityState> {
307 &self.inner.validator_state
308 }
309
310 pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
311 &self.inner.transaction_driver
312 }
313
314 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
315 self.inner
316 .transaction_driver
317 .authority_aggregator()
318 .load_full()
319 }
320
321 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
322 self.inner.pending_tx_log.load_all_pending_transactions()
323 }
324
325 pub fn empty_pending_tx_log_in_test(&self) -> bool {
326 self.inner.pending_tx_log.is_empty()
327 }
328}
329
330struct Inner<A: Clone> {
331 validator_state: Arc<AuthorityState>,
332 pending_tx_log: Arc<WritePathPendingTransactionLog>,
333 metrics: Arc<TransactionOrchestratorMetrics>,
334 transaction_driver: Arc<TransactionDriver<A>>,
335 td_allowed_submission_list: Vec<String>,
336 td_blocked_submission_list: Vec<String>,
337 enable_early_validation: bool,
338}
339
340impl<A> Inner<A>
341where
342 A: AuthorityAPI + Send + Sync + 'static + Clone,
343{
344 async fn execute_transaction_with_retry(
345 inner: Arc<Inner<A>>,
346 request: ExecuteTransactionRequestV3,
347 client_addr: Option<SocketAddr>,
348 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
349 {
350 let result = inner
351 .execute_transaction_with_effects_waiting(
352 request.clone(),
353 client_addr,
354 false,
355 )
356 .await;
357
358 if let Err(e) = &result
360 && e.is_retriable()
361 {
362 spawn_monitored_task!(async move {
363 inner.metrics.background_retry_started.inc();
364 let backoff = backoff::ExponentialBackoff::new(
365 Duration::from_secs(1),
366 Duration::from_secs(300),
367 );
368 const MAX_RETRIES: usize = 10;
369 for (i, delay) in backoff.enumerate() {
370 if i == MAX_RETRIES {
371 break;
372 }
373 let result = inner
376 .execute_transaction_with_effects_waiting(
377 request.clone(),
378 client_addr,
379 i > 3,
380 )
381 .await;
382 match result {
383 Ok(_) => {
384 inner
385 .metrics
386 .background_retry_attempts
387 .with_label_values(&["success"])
388 .inc();
389 debug!(
390 "Background retry {i} for transaction {} succeeded",
391 request.transaction.digest()
392 );
393 break;
394 }
395 Err(e) => {
396 if !e.is_retriable() {
397 inner
398 .metrics
399 .background_retry_attempts
400 .with_label_values(&["non-retriable"])
401 .inc();
402 debug!(
403 "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
404 request.transaction.digest()
405 );
406 break;
407 }
408 inner
409 .metrics
410 .background_retry_attempts
411 .with_label_values(&["retriable"])
412 .inc();
413 debug!(
414 "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
415 request.transaction.digest()
416 );
417 }
418 };
419 sleep(delay).await;
420 }
421 });
422 }
423
424 result
425 }
426
427 async fn execute_transaction_with_effects_waiting(
429 &self,
430 request: ExecuteTransactionRequestV3,
431 client_addr: Option<SocketAddr>,
432 enforce_live_input_objects: bool,
433 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), TransactionSubmissionError>
434 {
435 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
436 let verified_transaction = epoch_store
437 .verify_transaction_with_current_aliases(request.transaction.clone())
438 .map_err(TransactionSubmissionError::InvalidUserSignature)?
439 .into_tx();
440 let tx_digest = *verified_transaction.digest();
441
442 if self.enable_early_validation
445 && let Err(e) = self.validator_state.check_transaction_validity(
446 &epoch_store,
447 &verified_transaction,
448 enforce_live_input_objects,
449 )
450 {
451 let error_category = e.categorize();
452 if !error_category.is_submission_retriable() {
453 if !self.validator_state.is_tx_already_executed(&tx_digest) {
455 self.metrics
456 .early_validation_rejections
457 .with_label_values(&[e.to_variant_name()])
458 .inc();
459 debug!(
460 error = ?e,
461 "Transaction rejected during early validation"
462 );
463
464 return Err(TransactionSubmissionError::TransactionFailed {
465 category: error_category,
466 details: e.to_string(),
467 });
468 }
469 }
470 }
471
472 let guard =
474 TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
475 let is_new_transaction = guard.is_new_transaction();
476
477 let include_events = request.include_events;
478 let include_input_objects = request.include_input_objects;
479 let include_output_objects = request.include_output_objects;
480 let include_auxiliary_data = request.include_auxiliary_data;
481
482 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
483 .ok()
484 .and_then(|v| v.parse().ok())
485 .map(Duration::from_secs)
486 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
487
488 let num_submissions = if !is_new_transaction {
489 0
491 } else if cfg!(msim) || in_antithesis() {
492 let r = rand::thread_rng().gen_range(1..=100);
494 let n = if r <= 10 {
495 3
496 } else if r <= 30 {
497 2
498 } else {
499 1
500 };
501 if n > 1 {
502 debug!("Making {n} execution calls");
503 }
504 n
505 } else {
506 1
507 };
508
509 let mut execution_futures = FuturesUnordered::new();
511 for i in 0..num_submissions {
512 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
514 let delay_ms = if should_delay {
515 rand::thread_rng().gen_range(100..=500)
516 } else {
517 0
518 };
519
520 let request = request.clone();
521 let verified_transaction = verified_transaction.clone();
522
523 let future = async move {
524 if delay_ms > 0 {
525 sleep(Duration::from_millis(delay_ms)).await;
527 }
528 self.execute_transaction_impl(
529 request,
530 verified_transaction,
531 client_addr,
532 Some(finality_timeout),
533 )
534 .await
535 }
536 .boxed();
537 execution_futures.push(future);
538 }
539
540 let mut last_execution_error: Option<TransactionSubmissionError> = None;
542
543 let digests = [tx_digest];
545 let mut local_effects_future = epoch_store
546 .within_alive_epoch(
547 self.validator_state
548 .get_transaction_cache_reader()
549 .notify_read_executed_effects(
550 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
551 &digests,
552 ),
553 )
554 .boxed();
555
556 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
558
559 loop {
560 tokio::select! {
561 biased;
562
563 local_effects_result = &mut local_effects_future => {
565 match local_effects_result {
566 Ok(effects) => {
567 debug!(
568 "Effects became available while execution was running"
569 );
570 if let Some(effects) = effects.into_iter().next() {
571 self.metrics.concurrent_execution.inc();
572 let epoch = effects.executed_epoch();
573 let events = if include_events {
574 if effects.events_digest().is_some() {
575 Some(self.validator_state.get_transaction_events(effects.transaction_digest())
576 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?)
577 } else {
578 None
579 }
580 } else {
581 None
582 };
583 let input_objects = include_input_objects
584 .then(|| self.validator_state.get_transaction_input_objects(&effects))
585 .transpose()
586 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
587 let output_objects = include_output_objects
588 .then(|| self.validator_state.get_transaction_output_objects(&effects))
589 .transpose()
590 .map_err(TransactionSubmissionError::TransactionDriverInternalError)?;
591 let response = QuorumTransactionResponse {
592 effects: FinalizedEffects {
593 effects,
594 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
595 },
596 events,
597 input_objects,
598 output_objects,
599 auxiliary_data: None,
600 };
601 break Ok((response, true));
602 }
603 }
604 Err(_) => {
605 warn!("Epoch terminated before effects were available");
606 }
607 };
608
609 local_effects_future = futures::future::pending().boxed();
611 }
612
613 Some(result) = execution_futures.next() => {
615 match result {
616 Ok(resp) => {
617 debug!("Execution succeeded, returning response");
619 let QuorumTransactionResponse {
620 effects,
621 events,
622 input_objects,
623 output_objects,
624 auxiliary_data,
625 } = resp;
626 let resp = QuorumTransactionResponse {
628 effects,
629 events: if include_events { events } else { None },
630 input_objects: if include_input_objects { input_objects } else { None },
631 output_objects: if include_output_objects { output_objects } else { None },
632 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
633 };
634 break Ok((resp, false));
635 }
636 Err(e) => {
637 debug!(?e, "Execution attempt failed, wait for other attempts");
638 last_execution_error = Some(e);
639 }
640 };
641
642 if execution_futures.is_empty() {
644 break Err(last_execution_error.unwrap());
645 }
646 }
647
648 _ = &mut timeout_future => {
650 if let Some(e) = last_execution_error {
651 debug!("Timeout waiting for transaction finality. Last execution error: {e}");
652 } else {
653 debug!("Timeout waiting for transaction finality.");
654 }
655 self.metrics.wait_for_finality_timeout.inc();
656
657 break Err(TransactionSubmissionError::TimeoutBeforeFinality);
659 }
660 }
661 }
662 }
663
664 #[instrument(level = "error", skip_all)]
665 async fn execute_transaction_impl(
666 &self,
667 request: ExecuteTransactionRequestV3,
668 verified_transaction: VerifiedTransaction,
669 client_addr: Option<SocketAddr>,
670 finality_timeout: Option<Duration>,
671 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
672 debug!("TO Received transaction execution request.");
673
674 let timer = Instant::now();
675 let tx_type = if verified_transaction.is_consensus_tx() {
676 TxType::SharedObject
677 } else {
678 TxType::SingleWriter
679 };
680
681 let (_in_flight_metrics_guards, good_response_metrics) =
682 self.update_metrics(&request.transaction);
683
684 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
686 wait_for_finality_gauge.inc();
687 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
688 in_flight.dec();
689 });
690
691 let response = self
692 .submit_with_transaction_driver(
693 &self.transaction_driver,
694 &request,
695 client_addr,
696 &verified_transaction,
697 good_response_metrics,
698 finality_timeout,
699 )
700 .await?;
701 let driver_type = "transaction_driver";
702
703 add_server_timing("wait_for_finality done");
704
705 self.metrics.wait_for_finality_finished.inc();
706
707 let elapsed = timer.elapsed().as_secs_f64();
708 self.metrics
709 .settlement_finality_latency
710 .with_label_values(&[tx_type.as_str(), driver_type])
711 .observe(elapsed);
712 good_response_metrics.inc();
713
714 Ok(response)
715 }
716
717 #[instrument(level = "error", skip_all, err(level = "info"))]
718 async fn submit_with_transaction_driver(
719 &self,
720 td: &Arc<TransactionDriver<A>>,
721 request: &ExecuteTransactionRequestV3,
722 client_addr: Option<SocketAddr>,
723 verified_transaction: &VerifiedTransaction,
724 good_response_metrics: &GenericCounter<AtomicU64>,
725 timeout_duration: Option<Duration>,
726 ) -> Result<QuorumTransactionResponse, TransactionSubmissionError> {
727 let tx_digest = *verified_transaction.digest();
728 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
729
730 let td_response = td
731 .drive_transaction(
732 SubmitTxRequest::new_transaction(request.transaction.clone()),
733 SubmitTransactionOptions {
734 forwarded_client_addr: client_addr,
735 allowed_validators: self.td_allowed_submission_list.clone(),
736 blocked_validators: self.td_blocked_submission_list.clone(),
737 },
738 timeout_duration,
739 )
740 .await
741 .map_err(|e| match e {
742 TransactionDriverError::TimeoutWithLastRetriableError {
743 last_error,
744 attempts,
745 timeout,
746 } => TransactionSubmissionError::TimeoutBeforeFinalityWithErrors {
747 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
748 attempts,
749 timeout,
750 },
751 other => TransactionSubmissionError::TransactionFailed {
752 category: other.categorize(),
753 details: other.to_string(),
754 },
755 });
756
757 match td_response {
758 Err(e) => {
759 warn!("TransactionDriver error: {e:?}");
760 Err(e)
761 }
762 Ok(quorum_transaction_response) => {
763 good_response_metrics.inc();
764 Ok(quorum_transaction_response)
765 }
766 }
767 }
768
769 #[instrument(
770 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
771 level = "debug",
772 skip_all,
773 err(level = "info")
774 )]
775 async fn wait_for_finalized_tx_executed_locally_with_timeout(
776 validator_state: &Arc<AuthorityState>,
777 tx_digest: TransactionDigest,
778 tx_type: TxType,
779 metrics: &TransactionOrchestratorMetrics,
780 ) -> SuiResult {
781 metrics.local_execution_in_flight.inc();
782 let _metrics_guard =
783 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
784 in_flight.dec();
785 });
786
787 let _latency_guard = metrics
788 .local_execution_latency
789 .with_label_values(&[tx_type.as_str()])
790 .start_timer();
791 debug!("Waiting for finalized tx to be executed locally.");
792 match timeout(
793 LOCAL_EXECUTION_TIMEOUT,
794 validator_state
795 .get_transaction_cache_reader()
796 .notify_read_executed_effects_digests(
797 "TransactionOrchestrator::notify_read_wait_for_local_execution",
798 &[tx_digest],
799 ),
800 )
801 .instrument(error_span!(
802 "transaction_orchestrator::local_execution",
803 ?tx_digest
804 ))
805 .await
806 {
807 Err(_elapsed) => {
808 debug!(
809 "Waiting for finalized tx to be executed locally timed out within {:?}.",
810 LOCAL_EXECUTION_TIMEOUT
811 );
812 metrics.local_execution_timeout.inc();
813 Err(SuiErrorKind::TimeoutError.into())
814 }
815 Ok(_) => {
816 metrics.local_execution_success.inc();
817 Ok(())
818 }
819 }
820 }
821
822 fn update_metrics<'a>(
823 &'a self,
824 transaction: &Transaction,
825 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
826 let (in_flight, good_response) = if transaction.is_consensus_tx() {
827 self.metrics.total_req_received_shared_object.inc();
828 (
829 self.metrics.req_in_flight_shared_object.clone(),
830 &self.metrics.good_response_shared_object,
831 )
832 } else {
833 self.metrics.total_req_received_single_writer.inc();
834 (
835 self.metrics.req_in_flight_single_writer.clone(),
836 &self.metrics.good_response_single_writer,
837 )
838 };
839 in_flight.inc();
840 (
841 scopeguard::guard(in_flight, |in_flight| {
842 in_flight.dec();
843 }),
844 good_response,
845 )
846 }
847
848 fn start_task_to_recover_txes_in_log(
849 pending_tx_log: Arc<WritePathPendingTransactionLog>,
850 transaction_driver: Arc<TransactionDriver<A>>,
851 ) {
852 spawn_logged_monitored_task!(async move {
853 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
854 info!("Skipping loading pending transactions from pending_tx_log.");
855 return;
856 }
857 let pending_txes = pending_tx_log
858 .load_all_pending_transactions()
859 .expect("failed to load all pending transactions");
860 let num_pending_txes = pending_txes.len();
861 info!(
862 "Recovering {} pending transactions from pending_tx_log.",
863 num_pending_txes
864 );
865 let mut recovery = pending_txes
866 .into_iter()
867 .map(|tx| {
868 let pending_tx_log = pending_tx_log.clone();
869 let transaction_driver = transaction_driver.clone();
870 async move {
871 let tx = tx.into_inner();
874 let tx_digest = *tx.digest();
875 if let Err(err) = transaction_driver
878 .drive_transaction(
879 SubmitTxRequest::new_transaction(tx),
880 SubmitTransactionOptions::default(),
881 Some(Duration::from_secs(60)),
882 )
883 .await
884 {
885 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
886 } else {
887 debug!(?tx_digest, "Executed recovered transaction");
888 }
889 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
890 warn!(
891 ?tx_digest,
892 "Failed to clean up transaction in pending log: {err}"
893 );
894 } else {
895 debug!(?tx_digest, "Cleaned up transaction in pending log");
896 }
897 }
898 })
899 .collect::<FuturesUnordered<_>>();
900
901 let mut num_recovered = 0;
902 while recovery.next().await.is_some() {
903 num_recovered += 1;
904 if num_recovered % 1000 == 0 {
905 info!(
906 "Recovered {} out of {} transactions from pending_tx_log.",
907 num_recovered, num_pending_txes
908 );
909 }
910 }
911 info!(
912 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
913 num_recovered, num_pending_txes
914 );
915 });
916 }
917}
918#[derive(Clone)]
920pub struct TransactionOrchestratorMetrics {
921 total_req_received_single_writer: GenericCounter<AtomicU64>,
922 total_req_received_shared_object: GenericCounter<AtomicU64>,
923
924 good_response_single_writer: GenericCounter<AtomicU64>,
925 good_response_shared_object: GenericCounter<AtomicU64>,
926
927 req_in_flight_single_writer: GenericGauge<AtomicI64>,
928 req_in_flight_shared_object: GenericGauge<AtomicI64>,
929
930 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
931 wait_for_finality_finished: GenericCounter<AtomicU64>,
932 wait_for_finality_timeout: GenericCounter<AtomicU64>,
933
934 local_execution_in_flight: GenericGauge<AtomicI64>,
935 local_execution_success: GenericCounter<AtomicU64>,
936 local_execution_timeout: GenericCounter<AtomicU64>,
937
938 concurrent_execution: IntCounter,
939
940 early_validation_rejections: IntCounterVec,
941
942 background_retry_started: IntGauge,
943 background_retry_attempts: IntCounterVec,
944
945 request_latency: HistogramVec,
946 local_execution_latency: HistogramVec,
947 settlement_finality_latency: HistogramVec,
948}
949
950impl TransactionOrchestratorMetrics {
954 pub fn new(registry: &Registry) -> Self {
955 let total_req_received = register_int_counter_vec_with_registry!(
956 "tx_orchestrator_total_req_received",
957 "Total number of executions request Transaction Orchestrator receives, group by tx type",
958 &["tx_type"],
959 registry
960 )
961 .unwrap();
962
963 let total_req_received_single_writer =
964 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
965 let total_req_received_shared_object =
966 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
967
968 let good_response = register_int_counter_vec_with_registry!(
969 "tx_orchestrator_good_response",
970 "Total number of good responses Transaction Orchestrator generates, group by tx type",
971 &["tx_type"],
972 registry
973 )
974 .unwrap();
975
976 let good_response_single_writer =
977 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
978 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
979
980 let req_in_flight = register_int_gauge_vec_with_registry!(
981 "tx_orchestrator_req_in_flight",
982 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
983 &["tx_type"],
984 registry
985 )
986 .unwrap();
987
988 let req_in_flight_single_writer =
989 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
990 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
991
992 Self {
993 total_req_received_single_writer,
994 total_req_received_shared_object,
995 good_response_single_writer,
996 good_response_shared_object,
997 req_in_flight_single_writer,
998 req_in_flight_shared_object,
999 wait_for_finality_in_flight: register_int_gauge_with_registry!(
1000 "tx_orchestrator_wait_for_finality_in_flight",
1001 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1002 registry,
1003 )
1004 .unwrap(),
1005 wait_for_finality_finished: register_int_counter_with_registry!(
1006 "tx_orchestrator_wait_for_finality_fnished",
1007 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1008 registry,
1009 )
1010 .unwrap(),
1011 wait_for_finality_timeout: register_int_counter_with_registry!(
1012 "tx_orchestrator_wait_for_finality_timeout",
1013 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1014 registry,
1015 )
1016 .unwrap(),
1017 local_execution_in_flight: register_int_gauge_with_registry!(
1018 "tx_orchestrator_local_execution_in_flight",
1019 "Number of local execution txns in flights Transaction Orchestrator handles",
1020 registry,
1021 )
1022 .unwrap(),
1023 local_execution_success: register_int_counter_with_registry!(
1024 "tx_orchestrator_local_execution_success",
1025 "Total number of successful local execution txns Transaction Orchestrator handles",
1026 registry,
1027 )
1028 .unwrap(),
1029 local_execution_timeout: register_int_counter_with_registry!(
1030 "tx_orchestrator_local_execution_timeout",
1031 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1032 registry,
1033 )
1034 .unwrap(),
1035 concurrent_execution: register_int_counter_with_registry!(
1036 "tx_orchestrator_concurrent_execution",
1037 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1038 registry,
1039 )
1040 .unwrap(),
1041 early_validation_rejections: register_int_counter_vec_with_registry!(
1042 "tx_orchestrator_early_validation_rejections",
1043 "Total number of transactions rejected during early validation before submission, by reason",
1044 &["reason"],
1045 registry,
1046 )
1047 .unwrap(),
1048 background_retry_started: register_int_gauge_with_registry!(
1049 "tx_orchestrator_background_retry_started",
1050 "Number of background retry tasks kicked off for transactions with retriable errors",
1051 registry,
1052 )
1053 .unwrap(),
1054 background_retry_attempts: register_int_counter_vec_with_registry!(
1055 "tx_orchestrator_background_retry_attempts",
1056 "Total number of background retry attempts, by status",
1057 &["status"],
1058 registry,
1059 )
1060 .unwrap(),
1061 request_latency: register_histogram_vec_with_registry!(
1062 "tx_orchestrator_request_latency",
1063 "Time spent in processing one Transaction Orchestrator request",
1064 &["tx_type", "route", "wait_for_local_execution"],
1065 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1066 registry,
1067 )
1068 .unwrap(),
1069 local_execution_latency: register_histogram_vec_with_registry!(
1070 "tx_orchestrator_local_execution_latency",
1071 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1072 &["tx_type"],
1073 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1074 registry,
1075 )
1076 .unwrap(),
1077 settlement_finality_latency: register_histogram_vec_with_registry!(
1078 "tx_orchestrator_settlement_finality_latency",
1079 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1080 &["tx_type", "driver_type"],
1081 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1082 registry,
1083 )
1084 .unwrap(),
1085 }
1086 }
1087
1088 pub fn new_for_tests() -> Self {
1089 let registry = Registry::new();
1090 Self::new(®istry)
1091 }
1092}
1093
1094#[async_trait::async_trait]
1095impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1096where
1097 A: AuthorityAPI + Send + Sync + 'static + Clone,
1098{
1099 async fn execute_transaction(
1100 &self,
1101 request: ExecuteTransactionRequestV3,
1102 client_addr: Option<std::net::SocketAddr>,
1103 ) -> Result<ExecuteTransactionResponseV3, TransactionSubmissionError> {
1104 self.execute_transaction_v3(request, client_addr).await
1105 }
1106
1107 fn simulate_transaction(
1108 &self,
1109 transaction: TransactionData,
1110 checks: TransactionChecks,
1111 ) -> Result<SimulateTransactionResult, SuiError> {
1112 self.inner
1113 .validator_state
1114 .simulate_transaction(transaction, checks)
1115 }
1116}
1117
1118struct TransactionSubmissionGuard {
1121 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1122 tx_digest: TransactionDigest,
1123 is_new_transaction: bool,
1124}
1125
1126impl TransactionSubmissionGuard {
1127 pub fn new(
1128 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1129 tx: &VerifiedTransaction,
1130 ) -> Self {
1131 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1132 let tx_digest = *tx.digest();
1133 if is_new_transaction {
1134 debug!(?tx_digest, "Added transaction to inflight set");
1135 } else {
1136 debug!(
1137 ?tx_digest,
1138 "Transaction already being processed, no new submission will be made"
1139 );
1140 };
1141 Self {
1142 pending_tx_log,
1143 tx_digest,
1144 is_new_transaction,
1145 }
1146 }
1147
1148 fn is_new_transaction(&self) -> bool {
1149 self.is_new_transaction
1150 }
1151}
1152
1153impl Drop for TransactionSubmissionGuard {
1154 fn drop(&mut self) {
1155 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1156 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1157 } else {
1158 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1159 }
1160 }
1161}