1use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::{backoff, in_antithesis};
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, spawn_monitored_task};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17 HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
18 register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
19 register_int_counter_with_registry, register_int_gauge_vec_with_registry,
20 register_int_gauge_with_registry,
21};
22use rand::Rng;
23use sui_config::NodeConfig;
24use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
25use sui_types::base_types::TransactionDigest;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::error::{ErrorCategory, SuiError, SuiErrorKind, SuiResult};
28use sui_types::messages_grpc::{SubmitTxRequest, TxType};
29use sui_types::quorum_driver_types::{
30 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
31 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
32 QuorumDriverError,
33};
34use sui_types::sui_system_state::SuiSystemState;
35use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
36use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
37use tokio::sync::broadcast::Receiver;
38use tokio::time::{Instant, sleep, timeout};
39use tracing::{Instrument, debug, error_span, info, instrument, warn};
40
41use crate::authority::AuthorityState;
42use crate::authority_aggregator::AuthorityAggregator;
43use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
44use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
45use crate::transaction_driver::{
46 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
47 TransactionDriverMetrics,
48};
49
/// Upper bound on how long `wait_for_finalized_tx_executed_locally_with_timeout` waits for a
/// transaction's executed effects digest to show up in the local transaction cache.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
53
/// Default time budget for a transaction to reach finality. It is used when the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable is unset or cannot be parsed.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
56
/// Result alias pairing a transaction with its quorum response on success, or the transaction
/// digest with the `QuorumDriverError` on failure.
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
59
/// Entry point for submitting transactions and waiting for their effects.
///
/// `A` is the authority client type used by the underlying `TransactionDriver`.
pub struct TransactionOrchestrator<A: Clone> {
    // Shared state; held behind an `Arc` so a clone can be moved into spawned tasks.
    inner: Arc<Inner<A>>,
}
68
69impl TransactionOrchestrator<NetworkAuthorityClient> {
70 pub fn new_with_auth_aggregator(
71 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
72 validator_state: Arc<AuthorityState>,
73 reconfig_channel: Receiver<SuiSystemState>,
74 parent_path: &Path,
75 prometheus_registry: &Registry,
76 node_config: &NodeConfig,
77 ) -> Self {
78 let observer = OnsiteReconfigObserver::new(
79 reconfig_channel,
80 validator_state.get_object_cache_reader().clone(),
81 validator_state.clone_committee_store(),
82 validators.safe_client_metrics_base.clone(),
83 validators.metrics.deref().clone(),
84 );
85 TransactionOrchestrator::new(
86 validators,
87 validator_state,
88 parent_path,
89 prometheus_registry,
90 observer,
91 node_config,
92 )
93 }
94}
95
96impl<A> TransactionOrchestrator<A>
97where
98 A: AuthorityAPI + Send + Sync + 'static + Clone,
99 OnsiteReconfigObserver: ReconfigObserver<A>,
100{
101 pub fn new(
102 validators: Arc<AuthorityAggregator<A>>,
103 validator_state: Arc<AuthorityState>,
104 parent_path: &Path,
105 prometheus_registry: &Registry,
106 reconfig_observer: OnsiteReconfigObserver,
107 node_config: &NodeConfig,
108 ) -> Self {
109 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
110 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
111 let client_metrics = Arc::new(
112 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
113 );
114 let reconfig_observer = Arc::new(reconfig_observer);
115
116 let transaction_driver = TransactionDriver::new(
117 validators.clone(),
118 reconfig_observer.clone(),
119 td_metrics,
120 Some(node_config),
121 client_metrics,
122 );
123
124 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
125 parent_path.join("fullnode_pending_transactions"),
126 ));
127 Inner::start_task_to_recover_txes_in_log(
128 pending_tx_log.clone(),
129 transaction_driver.clone(),
130 );
131
132 let td_allowed_submission_list = node_config
133 .transaction_driver_config
134 .as_ref()
135 .map(|config| config.allowed_submission_validators.clone())
136 .unwrap_or_default();
137
138 let td_blocked_submission_list = node_config
139 .transaction_driver_config
140 .as_ref()
141 .map(|config| config.blocked_submission_validators.clone())
142 .unwrap_or_default();
143
144 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
145 panic!(
146 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
147 td_allowed_submission_list, td_blocked_submission_list
148 );
149 }
150
151 let enable_early_validation = node_config
152 .transaction_driver_config
153 .as_ref()
154 .map(|config| config.enable_early_validation)
155 .unwrap_or(true);
156
157 let inner = Arc::new(Inner {
158 validator_state,
159 pending_tx_log,
160 metrics,
161 transaction_driver,
162 td_allowed_submission_list,
163 td_blocked_submission_list,
164 enable_early_validation,
165 });
166 Self { inner }
167 }
168}
169
170impl<A> TransactionOrchestrator<A>
171where
172 A: AuthorityAPI + Send + Sync + 'static + Clone,
173{
174 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
175 fields(
176 tx_digest = ?request.transaction.digest(),
177 tx_type = ?request_type,
178 ))]
179 pub async fn execute_transaction_block(
180 &self,
181 request: ExecuteTransactionRequestV3,
182 request_type: ExecuteTransactionRequestType,
183 client_addr: Option<SocketAddr>,
184 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
185 {
186 let timer = Instant::now();
187 let tx_type = if request.transaction.is_consensus_tx() {
188 TxType::SharedObject
189 } else {
190 TxType::SingleWriter
191 };
192 let tx_digest = *request.transaction.digest();
193
194 let inner = self.inner.clone();
195 let (response, mut executed_locally) = spawn_monitored_task!(
196 Inner::<A>::execute_transaction_with_retry(inner, request, client_addr)
197 )
198 .await
199 .map_err(|e| QuorumDriverError::TransactionFailed {
200 category: ErrorCategory::Internal,
201 details: e.to_string(),
202 })??;
203
204 if !executed_locally {
205 executed_locally = if matches!(
206 request_type,
207 ExecuteTransactionRequestType::WaitForLocalExecution
208 ) {
209 let executed_locally =
210 Inner::<A>::wait_for_finalized_tx_executed_locally_with_timeout(
211 &self.inner.validator_state,
212 tx_digest,
213 tx_type,
214 &self.inner.metrics,
215 )
216 .await
217 .is_ok();
218 add_server_timing("local_execution done");
219 executed_locally
220 } else {
221 false
222 };
223 }
224
225 let QuorumTransactionResponse {
226 effects,
227 events,
228 input_objects,
229 output_objects,
230 auxiliary_data,
231 } = response;
232
233 let response = ExecuteTransactionResponseV3 {
234 effects,
235 events,
236 input_objects,
237 output_objects,
238 auxiliary_data,
239 };
240
241 self.inner
242 .metrics
243 .request_latency
244 .with_label_values(&[
245 tx_type.as_str(),
246 "execute_transaction_block",
247 executed_locally.to_string().as_str(),
248 ])
249 .observe(timer.elapsed().as_secs_f64());
250
251 Ok((response, executed_locally))
252 }
253
254 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
256 fields(tx_digest = ?request.transaction.digest()))]
257 pub async fn execute_transaction_v3(
258 &self,
259 request: ExecuteTransactionRequestV3,
260 client_addr: Option<SocketAddr>,
261 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
262 let timer = Instant::now();
263 let tx_type = if request.transaction.is_consensus_tx() {
264 TxType::SharedObject
265 } else {
266 TxType::SingleWriter
267 };
268
269 let inner = self.inner.clone();
270 let (response, _) = spawn_monitored_task!(Inner::<A>::execute_transaction_with_retry(
271 inner,
272 request,
273 client_addr
274 ))
275 .await
276 .map_err(|e| QuorumDriverError::TransactionFailed {
277 category: ErrorCategory::Internal,
278 details: e.to_string(),
279 })??;
280
281 self.inner
282 .metrics
283 .request_latency
284 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
285 .observe(timer.elapsed().as_secs_f64());
286
287 let QuorumTransactionResponse {
288 effects,
289 events,
290 input_objects,
291 output_objects,
292 auxiliary_data,
293 } = response;
294
295 Ok(ExecuteTransactionResponseV3 {
296 effects,
297 events,
298 input_objects,
299 output_objects,
300 auxiliary_data,
301 })
302 }
303
304 pub fn authority_state(&self) -> &Arc<AuthorityState> {
305 &self.inner.validator_state
306 }
307
308 pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
309 &self.inner.transaction_driver
310 }
311
312 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
313 self.inner
314 .transaction_driver
315 .authority_aggregator()
316 .load_full()
317 }
318
319 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
320 self.inner.pending_tx_log.load_all_pending_transactions()
321 }
322
323 pub fn empty_pending_tx_log_in_test(&self) -> bool {
324 self.inner.pending_tx_log.is_empty()
325 }
326}
327
/// State shared by every clone of the orchestrator and by the tasks it spawns.
struct Inner<A: Clone> {
    // Local authority state: transaction verification, early validation and effects lookups.
    validator_state: Arc<AuthorityState>,
    // Log of transactions being submitted; entries are added and removed by
    // `TransactionSubmissionGuard` and replayed by `start_task_to_recover_txes_in_log`.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    // Prometheus metrics of the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
    // Driver that submits transactions and waits for their finality.
    transaction_driver: Arc<TransactionDriver<A>>,
    // Passed as `allowed_validators` in the submission options.
    td_allowed_submission_list: Vec<String>,
    // Passed as `blocked_validators` in the submission options.
    td_blocked_submission_list: Vec<String>,
    // When set, transactions are checked with `check_transaction_validity` before submission.
    enable_early_validation: bool,
}
337
338impl<A> Inner<A>
339where
340 A: AuthorityAPI + Send + Sync + 'static + Clone,
341{
342 async fn execute_transaction_with_retry(
343 inner: Arc<Inner<A>>,
344 request: ExecuteTransactionRequestV3,
345 client_addr: Option<SocketAddr>,
346 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
347 let result = inner
348 .execute_transaction_with_effects_waiting(
349 request.clone(),
350 client_addr,
351 false,
352 )
353 .await;
354
355 if let Err(e) = &result
357 && e.is_retriable()
358 {
359 spawn_monitored_task!(async move {
360 inner.metrics.background_retry_started.inc();
361 let backoff = backoff::ExponentialBackoff::new(
362 Duration::from_secs(1),
363 Duration::from_secs(300),
364 );
365 const MAX_RETRIES: usize = 10;
366 for (i, delay) in backoff.enumerate() {
367 if i == MAX_RETRIES {
368 break;
369 }
370 let result = inner
373 .execute_transaction_with_effects_waiting(
374 request.clone(),
375 client_addr,
376 i > 3,
377 )
378 .await;
379 match result {
380 Ok(_) => break,
381 Err(e) => {
382 if !e.is_retriable() {
383 break;
384 }
385 inner.metrics.background_retry_errors.inc();
386 if i % 100 == 0 {
387 debug!(
388 "Background retry {i} for transaction {}: {e:?}",
389 request.transaction.digest()
390 );
391 }
392 }
393 };
394 sleep(delay).await;
395 }
396 });
397 }
398
399 result
400 }
401
    /// Verifies `request`, optionally pre-validates it, submits it through the transaction
    /// driver, and resolves with whichever comes first: executed effects appearing in the local
    /// transaction cache, a submission result, or the finality timeout.
    ///
    /// The boolean in the `Ok` tuple is `true` when the response was assembled from locally
    /// available effects and `false` when it came from a submission attempt.
    ///
    /// `enforce_live_input_objects` is forwarded to `check_transaction_validity` during early
    /// validation.
    ///
    /// # Errors
    /// - `InvalidUserSignature` when transaction verification fails.
    /// - `TransactionFailed` when early validation rejects the transaction with a category that
    ///   is not submission-retriable and the transaction has not already been executed.
    /// - `QuorumDriverInternalError` when reading events or objects for local effects fails.
    /// - The last submission error once every submission attempt has failed.
    /// - `TimeoutBeforeFinality` when the finality timeout elapses first.
    async fn execute_transaction_with_effects_waiting(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<SocketAddr>,
        enforce_live_input_objects: bool,
    ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
        let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
        let verified_transaction = epoch_store
            .verify_transaction_with_current_aliases(request.transaction.clone())
            .map_err(QuorumDriverError::InvalidUserSignature)?
            .into_tx();
        let tx_digest = *verified_transaction.digest();

        // Early validation: reject locally only when the failure is not submission-retriable and
        // the transaction has not been executed already; every other case proceeds to submission.
        if self.enable_early_validation
            && let Err(e) = self.validator_state.check_transaction_validity(
                &epoch_store,
                &verified_transaction,
                enforce_live_input_objects,
            )
        {
            let error_category = e.categorize();
            if !error_category.is_submission_retriable() {
                if !self.validator_state.is_tx_already_executed(&tx_digest) {
                    self.metrics
                        .early_validation_rejections
                        .with_label_values(&[e.to_variant_name()])
                        .inc();
                    debug!(
                        error = ?e,
                        "Transaction rejected during early validation"
                    );

                    return Err(QuorumDriverError::TransactionFailed {
                        category: error_category,
                        details: e.to_string(),
                    });
                }
            }
        }

        // Records the transaction in the pending log. The guard lives until this function
        // returns; dropping it calls `finish_transaction` on the log.
        let guard =
            TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
        let is_new_transaction = guard.is_new_transaction();

        let include_events = request.include_events;
        let include_input_objects = request.include_input_objects;
        let include_output_objects = request.include_output_objects;
        let include_auxiliary_data = request.include_auxiliary_data;

        // `WAIT_FOR_FINALITY_TIMEOUT_SECS` (whole seconds) overrides the default timeout; an
        // unset or unparsable value falls back to `WAIT_FOR_FINALITY_TIMEOUT`.
        let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .map(Duration::from_secs)
            .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);

        // A transaction that is already in the pending log is not submitted again; this call
        // then only waits for local effects or the timeout. Under msim/antithesis the number of
        // submissions is randomized: 3 for r <= 10, 2 for r <= 30, otherwise 1.
        let num_submissions = if !is_new_transaction {
            0
        } else if cfg!(msim) || in_antithesis() {
            let r = rand::thread_rng().gen_range(1..=100);
            let n = if r <= 10 {
                3
            } else if r <= 30 {
                2
            } else {
                1
            };
            if n > 1 {
                debug!("Making {n} execution calls");
            }
            n
        } else {
            1
        };

        let mut execution_futures = FuturesUnordered::new();
        for i in 0..num_submissions {
            // Every submission after the first is delayed by 100-500ms with probability 0.8.
            let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
            let delay_ms = if should_delay {
                rand::thread_rng().gen_range(100..=500)
            } else {
                0
            };

            let request = request.clone();
            let verified_transaction = verified_transaction.clone();

            let future = async move {
                if delay_ms > 0 {
                    sleep(Duration::from_millis(delay_ms)).await;
                }
                self.execute_transaction_impl(
                    request,
                    verified_transaction,
                    client_addr,
                    Some(finality_timeout),
                )
                .await
            }
            .boxed();
            execution_futures.push(future);
        }

        // Most recent submission failure; returned once no submission attempt is left.
        let mut last_execution_error: Option<QuorumDriverError> = None;

        // Resolves when executed effects for the digest are available locally, or with an error
        // when `within_alive_epoch` gives up (logged below as the epoch having terminated).
        let digests = [tx_digest];
        let mut local_effects_future = epoch_store
            .within_alive_epoch(
                self.validator_state
                    .get_transaction_cache_reader()
                    .notify_read_executed_effects(
                        "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
                        &digests,
                    ),
            )
            .boxed();

        let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();

        loop {
            tokio::select! {
                // `biased` makes the branches get polled in the order written: local effects
                // first, then submission results, then the timeout.
                biased;

                local_effects_result = &mut local_effects_future => {
                    match local_effects_result {
                        Ok(effects) => {
                            debug!(
                                "Effects became available while execution was running"
                            );
                            if let Some(effects) = effects.into_iter().next() {
                                self.metrics.concurrent_execution.inc();
                                let epoch = effects.executed_epoch();
                                // Events are looked up only when requested and the effects
                                // carry an events digest.
                                let events = if include_events {
                                    if effects.events_digest().is_some() {
                                        Some(self.validator_state.get_transaction_events(effects.transaction_digest())
                                            .map_err(QuorumDriverError::QuorumDriverInternalError)?)
                                    } else {
                                        None
                                    }
                                } else {
                                    None
                                };
                                let input_objects = include_input_objects
                                    .then(|| self.validator_state.get_transaction_input_objects(&effects))
                                    .transpose()
                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
                                let output_objects = include_output_objects
                                    .then(|| self.validator_state.get_transaction_output_objects(&effects))
                                    .transpose()
                                    .map_err(QuorumDriverError::QuorumDriverInternalError)?;
                                // Auxiliary data is never populated on this local path.
                                let response = QuorumTransactionResponse {
                                    effects: FinalizedEffects {
                                        effects,
                                        finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
                                    },
                                    events,
                                    input_objects,
                                    output_objects,
                                    auxiliary_data: None,
                                };
                                break Ok((response, true));
                            }
                        }
                        Err(_) => {
                            warn!("Epoch terminated before effects were available");
                        }
                    };

                    // The future has completed without producing a response; swap in a
                    // never-resolving future so this branch is not polled again.
                    local_effects_future = futures::future::pending().boxed();
                }

                // This branch is disabled once `execution_futures` yields `None` (no attempts
                // left, including the case where none were started).
                Some(result) = execution_futures.next() => {
                    match result {
                        Ok(resp) => {
                            debug!("Execution succeeded, returning response");
                            let QuorumTransactionResponse {
                                effects,
                                events,
                                input_objects,
                                output_objects,
                                auxiliary_data,
                            } = resp;
                            // Strip every optional part the caller did not ask for.
                            let resp = QuorumTransactionResponse {
                                effects,
                                events: if include_events { events } else { None },
                                input_objects: if include_input_objects { input_objects } else { None },
                                output_objects: if include_output_objects { output_objects } else { None },
                                auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
                            };
                            break Ok((resp, false));
                        }
                        Err(e) => {
                            debug!(?e, "Execution attempt failed, wait for other attempts");
                            last_execution_error = Some(e);
                        }
                    };

                    // Reached only after a failure, so `last_execution_error` is always set here.
                    if execution_futures.is_empty() {
                        break Err(last_execution_error.unwrap());
                    }
                }

                _ = &mut timeout_future => {
                    if let Some(e) = last_execution_error {
                        debug!("Timeout waiting for transaction finality. Last execution error: {e}");
                    } else {
                        debug!("Timeout waiting for transaction finality.");
                    }
                    self.metrics.wait_for_finality_timeout.inc();

                    break Err(QuorumDriverError::TimeoutBeforeFinality);
                }
            }
        }
    }
637
638 #[instrument(level = "error", skip_all)]
639 async fn execute_transaction_impl(
640 &self,
641 request: ExecuteTransactionRequestV3,
642 verified_transaction: VerifiedTransaction,
643 client_addr: Option<SocketAddr>,
644 finality_timeout: Option<Duration>,
645 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
646 debug!("TO Received transaction execution request.");
647
648 let timer = Instant::now();
649 let tx_type = if verified_transaction.is_consensus_tx() {
650 TxType::SharedObject
651 } else {
652 TxType::SingleWriter
653 };
654
655 let (_in_flight_metrics_guards, good_response_metrics) =
656 self.update_metrics(&request.transaction);
657
658 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
660 wait_for_finality_gauge.inc();
661 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
662 in_flight.dec();
663 });
664
665 let response = self
666 .submit_with_transaction_driver(
667 &self.transaction_driver,
668 &request,
669 client_addr,
670 &verified_transaction,
671 good_response_metrics,
672 finality_timeout,
673 )
674 .await?;
675 let driver_type = "transaction_driver";
676
677 add_server_timing("wait_for_finality done");
678
679 self.metrics.wait_for_finality_finished.inc();
680
681 let elapsed = timer.elapsed().as_secs_f64();
682 self.metrics
683 .settlement_finality_latency
684 .with_label_values(&[tx_type.as_str(), driver_type])
685 .observe(elapsed);
686 good_response_metrics.inc();
687
688 Ok(response)
689 }
690
691 #[instrument(level = "error", skip_all, err(level = "info"))]
692 async fn submit_with_transaction_driver(
693 &self,
694 td: &Arc<TransactionDriver<A>>,
695 request: &ExecuteTransactionRequestV3,
696 client_addr: Option<SocketAddr>,
697 verified_transaction: &VerifiedTransaction,
698 good_response_metrics: &GenericCounter<AtomicU64>,
699 timeout_duration: Option<Duration>,
700 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
701 let tx_digest = *verified_transaction.digest();
702 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
703
704 let td_response = td
705 .drive_transaction(
706 SubmitTxRequest::new_transaction(request.transaction.clone()),
707 SubmitTransactionOptions {
708 forwarded_client_addr: client_addr,
709 allowed_validators: self.td_allowed_submission_list.clone(),
710 blocked_validators: self.td_blocked_submission_list.clone(),
711 },
712 timeout_duration,
713 )
714 .await
715 .map_err(|e| match e {
716 TransactionDriverError::TimeoutWithLastRetriableError {
717 last_error,
718 attempts,
719 timeout,
720 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
721 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
722 attempts,
723 timeout,
724 },
725 other => QuorumDriverError::TransactionFailed {
726 category: other.categorize(),
727 details: other.to_string(),
728 },
729 });
730
731 match td_response {
732 Err(e) => {
733 warn!("TransactionDriver error: {e:?}");
734 Err(e)
735 }
736 Ok(quorum_transaction_response) => {
737 good_response_metrics.inc();
738 Ok(quorum_transaction_response)
739 }
740 }
741 }
742
743 #[instrument(
744 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
745 level = "debug",
746 skip_all,
747 err(level = "info")
748 )]
749 async fn wait_for_finalized_tx_executed_locally_with_timeout(
750 validator_state: &Arc<AuthorityState>,
751 tx_digest: TransactionDigest,
752 tx_type: TxType,
753 metrics: &TransactionOrchestratorMetrics,
754 ) -> SuiResult {
755 metrics.local_execution_in_flight.inc();
756 let _metrics_guard =
757 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
758 in_flight.dec();
759 });
760
761 let _latency_guard = metrics
762 .local_execution_latency
763 .with_label_values(&[tx_type.as_str()])
764 .start_timer();
765 debug!("Waiting for finalized tx to be executed locally.");
766 match timeout(
767 LOCAL_EXECUTION_TIMEOUT,
768 validator_state
769 .get_transaction_cache_reader()
770 .notify_read_executed_effects_digests(
771 "TransactionOrchestrator::notify_read_wait_for_local_execution",
772 &[tx_digest],
773 ),
774 )
775 .instrument(error_span!(
776 "transaction_orchestrator::local_execution",
777 ?tx_digest
778 ))
779 .await
780 {
781 Err(_elapsed) => {
782 debug!(
783 "Waiting for finalized tx to be executed locally timed out within {:?}.",
784 LOCAL_EXECUTION_TIMEOUT
785 );
786 metrics.local_execution_timeout.inc();
787 Err(SuiErrorKind::TimeoutError.into())
788 }
789 Ok(_) => {
790 metrics.local_execution_success.inc();
791 Ok(())
792 }
793 }
794 }
795
796 fn update_metrics<'a>(
797 &'a self,
798 transaction: &Transaction,
799 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
800 let (in_flight, good_response) = if transaction.is_consensus_tx() {
801 self.metrics.total_req_received_shared_object.inc();
802 (
803 self.metrics.req_in_flight_shared_object.clone(),
804 &self.metrics.good_response_shared_object,
805 )
806 } else {
807 self.metrics.total_req_received_single_writer.inc();
808 (
809 self.metrics.req_in_flight_single_writer.clone(),
810 &self.metrics.good_response_single_writer,
811 )
812 };
813 in_flight.inc();
814 (
815 scopeguard::guard(in_flight, |in_flight| {
816 in_flight.dec();
817 }),
818 good_response,
819 )
820 }
821
822 fn start_task_to_recover_txes_in_log(
823 pending_tx_log: Arc<WritePathPendingTransactionLog>,
824 transaction_driver: Arc<TransactionDriver<A>>,
825 ) {
826 spawn_logged_monitored_task!(async move {
827 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
828 info!("Skipping loading pending transactions from pending_tx_log.");
829 return;
830 }
831 let pending_txes = pending_tx_log
832 .load_all_pending_transactions()
833 .expect("failed to load all pending transactions");
834 let num_pending_txes = pending_txes.len();
835 info!(
836 "Recovering {} pending transactions from pending_tx_log.",
837 num_pending_txes
838 );
839 let mut recovery = pending_txes
840 .into_iter()
841 .map(|tx| {
842 let pending_tx_log = pending_tx_log.clone();
843 let transaction_driver = transaction_driver.clone();
844 async move {
845 let tx = tx.into_inner();
848 let tx_digest = *tx.digest();
849 if let Err(err) = transaction_driver
852 .drive_transaction(
853 SubmitTxRequest::new_transaction(tx),
854 SubmitTransactionOptions::default(),
855 Some(Duration::from_secs(60)),
856 )
857 .await
858 {
859 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
860 } else {
861 debug!(?tx_digest, "Executed recovered transaction");
862 }
863 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
864 warn!(
865 ?tx_digest,
866 "Failed to clean up transaction in pending log: {err}"
867 );
868 } else {
869 debug!(?tx_digest, "Cleaned up transaction in pending log");
870 }
871 }
872 })
873 .collect::<FuturesUnordered<_>>();
874
875 let mut num_recovered = 0;
876 while recovery.next().await.is_some() {
877 num_recovered += 1;
878 if num_recovered % 1000 == 0 {
879 info!(
880 "Recovered {} out of {} transactions from pending_tx_log.",
881 num_recovered, num_pending_txes
882 );
883 }
884 }
885 info!(
886 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
887 num_recovered, num_pending_txes
888 );
889 });
890 }
891}
/// Prometheus metrics published by the transaction orchestrator.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, one counter per transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Good responses produced, one counter per transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, one gauge per transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Waiting-for-finality bookkeeping: currently waiting, finished, timed out.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Waiting-for-local-execution bookkeeping: currently waiting, succeeded, timed out.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Incremented when locally available effects complete a request.
    concurrent_execution: IntCounter,

    // Transactions rejected by early validation, labelled by `reason`.
    early_validation_rejections: IntCounterVec,

    // Background retry tasks started, and errors seen by those tasks.
    background_retry_started: IntGauge,
    background_retry_errors: IntCounter,

    // Latency histograms; labels are listed at registration in `new`.
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
923
924impl TransactionOrchestratorMetrics {
928 pub fn new(registry: &Registry) -> Self {
929 let total_req_received = register_int_counter_vec_with_registry!(
930 "tx_orchestrator_total_req_received",
931 "Total number of executions request Transaction Orchestrator receives, group by tx type",
932 &["tx_type"],
933 registry
934 )
935 .unwrap();
936
937 let total_req_received_single_writer =
938 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
939 let total_req_received_shared_object =
940 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
941
942 let good_response = register_int_counter_vec_with_registry!(
943 "tx_orchestrator_good_response",
944 "Total number of good responses Transaction Orchestrator generates, group by tx type",
945 &["tx_type"],
946 registry
947 )
948 .unwrap();
949
950 let good_response_single_writer =
951 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
952 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
953
954 let req_in_flight = register_int_gauge_vec_with_registry!(
955 "tx_orchestrator_req_in_flight",
956 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
957 &["tx_type"],
958 registry
959 )
960 .unwrap();
961
962 let req_in_flight_single_writer =
963 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
964 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
965
966 Self {
967 total_req_received_single_writer,
968 total_req_received_shared_object,
969 good_response_single_writer,
970 good_response_shared_object,
971 req_in_flight_single_writer,
972 req_in_flight_shared_object,
973 wait_for_finality_in_flight: register_int_gauge_with_registry!(
974 "tx_orchestrator_wait_for_finality_in_flight",
975 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
976 registry,
977 )
978 .unwrap(),
979 wait_for_finality_finished: register_int_counter_with_registry!(
980 "tx_orchestrator_wait_for_finality_fnished",
981 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
982 registry,
983 )
984 .unwrap(),
985 wait_for_finality_timeout: register_int_counter_with_registry!(
986 "tx_orchestrator_wait_for_finality_timeout",
987 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
988 registry,
989 )
990 .unwrap(),
991 local_execution_in_flight: register_int_gauge_with_registry!(
992 "tx_orchestrator_local_execution_in_flight",
993 "Number of local execution txns in flights Transaction Orchestrator handles",
994 registry,
995 )
996 .unwrap(),
997 local_execution_success: register_int_counter_with_registry!(
998 "tx_orchestrator_local_execution_success",
999 "Total number of successful local execution txns Transaction Orchestrator handles",
1000 registry,
1001 )
1002 .unwrap(),
1003 local_execution_timeout: register_int_counter_with_registry!(
1004 "tx_orchestrator_local_execution_timeout",
1005 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1006 registry,
1007 )
1008 .unwrap(),
1009 concurrent_execution: register_int_counter_with_registry!(
1010 "tx_orchestrator_concurrent_execution",
1011 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1012 registry,
1013 )
1014 .unwrap(),
1015 early_validation_rejections: register_int_counter_vec_with_registry!(
1016 "tx_orchestrator_early_validation_rejections",
1017 "Total number of transactions rejected during early validation before submission, by reason",
1018 &["reason"],
1019 registry,
1020 )
1021 .unwrap(),
1022 background_retry_started: register_int_gauge_with_registry!(
1023 "tx_orchestrator_background_retry_started",
1024 "Number of background retry tasks kicked off for transactions with retriable errors",
1025 registry,
1026 )
1027 .unwrap(),
1028 background_retry_errors: register_int_counter_with_registry!(
1029 "tx_orchestrator_background_retry_errors",
1030 "Total number of background retry errors, by error type",
1031 registry,
1032 )
1033 .unwrap(),
1034 request_latency: register_histogram_vec_with_registry!(
1035 "tx_orchestrator_request_latency",
1036 "Time spent in processing one Transaction Orchestrator request",
1037 &["tx_type", "route", "wait_for_local_execution"],
1038 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1039 registry,
1040 )
1041 .unwrap(),
1042 local_execution_latency: register_histogram_vec_with_registry!(
1043 "tx_orchestrator_local_execution_latency",
1044 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1045 &["tx_type"],
1046 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1047 registry,
1048 )
1049 .unwrap(),
1050 settlement_finality_latency: register_histogram_vec_with_registry!(
1051 "tx_orchestrator_settlement_finality_latency",
1052 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1053 &["tx_type", "driver_type"],
1054 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1055 registry,
1056 )
1057 .unwrap(),
1058 }
1059 }
1060
1061 pub fn new_for_tests() -> Self {
1062 let registry = Registry::new();
1063 Self::new(®istry)
1064 }
1065}
1066
1067#[async_trait::async_trait]
1068impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1069where
1070 A: AuthorityAPI + Send + Sync + 'static + Clone,
1071{
1072 async fn execute_transaction(
1073 &self,
1074 request: ExecuteTransactionRequestV3,
1075 client_addr: Option<std::net::SocketAddr>,
1076 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1077 self.execute_transaction_v3(request, client_addr).await
1078 }
1079
1080 fn simulate_transaction(
1081 &self,
1082 transaction: TransactionData,
1083 checks: TransactionChecks,
1084 ) -> Result<SimulateTransactionResult, SuiError> {
1085 self.inner
1086 .validator_state
1087 .simulate_transaction(transaction, checks)
1088 }
1089}
1090
/// Scope guard tying a transaction's entry in the pending-transaction log to the lifetime of a
/// submission: the entry is written on construction and `finish_transaction` is called on drop.
struct TransactionSubmissionGuard {
    // Log the transaction was recorded in.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    // Digest used to remove the entry again on drop.
    tx_digest: TransactionDigest,
    // Value returned by `write_pending_transaction_maybe` when the guard was created.
    is_new_transaction: bool,
}
1098
1099impl TransactionSubmissionGuard {
1100 pub fn new(
1101 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1102 tx: &VerifiedTransaction,
1103 ) -> Self {
1104 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1105 let tx_digest = *tx.digest();
1106 if is_new_transaction {
1107 debug!(?tx_digest, "Added transaction to inflight set");
1108 } else {
1109 debug!(
1110 ?tx_digest,
1111 "Transaction already being processed, no new submission will be made"
1112 );
1113 };
1114 Self {
1115 pending_tx_log,
1116 tx_digest,
1117 is_new_transaction,
1118 }
1119 }
1120
1121 fn is_new_transaction(&self) -> bool {
1122 self.is_new_transaction
1123 }
1124}
1125
1126impl Drop for TransactionSubmissionGuard {
1127 fn drop(&mut self) {
1128 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1129 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1130 } else {
1131 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1132 }
1133 }
1134}