1use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::{backoff, in_antithesis};
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17 HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
18 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
19 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
20 register_int_gauge_with_registry,
21};
22use rand::Rng;
23use sui_config::NodeConfig;
24use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
25use sui_types::base_types::TransactionDigest;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
28use sui_types::messages_grpc::{SubmitTxRequest, TxType};
29use sui_types::quorum_driver_types::{
30 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
31 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
32 QuorumDriverError,
33};
34use sui_types::sui_system_state::SuiSystemState;
35use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
36use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
37use tokio::sync::broadcast::Receiver;
38use tokio::time::{Instant, sleep, timeout};
39use tracing::{Instrument, debug, error_span, info, instrument, warn};
40
41use crate::authority::AuthorityState;
42use crate::authority_aggregator::AuthorityAggregator;
43use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
44use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
45use crate::transaction_driver::{
46 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
47 TransactionDriverMetrics,
48};
49
/// Upper bound on how long `execute_transaction_block` waits, after finality, for the
/// transaction's effects to also be executed on this node.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default deadline for a submission to reach finality; can be overridden at runtime with
/// the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_millis(90_000);
56
57pub type QuorumTransactionEffectsResult =
58 Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
59
60pub struct TransactionOrchestrator<A: Clone> {
66 inner: Arc<Inner<A>>,
67}
68
69impl TransactionOrchestrator<NetworkAuthorityClient> {
70 pub fn new_with_auth_aggregator(
71 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
72 validator_state: Arc<AuthorityState>,
73 reconfig_channel: Receiver<SuiSystemState>,
74 parent_path: &Path,
75 prometheus_registry: &Registry,
76 node_config: &NodeConfig,
77 ) -> Self {
78 let observer = OnsiteReconfigObserver::new(
79 reconfig_channel,
80 validator_state.get_object_cache_reader().clone(),
81 validator_state.clone_committee_store(),
82 validators.safe_client_metrics_base.clone(),
83 validators.metrics.deref().clone(),
84 );
85 TransactionOrchestrator::new(
86 validators,
87 validator_state,
88 parent_path,
89 prometheus_registry,
90 observer,
91 node_config,
92 )
93 }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98 A: AuthorityAPI + Send + Sync + 'static + Clone,
99 OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101 pub fn new(
102 validators: Arc<AuthorityAggregator<A>>,
103 validator_state: Arc<AuthorityState>,
104 parent_path: &Path,
105 prometheus_registry: &Registry,
106 reconfig_observer: OnsiteReconfigObserver,
107 node_config: &NodeConfig,
108 ) -> Self {
109 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111 let client_metrics = Arc::new(
112 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113 );
114 let reconfig_observer = Arc::new(reconfig_observer);
115
116 let transaction_driver = TransactionDriver::new(
117 validators.clone(),
118 reconfig_observer.clone(),
119 td_metrics,
120 Some(node_config),
121 client_metrics,
122 );
123
124 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125 parent_path.join("fullnode_pending_transactions"),
126 ));
127 Inner::start_task_to_recover_txes_in_log(
128 pending_tx_log.clone(),
129 transaction_driver.clone(),
130 );
131
132 let td_allowed_submission_list = node_config
133 .transaction_driver_config
134 .as_ref()
135 .map(|config| config.allowed_submission_validators.clone())
136 .unwrap_or_default();
137
138 let td_blocked_submission_list = node_config
139 .transaction_driver_config
140 .as_ref()
141 .map(|config| config.blocked_submission_validators.clone())
142 .unwrap_or_default();
143
144 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145 panic!(
146 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147 td_allowed_submission_list, td_blocked_submission_list
148 );
149 }
150
151 let enable_early_validation = node_config
152 .transaction_driver_config
153 .as_ref()
154 .map(|config| config.enable_early_validation)
155 .unwrap_or(true);
156
157 let inner = Arc::new(Inner {
158 validator_state,
159 pending_tx_log,
160 metrics,
161 transaction_driver,
162 td_allowed_submission_list,
163 td_blocked_submission_list,
164 enable_early_validation,
165 });
166 Self { inner }
167 }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172 A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175 fields(
176 tx_digest = ?request.transaction.digest(),
177 tx_type = ?request_type,
178 ))]
179 pub async fn execute_transaction_block(
180 &self,
181 request: ExecuteTransactionRequestV3,
182 request_type: ExecuteTransactionRequestType,
183 client_addr: Option<SocketAddr>,
184 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
185 {
186 let timer = Instant::now();
187 let tx_type = if request.transaction.is_consensus_tx() {
188 TxType::SharedObject
189 } else {
190 TxType::SingleWriter
191 };
192 let tx_digest = *request.transaction.digest();
193
194 let inner = self.inner.clone();
195 let (response, mut executed_locally) = spawn_monitored_task!(
196 Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
197 )
198 .await
199 .map_err(|e| QuorumDriverError::TransactionFailed {
200 category: ErrorCategory::Internal,
201 details: e.to_string(),
202 })??;
203
204 if !executed_locally {
205 executed_locally = if matches!(
206 request_type,
207 ExecuteTransactionRequestType::WaitForLocalExecution
208 ) {
209 let executed_locally =
210 Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
211 &self.inner.validator_state,
212 tx_digest,
213 tx_type,
214 &self.inner.metrics,
215 )
216 .await
217 .is_ok();
218 add_server_timing("local_execution done");
219 executed_locally
220 } else {
221 false
222 };
223 }
224
225 let QuorumTransactionResponse {
226 effects,
227 events,
228 input_objects,
229 output_objects,
230 auxiliary_data,
231 } = response;
232
233 let response = ExecuteTransactionResponseV3 {
234 effects,
235 events,
236 input_objects,
237 output_objects,
238 auxiliary_data,
239 };
240
241 self.inner
242 .metrics
243 .request_latency
244 .with_label_values(&[
245 tx_type.as_str(),
246 "execute_transaction_block",
247 executed_locally.to_string().as_str(),
248 ])
249 .observe(timer.elapsed().as_secs_f64());
250
251 Ok((response, executed_locally))
252 }
253
254 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
256 fields(tx_digest = ?request.transaction.digest()))]
257 pub async fn execute_transaction_v3(
258 &self,
259 request: ExecuteTransactionRequestV3,
260 client_addr: Option<SocketAddr>,
261 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
262 let timer = Instant::now();
263 let tx_type = if request.transaction.is_consensus_tx() {
264 TxType::SharedObject
265 } else {
266 TxType::SingleWriter
267 };
268
269 let inner = self.inner.clone();
270 let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
271 inner,
272 request,
273 client_addr
274 ))
275 .await
276 .map_err(|e| QuorumDriverError::TransactionFailed {
277 category: ErrorCategory::Internal,
278 details: e.to_string(),
279 })??;
280
281 self.inner
282 .metrics
283 .request_latency
284 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
285 .observe(timer.elapsed().as_secs_f64());
286
287 let QuorumTransactionResponse {
288 effects,
289 events,
290 input_objects,
291 output_objects,
292 auxiliary_data,
293 } = response;
294
295 Ok(ExecuteTransactionResponseV3 {
296 effects,
297 events,
298 input_objects,
299 output_objects,
300 auxiliary_data,
301 })
302 }
303
304 pub fn authority_state(&self) -> &Arc<AuthorityState> {
305 &self.inner.validator_state
306 }
307
308 pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
309 &self.inner.transaction_driver
310 }
311
312 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
313 self.inner
314 .transaction_driver
315 .authority_aggregator()
316 .load_full()
317 }
318
319 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
320 self.inner.pending_tx_log.load_all_pending_transactions()
321 }
322
323 pub fn empty_pending_tx_log_in_test(&self) -> bool {
324 self.inner.pending_tx_log.is_empty()
325 }
326}
327
328struct Inner<A: Clone> {
329 validator_state: Arc<AuthorityState>,
330 pending_tx_log: Arc<WritePathPendingTransactionLog>,
331 metrics: Arc<TransactionOrchestratorMetrics>,
332 transaction_driver: Arc<TransactionDriver<A>>,
333 td_allowed_submission_list: Vec<String>,
334 td_blocked_submission_list: Vec<String>,
335 enable_early_validation: bool,
336}
337
338impl<A> Inner<A>
339where
340 A: AuthorityAPI + Send + Sync + 'static + Clone,
341{
342 async fn execute_transaction_with_retry(
343 inner: Arc<Inner<A>>,
344 request: ExecuteTransactionRequestV3,
345 client_addr: Option<SocketAddr>,
346 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
347 let result = inner
348 .execute_transaction_with_effects_waiting(
349 request.clone(),
350 client_addr,
351 false,
352 )
353 .await;
354
355 if let Err(e) = &result
357 && e.is_retriable()
358 {
359 spawn_monitored_task!(async move {
360 inner.metrics.background_retry_started.inc();
361 let backoff = backoff::ExponentialBackoff::new(
362 Duration::from_secs(1),
363 Duration::from_secs(300),
364 );
365 const MAX_RETRIES: usize = 10;
366 for (i, delay) in backoff.enumerate() {
367 if i == MAX_RETRIES {
368 break;
369 }
370 let result = inner
373 .execute_transaction_with_effects_waiting(
374 request.clone(),
375 client_addr,
376 i > 3,
377 )
378 .await;
379 match result {
380 Ok(_) => {
381 inner
382 .metrics
383 .background_retry_attempts
384 .with_label_values(&["success"])
385 .inc();
386 debug!(
387 "Background retry {i} for transaction {} succeeded",
388 request.transaction.digest()
389 );
390 break;
391 }
392 Err(e) => {
393 if !e.is_retriable() {
394 inner
395 .metrics
396 .background_retry_attempts
397 .with_label_values(&["non-retriable"])
398 .inc();
399 debug!(
400 "Background retry {i} for transaction {} has non-retriable error: {e:?}. Terminating...",
401 request.transaction.digest()
402 );
403 break;
404 }
405 inner
406 .metrics
407 .background_retry_attempts
408 .with_label_values(&["retriable"])
409 .inc();
410 debug!(
411 "Background retry {i} for transaction {} has retriable error: {e:?}. Continuing...",
412 request.transaction.digest()
413 );
414 }
415 };
416 sleep(delay).await;
417 }
418 });
419 }
420
421 result
422 }
423
424 async fn execute_transaction_with_effects_waiting(
426 &self,
427 request: ExecuteTransactionRequestV3,
428 client_addr: Option<SocketAddr>,
429 enforce_live_input_objects: bool,
430 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
431 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
432 let verified_transaction = epoch_store
433 .verify_transaction_with_current_aliases(request.transaction.clone())
434 .map_err(QuorumDriverError::InvalidUserSignature)?
435 .into_tx();
436 let tx_digest = *verified_transaction.digest();
437
438 if self.enable_early_validation
441 && let Err(e) = self.validator_state.check_transaction_validity(
442 &epoch_store,
443 &verified_transaction,
444 enforce_live_input_objects,
445 )
446 {
447 let error_category = e.categorize();
448 if !error_category.is_submission_retriable() {
449 if !self.validator_state.is_tx_already_executed(&tx_digest) {
451 self.metrics
452 .early_validation_rejections
453 .with_label_values(&[e.to_variant_name()])
454 .inc();
455 debug!(
456 error = ?e,
457 "Transaction rejected during early validation"
458 );
459
460 return Err(QuorumDriverError::TransactionFailed {
461 category: error_category,
462 details: e.to_string(),
463 });
464 }
465 }
466 }
467
468 let guard =
470 TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
471 let is_new_transaction = guard.is_new_transaction();
472
473 let include_events = request.include_events;
474 let include_input_objects = request.include_input_objects;
475 let include_output_objects = request.include_output_objects;
476 let include_auxiliary_data = request.include_auxiliary_data;
477
478 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
479 .ok()
480 .and_then(|v| v.parse().ok())
481 .map(Duration::from_secs)
482 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
483
484 let num_submissions = if !is_new_transaction {
485 0
487 } else if cfg!(msim) || in_antithesis() {
488 let r = rand::thread_rng().gen_range(1..=100);
490 let n = if r <= 10 {
491 3
492 } else if r <= 30 {
493 2
494 } else {
495 1
496 };
497 if n > 1 {
498 debug!("Making {n} execution calls");
499 }
500 n
501 } else {
502 1
503 };
504
505 let mut execution_futures = FuturesUnordered::new();
507 for i in 0..num_submissions {
508 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
510 let delay_ms = if should_delay {
511 rand::thread_rng().gen_range(100..=500)
512 } else {
513 0
514 };
515
516 let request = request.clone();
517 let verified_transaction = verified_transaction.clone();
518
519 let future = async move {
520 if delay_ms > 0 {
521 sleep(Duration::from_millis(delay_ms)).await;
523 }
524 self.execute_transaction_impl(
525 request,
526 verified_transaction,
527 client_addr,
528 Some(finality_timeout),
529 )
530 .await
531 }
532 .boxed();
533 execution_futures.push(future);
534 }
535
536 let mut last_execution_error: Option<QuorumDriverError> = None;
538
539 let digests = [tx_digest];
541 let mut local_effects_future = epoch_store
542 .within_alive_epoch(
543 self.validator_state
544 .get_transaction_cache_reader()
545 .notify_read_executed_effects(
546 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
547 &digests,
548 ),
549 )
550 .boxed();
551
552 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
554
555 loop {
556 tokio::select! {
557 biased;
558
559 local_effects_result = &mut local_effects_future => {
561 match local_effects_result {
562 Ok(effects) => {
563 debug!(
564 "Effects became available while execution was running"
565 );
566 if let Some(effects) = effects.into_iter().next() {
567 self.metrics.concurrent_execution.inc();
568 let epoch = effects.executed_epoch();
569 let events = if include_events {
570 if effects.events_digest().is_some() {
571 Some(self.validator_state.get_transaction_events(effects.transaction_digest())
572 .map_err(QuorumDriverError::QuorumDriverInternalError)?)
573 } else {
574 None
575 }
576 } else {
577 None
578 };
579 let input_objects = include_input_objects
580 .then(|| self.validator_state.get_transaction_input_objects(&effects))
581 .transpose()
582 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
583 let output_objects = include_output_objects
584 .then(|| self.validator_state.get_transaction_output_objects(&effects))
585 .transpose()
586 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
587 let response = QuorumTransactionResponse {
588 effects: FinalizedEffects {
589 effects,
590 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
591 },
592 events,
593 input_objects,
594 output_objects,
595 auxiliary_data: None,
596 };
597 break Ok((response, true));
598 }
599 }
600 Err(_) => {
601 warn!("Epoch terminated before effects were available");
602 }
603 };
604
605 local_effects_future = futures::future::pending().boxed();
607 }
608
609 Some(result) = execution_futures.next() => {
611 match result {
612 Ok(resp) => {
613 debug!("Execution succeeded, returning response");
615 let QuorumTransactionResponse {
616 effects,
617 events,
618 input_objects,
619 output_objects,
620 auxiliary_data,
621 } = resp;
622 let resp = QuorumTransactionResponse {
624 effects,
625 events: if include_events { events } else { None },
626 input_objects: if include_input_objects { input_objects } else { None },
627 output_objects: if include_output_objects { output_objects } else { None },
628 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
629 };
630 break Ok((resp, false));
631 }
632 Err(e) => {
633 debug!(?e, "Execution attempt failed, wait for other attempts");
634 last_execution_error = Some(e);
635 }
636 };
637
638 if execution_futures.is_empty() {
640 break Err(last_execution_error.unwrap());
641 }
642 }
643
644 _ = &mut timeout_future => {
646 if let Some(e) = last_execution_error {
647 debug!("Timeout waiting for transaction finality. Last execution error: {e}");
648 } else {
649 debug!("Timeout waiting for transaction finality.");
650 }
651 self.metrics.wait_for_finality_timeout.inc();
652
653 break Err(QuorumDriverError::TimeoutBeforeFinality);
655 }
656 }
657 }
658 }
659
660 #[instrument(level = "error", skip_all)]
661 async fn execute_transaction_impl(
662 &self,
663 request: ExecuteTransactionRequestV3,
664 verified_transaction: VerifiedTransaction,
665 client_addr: Option<SocketAddr>,
666 finality_timeout: Option<Duration>,
667 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
668 debug!("TO Received transaction execution request.");
669
670 let timer = Instant::now();
671 let tx_type = if verified_transaction.is_consensus_tx() {
672 TxType::SharedObject
673 } else {
674 TxType::SingleWriter
675 };
676
677 let (_in_flight_metrics_guards, good_response_metrics) =
678 self.update_metrics(&request.transaction);
679
680 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
682 wait_for_finality_gauge.inc();
683 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
684 in_flight.dec();
685 });
686
687 let response = self
688 .submit_with_transaction_driver(
689 &self.transaction_driver,
690 &request,
691 client_addr,
692 &verified_transaction,
693 good_response_metrics,
694 finality_timeout,
695 )
696 .await?;
697 let driver_type = "transaction_driver";
698
699 add_server_timing("wait_for_finality done");
700
701 self.metrics.wait_for_finality_finished.inc();
702
703 let elapsed = timer.elapsed().as_secs_f64();
704 self.metrics
705 .settlement_finality_latency
706 .with_label_values(&[tx_type.as_str(), driver_type])
707 .observe(elapsed);
708 good_response_metrics.inc();
709
710 Ok(response)
711 }
712
713 #[instrument(level = "error", skip_all, err(level = "info"))]
714 async fn submit_with_transaction_driver(
715 &self,
716 td: &Arc<TransactionDriver<A>>,
717 request: &ExecuteTransactionRequestV3,
718 client_addr: Option<SocketAddr>,
719 verified_transaction: &VerifiedTransaction,
720 good_response_metrics: &GenericCounter<AtomicU64>,
721 timeout_duration: Option<Duration>,
722 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
723 let tx_digest = *verified_transaction.digest();
724 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
725
726 let td_response = td
727 .drive_transaction(
728 SubmitTxRequest::new_transaction(request.transaction.clone()),
729 SubmitTransactionOptions {
730 forwarded_client_addr: client_addr,
731 allowed_validators: self.td_allowed_submission_list.clone(),
732 blocked_validators: self.td_blocked_submission_list.clone(),
733 },
734 timeout_duration,
735 )
736 .await
737 .map_err(|e| match e {
738 TransactionDriverError::TimeoutWithLastRetriableError {
739 last_error,
740 attempts,
741 timeout,
742 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
743 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
744 attempts,
745 timeout,
746 },
747 other => QuorumDriverError::TransactionFailed {
748 category: other.categorize(),
749 details: other.to_string(),
750 },
751 });
752
753 match td_response {
754 Err(e) => {
755 warn!("TransactionDriver error: {e:?}");
756 Err(e)
757 }
758 Ok(quorum_transaction_response) => {
759 good_response_metrics.inc();
760 Ok(quorum_transaction_response)
761 }
762 }
763 }
764
765 #[instrument(
766 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
767 level = "debug",
768 skip_all,
769 err(level = "info")
770 )]
771 async fn wait_for_finalized_tx_executed_locally_with_timeout(
772 validator_state: &Arc<AuthorityState>,
773 tx_digest: TransactionDigest,
774 tx_type: TxType,
775 metrics: &TransactionOrchestratorMetrics,
776 ) -> SuiResult {
777 metrics.local_execution_in_flight.inc();
778 let _metrics_guard =
779 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
780 in_flight.dec();
781 });
782
783 let _latency_guard = metrics
784 .local_execution_latency
785 .with_label_values(&[tx_type.as_str()])
786 .start_timer();
787 debug!("Waiting for finalized tx to be executed locally.");
788 match timeout(
789 LOCAL_EXECUTION_TIMEOUT,
790 validator_state
791 .get_transaction_cache_reader()
792 .notify_read_executed_effects_digests(
793 "TransactionOrchestrator::notify_read_wait_for_local_execution",
794 &[tx_digest],
795 ),
796 )
797 .instrument(error_span!(
798 "transaction_orchestrator::local_execution",
799 ?tx_digest
800 ))
801 .await
802 {
803 Err(_elapsed) => {
804 debug!(
805 "Waiting for finalized tx to be executed locally timed out within {:?}.",
806 LOCAL_EXECUTION_TIMEOUT
807 );
808 metrics.local_execution_timeout.inc();
809 Err(SuiErrorKind::TimeoutError.into())
810 }
811 Ok(_) => {
812 metrics.local_execution_success.inc();
813 Ok(())
814 }
815 }
816 }
817
818 fn update_metrics<'a>(
819 &'a self,
820 transaction: &Transaction,
821 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
822 let (in_flight, good_response) = if transaction.is_consensus_tx() {
823 self.metrics.total_req_received_shared_object.inc();
824 (
825 self.metrics.req_in_flight_shared_object.clone(),
826 &self.metrics.good_response_shared_object,
827 )
828 } else {
829 self.metrics.total_req_received_single_writer.inc();
830 (
831 self.metrics.req_in_flight_single_writer.clone(),
832 &self.metrics.good_response_single_writer,
833 )
834 };
835 in_flight.inc();
836 (
837 scopeguard::guard(in_flight, |in_flight| {
838 in_flight.dec();
839 }),
840 good_response,
841 )
842 }
843
844 fn start_task_to_recover_txes_in_log(
845 pending_tx_log: Arc<WritePathPendingTransactionLog>,
846 transaction_driver: Arc<TransactionDriver<A>>,
847 ) {
848 spawn_logged_monitored_task!(async move {
849 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
850 info!("Skipping loading pending transactions from pending_tx_log.");
851 return;
852 }
853 let pending_txes = pending_tx_log
854 .load_all_pending_transactions()
855 .expect("failed to load all pending transactions");
856 let num_pending_txes = pending_txes.len();
857 info!(
858 "Recovering {} pending transactions from pending_tx_log.",
859 num_pending_txes
860 );
861 let mut recovery = pending_txes
862 .into_iter()
863 .map(|tx| {
864 let pending_tx_log = pending_tx_log.clone();
865 let transaction_driver = transaction_driver.clone();
866 async move {
867 let tx = tx.into_inner();
870 let tx_digest = *tx.digest();
871 if let Err(err) = transaction_driver
874 .drive_transaction(
875 SubmitTxRequest::new_transaction(tx),
876 SubmitTransactionOptions::default(),
877 Some(Duration::from_secs(60)),
878 )
879 .await
880 {
881 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
882 } else {
883 debug!(?tx_digest, "Executed recovered transaction");
884 }
885 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
886 warn!(
887 ?tx_digest,
888 "Failed to clean up transaction in pending log: {err}"
889 );
890 } else {
891 debug!(?tx_digest, "Cleaned up transaction in pending log");
892 }
893 }
894 })
895 .collect::<FuturesUnordered<_>>();
896
897 let mut num_recovered = 0;
898 while recovery.next().await.is_some() {
899 num_recovered += 1;
900 if num_recovered % 1000 == 0 {
901 info!(
902 "Recovered {} out of {} transactions from pending_tx_log.",
903 num_recovered, num_pending_txes
904 );
905 }
906 }
907 info!(
908 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
909 num_recovered, num_pending_txes
910 );
911 });
912 }
913}
914#[derive(Clone)]
916pub struct TransactionOrchestratorMetrics {
917 total_req_received_single_writer: GenericCounter<AtomicU64>,
918 total_req_received_shared_object: GenericCounter<AtomicU64>,
919
920 good_response_single_writer: GenericCounter<AtomicU64>,
921 good_response_shared_object: GenericCounter<AtomicU64>,
922
923 req_in_flight_single_writer: GenericGauge<AtomicI64>,
924 req_in_flight_shared_object: GenericGauge<AtomicI64>,
925
926 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
927 wait_for_finality_finished: GenericCounter<AtomicU64>,
928 wait_for_finality_timeout: GenericCounter<AtomicU64>,
929
930 local_execution_in_flight: GenericGauge<AtomicI64>,
931 local_execution_success: GenericCounter<AtomicU64>,
932 local_execution_timeout: GenericCounter<AtomicU64>,
933
934 concurrent_execution: IntCounter,
935
936 early_validation_rejections: IntCounterVec,
937
938 background_retry_started: IntGauge,
939 background_retry_attempts: IntCounterVec,
940
941 request_latency: HistogramVec,
942 local_execution_latency: HistogramVec,
943 settlement_finality_latency: HistogramVec,
944}
945
946impl TransactionOrchestratorMetrics {
950 pub fn new(registry: &Registry) -> Self {
951 let total_req_received = register_int_counter_vec_with_registry!(
952 "tx_orchestrator_total_req_received",
953 "Total number of executions request Transaction Orchestrator receives, group by tx type",
954 &["tx_type"],
955 registry
956 )
957 .unwrap();
958
959 let total_req_received_single_writer =
960 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
961 let total_req_received_shared_object =
962 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
963
964 let good_response = register_int_counter_vec_with_registry!(
965 "tx_orchestrator_good_response",
966 "Total number of good responses Transaction Orchestrator generates, group by tx type",
967 &["tx_type"],
968 registry
969 )
970 .unwrap();
971
972 let good_response_single_writer =
973 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
974 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
975
976 let req_in_flight = register_int_gauge_vec_with_registry!(
977 "tx_orchestrator_req_in_flight",
978 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
979 &["tx_type"],
980 registry
981 )
982 .unwrap();
983
984 let req_in_flight_single_writer =
985 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
986 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
987
988 Self {
989 total_req_received_single_writer,
990 total_req_received_shared_object,
991 good_response_single_writer,
992 good_response_shared_object,
993 req_in_flight_single_writer,
994 req_in_flight_shared_object,
995 wait_for_finality_in_flight: register_int_gauge_with_registry!(
996 "tx_orchestrator_wait_for_finality_in_flight",
997 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
998 registry,
999 )
1000 .unwrap(),
1001 wait_for_finality_finished: register_int_counter_with_registry!(
1002 "tx_orchestrator_wait_for_finality_fnished",
1003 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1004 registry,
1005 )
1006 .unwrap(),
1007 wait_for_finality_timeout: register_int_counter_with_registry!(
1008 "tx_orchestrator_wait_for_finality_timeout",
1009 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1010 registry,
1011 )
1012 .unwrap(),
1013 local_execution_in_flight: register_int_gauge_with_registry!(
1014 "tx_orchestrator_local_execution_in_flight",
1015 "Number of local execution txns in flights Transaction Orchestrator handles",
1016 registry,
1017 )
1018 .unwrap(),
1019 local_execution_success: register_int_counter_with_registry!(
1020 "tx_orchestrator_local_execution_success",
1021 "Total number of successful local execution txns Transaction Orchestrator handles",
1022 registry,
1023 )
1024 .unwrap(),
1025 local_execution_timeout: register_int_counter_with_registry!(
1026 "tx_orchestrator_local_execution_timeout",
1027 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1028 registry,
1029 )
1030 .unwrap(),
1031 concurrent_execution: register_int_counter_with_registry!(
1032 "tx_orchestrator_concurrent_execution",
1033 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1034 registry,
1035 )
1036 .unwrap(),
1037 early_validation_rejections: register_int_counter_vec_with_registry!(
1038 "tx_orchestrator_early_validation_rejections",
1039 "Total number of transactions rejected during early validation before submission, by reason",
1040 &["reason"],
1041 registry,
1042 )
1043 .unwrap(),
1044 background_retry_started: register_int_gauge_with_registry!(
1045 "tx_orchestrator_background_retry_started",
1046 "Number of background retry tasks kicked off for transactions with retriable errors",
1047 registry,
1048 )
1049 .unwrap(),
1050 background_retry_attempts: register_int_counter_vec_with_registry!(
1051 "tx_orchestrator_background_retry_attempts",
1052 "Total number of background retry attempts, by status",
1053 &["status"],
1054 registry,
1055 )
1056 .unwrap(),
1057 request_latency: register_histogram_vec_with_registry!(
1058 "tx_orchestrator_request_latency",
1059 "Time spent in processing one Transaction Orchestrator request",
1060 &["tx_type", "route", "wait_for_local_execution"],
1061 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1062 registry,
1063 )
1064 .unwrap(),
1065 local_execution_latency: register_histogram_vec_with_registry!(
1066 "tx_orchestrator_local_execution_latency",
1067 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1068 &["tx_type"],
1069 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1070 registry,
1071 )
1072 .unwrap(),
1073 settlement_finality_latency: register_histogram_vec_with_registry!(
1074 "tx_orchestrator_settlement_finality_latency",
1075 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1076 &["tx_type", "driver_type"],
1077 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1078 registry,
1079 )
1080 .unwrap(),
1081 }
1082 }
1083
1084 pub fn new_for_tests() -> Self {
1085 let registry = Registry::new();
1086 Self::new(®istry)
1087 }
1088}
1089
1090#[async_trait::async_trait]
1091impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1092where
1093 A: AuthorityAPI + Send + Sync + 'static + Clone,
1094{
1095 async fn execute_transaction(
1096 &self,
1097 request: ExecuteTransactionRequestV3,
1098 client_addr: Option<std::net::SocketAddr>,
1099 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1100 self.execute_transaction_v3(request, client_addr).await
1101 }
1102
1103 fn simulate_transaction(
1104 &self,
1105 transaction: TransactionData,
1106 checks: TransactionChecks,
1107 ) -> Result<SimulateTransactionResult, SuiError> {
1108 self.inner
1109 .validator_state
1110 .simulate_transaction(transaction, checks)
1111 }
1112}
1113
1114struct TransactionSubmissionGuard {
1117 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1118 tx_digest: TransactionDigest,
1119 is_new_transaction: bool,
1120}
1121
1122impl TransactionSubmissionGuard {
1123 pub fn new(
1124 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1125 tx: &VerifiedTransaction,
1126 ) -> Self {
1127 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1128 let tx_digest = *tx.digest();
1129 if is_new_transaction {
1130 debug!(?tx_digest, "Added transaction to inflight set");
1131 } else {
1132 debug!(
1133 ?tx_digest,
1134 "Transaction already being processed, no new submission will be made"
1135 );
1136 };
1137 Self {
1138 pending_tx_log,
1139 tx_digest,
1140 is_new_transaction,
1141 }
1142 }
1143
1144 fn is_new_transaction(&self) -> bool {
1145 self.is_new_transaction
1146 }
1147}
1148
1149impl Drop for TransactionSubmissionGuard {
1150 fn drop(&mut self) {
1151 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1152 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1153 } else {
1154 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1155 }
1156 }
1157}