1use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::in_antithesis;
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17 HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
18 register_int_counter_vec_with_registry, register_int_counter_with_registry,
19 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::quorum_driver_types::{
29 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
30 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
31 QuorumDriverError,
32};
33use sui_types::sui_system_state::SuiSystemState;
34use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46 TransactionDriverMetrics,
47};
48
/// Upper bound on how long `wait_for_finalized_tx_executed_locally_with_timeout`
/// waits for a finalized transaction to show up as executed on this node.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default time budget for driving a transaction to finality. It is used when the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable is unset or unparsable
/// (see `execute_transaction_with_effects_waiting`).
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);

/// Result of driving one transaction: the transaction with its quorum response on
/// success, or the transaction digest with the driver error on failure.
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
58
/// Entry point for executing user transactions: it verifies a request, submits it
/// through a `TransactionDriver`, and waits for finalized effects, either from the
/// driver or from the local node's own execution.
pub struct TransactionOrchestrator<A: Clone> {
    /// Local authority state, used for signature verification, early validation,
    /// and reading locally executed effects, events and objects.
    validator_state: Arc<AuthorityState>,
    /// Log of transactions currently being submitted; entries found at
    /// construction time are re-driven by a background recovery task.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Prometheus metrics recorded by the orchestrator.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Driver that submits transactions to validators.
    transaction_driver: Arc<TransactionDriver<A>>,
    /// Forwarded as `allowed_validators` on every submission. `new` panics when
    /// this and the blocked list are both non-empty.
    td_allowed_submission_list: Vec<String>,
    /// Forwarded as `blocked_validators` on every submission.
    td_blocked_submission_list: Vec<String>,
    /// When true, transactions are checked locally before submission and
    /// non-retriable failures are rejected without contacting validators.
    enable_early_validation: bool,
}
73
74impl TransactionOrchestrator<NetworkAuthorityClient> {
75 pub fn new_with_auth_aggregator(
76 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
77 validator_state: Arc<AuthorityState>,
78 reconfig_channel: Receiver<SuiSystemState>,
79 parent_path: &Path,
80 prometheus_registry: &Registry,
81 node_config: &NodeConfig,
82 ) -> Self {
83 let observer = OnsiteReconfigObserver::new(
84 reconfig_channel,
85 validator_state.get_object_cache_reader().clone(),
86 validator_state.clone_committee_store(),
87 validators.safe_client_metrics_base.clone(),
88 validators.metrics.deref().clone(),
89 );
90 TransactionOrchestrator::new(
91 validators,
92 validator_state,
93 parent_path,
94 prometheus_registry,
95 observer,
96 node_config,
97 )
98 }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103 A: AuthorityAPI + Send + Sync + 'static + Clone,
104 OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106 pub fn new(
107 validators: Arc<AuthorityAggregator<A>>,
108 validator_state: Arc<AuthorityState>,
109 parent_path: &Path,
110 prometheus_registry: &Registry,
111 reconfig_observer: OnsiteReconfigObserver,
112 node_config: &NodeConfig,
113 ) -> Self {
114 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
115 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
116 let client_metrics = Arc::new(
117 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
118 );
119 let reconfig_observer = Arc::new(reconfig_observer);
120
121 let transaction_driver = TransactionDriver::new(
122 validators.clone(),
123 reconfig_observer.clone(),
124 td_metrics,
125 Some(node_config),
126 client_metrics,
127 );
128
129 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
130 parent_path.join("fullnode_pending_transactions"),
131 ));
132 Self::start_task_to_recover_txes_in_log(pending_tx_log.clone(), transaction_driver.clone());
133
134 let td_allowed_submission_list = node_config
135 .transaction_driver_config
136 .as_ref()
137 .map(|config| config.allowed_submission_validators.clone())
138 .unwrap_or_default();
139
140 let td_blocked_submission_list = node_config
141 .transaction_driver_config
142 .as_ref()
143 .map(|config| config.blocked_submission_validators.clone())
144 .unwrap_or_default();
145
146 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
147 panic!(
148 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
149 td_allowed_submission_list, td_blocked_submission_list
150 );
151 }
152
153 let enable_early_validation = node_config
154 .transaction_driver_config
155 .as_ref()
156 .map(|config| config.enable_early_validation)
157 .unwrap_or(true);
158
159 Self {
160 validator_state,
161 pending_tx_log,
162 metrics,
163 transaction_driver,
164 td_allowed_submission_list,
165 td_blocked_submission_list,
166 enable_early_validation,
167 }
168 }
169}
170
171impl<A> TransactionOrchestrator<A>
172where
173 A: AuthorityAPI + Send + Sync + 'static + Clone,
174{
175 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
176 fields(
177 tx_digest = ?request.transaction.digest(),
178 tx_type = ?request_type,
179 ))]
180 pub async fn execute_transaction_block(
181 &self,
182 request: ExecuteTransactionRequestV3,
183 request_type: ExecuteTransactionRequestType,
184 client_addr: Option<SocketAddr>,
185 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
186 {
187 let timer = Instant::now();
188 let tx_type = if request.transaction.is_consensus_tx() {
189 TxType::SharedObject
190 } else {
191 TxType::SingleWriter
192 };
193 let tx_digest = *request.transaction.digest();
194
195 let (response, mut executed_locally) = self
196 .execute_transaction_with_effects_waiting(request, client_addr)
197 .await?;
198
199 if !executed_locally {
200 executed_locally = if matches!(
201 request_type,
202 ExecuteTransactionRequestType::WaitForLocalExecution
203 ) {
204 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
205 &self.validator_state,
206 tx_digest,
207 tx_type,
208 &self.metrics,
209 )
210 .await
211 .is_ok();
212 add_server_timing("local_execution done");
213 executed_locally
214 } else {
215 false
216 };
217 }
218
219 let QuorumTransactionResponse {
220 effects,
221 events,
222 input_objects,
223 output_objects,
224 auxiliary_data,
225 } = response;
226
227 let response = ExecuteTransactionResponseV3 {
228 effects,
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 };
234
235 self.metrics
236 .request_latency
237 .with_label_values(&[
238 tx_type.as_str(),
239 "execute_transaction_block",
240 executed_locally.to_string().as_str(),
241 ])
242 .observe(timer.elapsed().as_secs_f64());
243
244 Ok((response, executed_locally))
245 }
246
247 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
249 fields(tx_digest = ?request.transaction.digest()))]
250 pub async fn execute_transaction_v3(
251 &self,
252 request: ExecuteTransactionRequestV3,
253 client_addr: Option<SocketAddr>,
254 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
255 let timer = Instant::now();
256 let tx_type = if request.transaction.is_consensus_tx() {
257 TxType::SharedObject
258 } else {
259 TxType::SingleWriter
260 };
261
262 let (response, _) = self
263 .execute_transaction_with_effects_waiting(request, client_addr)
264 .await?;
265
266 self.metrics
267 .request_latency
268 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
269 .observe(timer.elapsed().as_secs_f64());
270
271 let QuorumTransactionResponse {
272 effects,
273 events,
274 input_objects,
275 output_objects,
276 auxiliary_data,
277 } = response;
278
279 Ok(ExecuteTransactionResponseV3 {
280 effects,
281 events,
282 input_objects,
283 output_objects,
284 auxiliary_data,
285 })
286 }
287
/// Verifies the request, optionally pre-validates it, and then races driving the
/// transaction to finality against its effects appearing in the local cache.
///
/// Returns the quorum response plus a flag that is `true` when the response was
/// built from locally available effects and `false` when it came from a
/// submission attempt.
///
/// # Errors
///
/// * `InvalidUserSignature` when transaction verification fails.
/// * `TransactionFailed` when early validation rejects the transaction with a
///   non-retriable error and the transaction has not already been executed.
/// * `QuorumDriverInternalError` when reading local events/objects fails.
/// * The last submission error once every submission attempt has failed.
/// * `TimeoutBeforeFinality` when the finality timeout elapses first.
async fn execute_transaction_with_effects_waiting(
    &self,
    request: ExecuteTransactionRequestV3,
    client_addr: Option<SocketAddr>,
) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
    let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
    let verified_transaction = epoch_store
        .verify_transaction(request.transaction.clone())
        .map_err(QuorumDriverError::InvalidUserSignature)?;
    let tx_digest = *verified_transaction.digest();

    // Early validation: reject locally, without submitting, when the failure is
    // not retriable. A transaction that is already executed is let through so the
    // caller can still obtain its effects below.
    if self.enable_early_validation
        && let Err(e) = self
            .validator_state
            .check_transaction_validity(&epoch_store, &verified_transaction)
    {
        let error_category = e.categorize();
        if !error_category.is_submission_retriable() {
            if !self.validator_state.is_tx_already_executed(&tx_digest) {
                self.metrics
                    .early_validation_rejections
                    .with_label_values(&[e.to_variant_name()])
                    .inc();
                debug!(
                    error = ?e,
                    "Transaction rejected during early validation"
                );

                return Err(QuorumDriverError::TransactionFailed {
                    category: error_category,
                    details: e.to_string(),
                });
            }
        }
    }

    // Records the transaction in the pending log for the duration of this call;
    // the guard's `Drop` removes the entry again when the function returns.
    let guard =
        TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
    let is_new_transaction = guard.is_new_transaction();

    let include_events = request.include_events;
    let include_input_objects = request.include_input_objects;
    let include_output_objects = request.include_output_objects;
    let include_auxiliary_data = request.include_auxiliary_data;

    // The finality budget can be overridden through the environment; an unset or
    // unparsable value falls back to the compiled-in default.
    let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .map(Duration::from_secs)
        .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);

    // A transaction that is already in the pending log is not submitted again;
    // this call then only waits for local effects or the timeout. Under msim or
    // Antithesis the number of submissions is randomized (3 with 10%, 2 with 20%,
    // 1 with 70% probability); otherwise exactly one submission is made.
    let num_submissions = if !is_new_transaction {
        0
    } else if cfg!(msim) || in_antithesis() {
        let r = rand::thread_rng().gen_range(1..=100);
        let n = if r <= 10 {
            3
        } else if r <= 30 {
            2
        } else {
            1
        };
        if n > 1 {
            debug!("Making {n} execution calls");
        }
        n
    } else {
        1
    };

    let mut execution_futures = FuturesUnordered::new();
    for i in 0..num_submissions {
        // Every submission after the first is delayed by 100..=500 ms with
        // probability 0.8.
        let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
        let delay_ms = if should_delay {
            rand::thread_rng().gen_range(100..=500)
        } else {
            0
        };

        let request = request.clone();
        let verified_transaction = verified_transaction.clone();

        let future = async move {
            if delay_ms > 0 {
                sleep(Duration::from_millis(delay_ms)).await;
            }
            self.execute_transaction_impl(
                request,
                verified_transaction,
                client_addr,
                Some(finality_timeout),
            )
            .await
        }
        .boxed();
        execution_futures.push(future);
    }

    // Remembers the most recent failed attempt so it can be returned once every
    // attempt has failed, or logged on timeout.
    let mut last_execution_error: Option<QuorumDriverError> = None;

    // Future that resolves when the effects for this digest are available
    // locally, or with an error if the epoch ends first.
    let digests = [tx_digest];
    let mut local_effects_future = epoch_store
        .within_alive_epoch(
            self.validator_state
                .get_transaction_cache_reader()
                .notify_read_executed_effects(
                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
                    &digests,
                ),
        )
        .boxed();

    let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();

    loop {
        tokio::select! {
            // `biased` polls the branches in the order written: local effects
            // first, then submission results, then the timeout.
            biased;

            local_effects_result = &mut local_effects_future => {
                match local_effects_result {
                    Ok(effects) => {
                        debug!(
                            "Effects became available while execution was running"
                        );
                        if let Some(effects) = effects.into_iter().next() {
                            self.metrics.concurrent_execution.inc();
                            let epoch = effects.executed_epoch();
                            // Events are only looked up when requested and when
                            // the effects carry an events digest.
                            let events = if include_events {
                                if effects.events_digest().is_some() {
                                    Some(self.validator_state.get_transaction_events(effects.transaction_digest())
                                        .map_err(QuorumDriverError::QuorumDriverInternalError)?)
                                } else {
                                    None
                                }
                            } else {
                                None
                            };
                            let input_objects = include_input_objects
                                .then(|| self.validator_state.get_transaction_input_objects(&effects))
                                .transpose()
                                .map_err(QuorumDriverError::QuorumDriverInternalError)?;
                            let output_objects = include_output_objects
                                .then(|| self.validator_state.get_transaction_output_objects(&effects))
                                .transpose()
                                .map_err(QuorumDriverError::QuorumDriverInternalError)?;
                            // Auxiliary data is never populated on this path,
                            // regardless of `include_auxiliary_data`.
                            let response = QuorumTransactionResponse {
                                effects: FinalizedEffects {
                                    effects,
                                    finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
                                },
                                events,
                                input_objects,
                                output_objects,
                                auxiliary_data: None,
                            };
                            break Ok((response, true));
                        }
                    }
                    Err(_) => {
                        warn!("Epoch terminated before effects were available");
                    }
                };

                // The local-effects future has completed without producing a
                // response; replace it with a never-ready future so the loop
                // keeps waiting on the remaining branches only.
                local_effects_future = futures::future::pending().boxed();
            }

            // When `execution_futures` is empty `next()` yields `None`, which
            // does not match the `Some(..)` pattern, so this branch is skipped.
            Some(result) = execution_futures.next() => {
                match result {
                    Ok(resp) => {
                        debug!("Execution succeeded, returning response");
                        let QuorumTransactionResponse {
                            effects,
                            events,
                            input_objects,
                            output_objects,
                            auxiliary_data,
                        } = resp;
                        // Strip any optional section the caller did not ask for.
                        let resp = QuorumTransactionResponse {
                            effects,
                            events: if include_events { events } else { None },
                            input_objects: if include_input_objects { input_objects } else { None },
                            output_objects: if include_output_objects { output_objects } else { None },
                            auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
                        };
                        break Ok((resp, false));
                    }
                    Err(e) => {
                        debug!(?e, "Execution attempt failed, wait for other attempts");
                        last_execution_error = Some(e);
                    }
                };

                if execution_futures.is_empty() {
                    // Reached only after a failed attempt, so the error is set.
                    break Err(last_execution_error.unwrap());
                }
            }

            _ = &mut timeout_future => {
                if let Some(e) = last_execution_error {
                    debug!("Timeout waiting for transaction finality. Last execution error: {e}");
                } else {
                    debug!("Timeout waiting for transaction finality.");
                }
                self.metrics.wait_for_finality_timeout.inc();

                break Err(QuorumDriverError::TimeoutBeforeFinality);
            }
        }
    }
}
519
520 #[instrument(level = "error", skip_all)]
521 async fn execute_transaction_impl(
522 &self,
523 request: ExecuteTransactionRequestV3,
524 verified_transaction: VerifiedTransaction,
525 client_addr: Option<SocketAddr>,
526 finality_timeout: Option<Duration>,
527 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
528 debug!("TO Received transaction execution request.");
529
530 let timer = Instant::now();
531 let tx_type = if verified_transaction.is_consensus_tx() {
532 TxType::SharedObject
533 } else {
534 TxType::SingleWriter
535 };
536
537 let (_in_flight_metrics_guards, good_response_metrics) =
538 self.update_metrics(&request.transaction);
539
540 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
542 wait_for_finality_gauge.inc();
543 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
544 in_flight.dec();
545 });
546
547 let response = self
548 .submit_with_transaction_driver(
549 &self.transaction_driver,
550 &request,
551 client_addr,
552 &verified_transaction,
553 good_response_metrics,
554 finality_timeout,
555 )
556 .await?;
557 let driver_type = "transaction_driver";
558
559 add_server_timing("wait_for_finality done");
560
561 self.metrics.wait_for_finality_finished.inc();
562
563 let elapsed = timer.elapsed().as_secs_f64();
564 self.metrics
565 .settlement_finality_latency
566 .with_label_values(&[tx_type.as_str(), driver_type])
567 .observe(elapsed);
568 good_response_metrics.inc();
569
570 Ok(response)
571 }
572
573 #[instrument(level = "error", skip_all, err(level = "info"))]
574 async fn submit_with_transaction_driver(
575 &self,
576 td: &Arc<TransactionDriver<A>>,
577 request: &ExecuteTransactionRequestV3,
578 client_addr: Option<SocketAddr>,
579 verified_transaction: &VerifiedTransaction,
580 good_response_metrics: &GenericCounter<AtomicU64>,
581 timeout_duration: Option<Duration>,
582 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
583 let tx_digest = *verified_transaction.digest();
584 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
585
586 let td_response = td
587 .drive_transaction(
588 SubmitTxRequest::new_transaction(request.transaction.clone()),
589 SubmitTransactionOptions {
590 forwarded_client_addr: client_addr,
591 allowed_validators: self.td_allowed_submission_list.clone(),
592 blocked_validators: self.td_blocked_submission_list.clone(),
593 },
594 timeout_duration,
595 )
596 .await
597 .map_err(|e| match e {
598 TransactionDriverError::TimeoutWithLastRetriableError {
599 last_error,
600 attempts,
601 timeout,
602 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
603 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
604 attempts,
605 timeout,
606 },
607 other => QuorumDriverError::TransactionFailed {
608 category: other.categorize(),
609 details: other.to_string(),
610 },
611 });
612
613 match td_response {
614 Err(e) => {
615 warn!("TransactionDriver error: {e:?}");
616 Err(e)
617 }
618 Ok(quorum_transaction_response) => {
619 good_response_metrics.inc();
620 Ok(quorum_transaction_response)
621 }
622 }
623 }
624
625 #[instrument(
626 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
627 level = "debug",
628 skip_all,
629 err(level = "info")
630 )]
631 async fn wait_for_finalized_tx_executed_locally_with_timeout(
632 validator_state: &Arc<AuthorityState>,
633 tx_digest: TransactionDigest,
634 tx_type: TxType,
635 metrics: &TransactionOrchestratorMetrics,
636 ) -> SuiResult {
637 metrics.local_execution_in_flight.inc();
638 let _metrics_guard =
639 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
640 in_flight.dec();
641 });
642
643 let _latency_guard = metrics
644 .local_execution_latency
645 .with_label_values(&[tx_type.as_str()])
646 .start_timer();
647 debug!("Waiting for finalized tx to be executed locally.");
648 match timeout(
649 LOCAL_EXECUTION_TIMEOUT,
650 validator_state
651 .get_transaction_cache_reader()
652 .notify_read_executed_effects_digests(
653 "TransactionOrchestrator::notify_read_wait_for_local_execution",
654 &[tx_digest],
655 ),
656 )
657 .instrument(error_span!(
658 "transaction_orchestrator::local_execution",
659 ?tx_digest
660 ))
661 .await
662 {
663 Err(_elapsed) => {
664 debug!(
665 "Waiting for finalized tx to be executed locally timed out within {:?}.",
666 LOCAL_EXECUTION_TIMEOUT
667 );
668 metrics.local_execution_timeout.inc();
669 Err(SuiErrorKind::TimeoutError.into())
670 }
671 Ok(_) => {
672 metrics.local_execution_success.inc();
673 Ok(())
674 }
675 }
676 }
677
/// Returns the local authority state this orchestrator operates on.
pub fn authority_state(&self) -> &Arc<AuthorityState> {
    &self.validator_state
}

/// Returns the transaction driver used for submissions.
pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
    &self.transaction_driver
}

/// Returns the authority aggregator currently held by the transaction driver,
/// as loaded by `load_full()` at the time of the call.
pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
    self.transaction_driver.authority_aggregator().load_full()
}
689
690 fn update_metrics<'a>(
691 &'a self,
692 transaction: &Transaction,
693 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
694 let (in_flight, good_response) = if transaction.is_consensus_tx() {
695 self.metrics.total_req_received_shared_object.inc();
696 (
697 self.metrics.req_in_flight_shared_object.clone(),
698 &self.metrics.good_response_shared_object,
699 )
700 } else {
701 self.metrics.total_req_received_single_writer.inc();
702 (
703 self.metrics.req_in_flight_single_writer.clone(),
704 &self.metrics.good_response_single_writer,
705 )
706 };
707 in_flight.inc();
708 (
709 scopeguard::guard(in_flight, |in_flight| {
710 in_flight.dec();
711 }),
712 good_response,
713 )
714 }
715
716 fn start_task_to_recover_txes_in_log(
717 pending_tx_log: Arc<WritePathPendingTransactionLog>,
718 transaction_driver: Arc<TransactionDriver<A>>,
719 ) {
720 spawn_logged_monitored_task!(async move {
721 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
722 info!("Skipping loading pending transactions from pending_tx_log.");
723 return;
724 }
725 let pending_txes = pending_tx_log
726 .load_all_pending_transactions()
727 .expect("failed to load all pending transactions");
728 let num_pending_txes = pending_txes.len();
729 info!(
730 "Recovering {} pending transactions from pending_tx_log.",
731 num_pending_txes
732 );
733 let mut recovery = pending_txes
734 .into_iter()
735 .map(|tx| {
736 let pending_tx_log = pending_tx_log.clone();
737 let transaction_driver = transaction_driver.clone();
738 async move {
739 let tx = tx.into_inner();
742 let tx_digest = *tx.digest();
743 if let Err(err) = transaction_driver
746 .drive_transaction(
747 SubmitTxRequest::new_transaction(tx),
748 SubmitTransactionOptions::default(),
749 Some(Duration::from_secs(60)),
750 )
751 .await
752 {
753 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
754 } else {
755 debug!(?tx_digest, "Executed recovered transaction");
756 }
757 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
758 warn!(
759 ?tx_digest,
760 "Failed to clean up transaction in pending log: {err}"
761 );
762 } else {
763 debug!(?tx_digest, "Cleaned up transaction in pending log");
764 }
765 }
766 })
767 .collect::<FuturesUnordered<_>>();
768
769 let mut num_recovered = 0;
770 while recovery.next().await.is_some() {
771 num_recovered += 1;
772 if num_recovered % 1000 == 0 {
773 info!(
774 "Recovered {} out of {} transactions from pending_tx_log.",
775 num_recovered, num_pending_txes
776 );
777 }
778 }
779 info!(
780 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
781 num_recovered, num_pending_txes
782 );
783 });
784 }
785
/// Test helper: returns every transaction currently in the pending log.
pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
    self.pending_tx_log.load_all_pending_transactions()
}

/// Test helper: reports whether the pending transaction log is empty.
pub fn empty_pending_tx_log_in_test(&self) -> bool {
    self.pending_tx_log.is_empty()
}
793}
/// Prometheus metrics recorded by `TransactionOrchestrator`.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, split by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Good responses produced, split by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, split by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Finality wait: attempts in flight, attempts that returned a response, and
    // requests that hit the finality timeout.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Local execution wait: in flight, succeeded, and timed out.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Requests answered from locally available effects while submission was
    // still running.
    concurrent_execution: IntCounter,

    // Transactions rejected by early validation, labelled by rejection reason.
    early_validation_rejections: IntCounterVec,

    // Latency histograms (seconds).
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
822
823impl TransactionOrchestratorMetrics {
827 pub fn new(registry: &Registry) -> Self {
828 let total_req_received = register_int_counter_vec_with_registry!(
829 "tx_orchestrator_total_req_received",
830 "Total number of executions request Transaction Orchestrator receives, group by tx type",
831 &["tx_type"],
832 registry
833 )
834 .unwrap();
835
836 let total_req_received_single_writer =
837 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
838 let total_req_received_shared_object =
839 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
840
841 let good_response = register_int_counter_vec_with_registry!(
842 "tx_orchestrator_good_response",
843 "Total number of good responses Transaction Orchestrator generates, group by tx type",
844 &["tx_type"],
845 registry
846 )
847 .unwrap();
848
849 let good_response_single_writer =
850 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
851 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
852
853 let req_in_flight = register_int_gauge_vec_with_registry!(
854 "tx_orchestrator_req_in_flight",
855 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
856 &["tx_type"],
857 registry
858 )
859 .unwrap();
860
861 let req_in_flight_single_writer =
862 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
863 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
864
865 Self {
866 total_req_received_single_writer,
867 total_req_received_shared_object,
868 good_response_single_writer,
869 good_response_shared_object,
870 req_in_flight_single_writer,
871 req_in_flight_shared_object,
872 wait_for_finality_in_flight: register_int_gauge_with_registry!(
873 "tx_orchestrator_wait_for_finality_in_flight",
874 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
875 registry,
876 )
877 .unwrap(),
878 wait_for_finality_finished: register_int_counter_with_registry!(
879 "tx_orchestrator_wait_for_finality_fnished",
880 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
881 registry,
882 )
883 .unwrap(),
884 wait_for_finality_timeout: register_int_counter_with_registry!(
885 "tx_orchestrator_wait_for_finality_timeout",
886 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
887 registry,
888 )
889 .unwrap(),
890 local_execution_in_flight: register_int_gauge_with_registry!(
891 "tx_orchestrator_local_execution_in_flight",
892 "Number of local execution txns in flights Transaction Orchestrator handles",
893 registry,
894 )
895 .unwrap(),
896 local_execution_success: register_int_counter_with_registry!(
897 "tx_orchestrator_local_execution_success",
898 "Total number of successful local execution txns Transaction Orchestrator handles",
899 registry,
900 )
901 .unwrap(),
902 local_execution_timeout: register_int_counter_with_registry!(
903 "tx_orchestrator_local_execution_timeout",
904 "Total number of timed-out local execution txns Transaction Orchestrator handles",
905 registry,
906 )
907 .unwrap(),
908 concurrent_execution: register_int_counter_with_registry!(
909 "tx_orchestrator_concurrent_execution",
910 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
911 registry,
912 )
913 .unwrap(),
914 early_validation_rejections: register_int_counter_vec_with_registry!(
915 "tx_orchestrator_early_validation_rejections",
916 "Total number of transactions rejected during early validation before submission, by reason",
917 &["reason"],
918 registry,
919 )
920 .unwrap(),
921 request_latency: register_histogram_vec_with_registry!(
922 "tx_orchestrator_request_latency",
923 "Time spent in processing one Transaction Orchestrator request",
924 &["tx_type", "route", "wait_for_local_execution"],
925 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
926 registry,
927 )
928 .unwrap(),
929 local_execution_latency: register_histogram_vec_with_registry!(
930 "tx_orchestrator_local_execution_latency",
931 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
932 &["tx_type"],
933 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
934 registry,
935 )
936 .unwrap(),
937 settlement_finality_latency: register_histogram_vec_with_registry!(
938 "tx_orchestrator_settlement_finality_latency",
939 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
940 &["tx_type", "driver_type"],
941 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
942 registry,
943 )
944 .unwrap(),
945 }
946 }
947
948 pub fn new_for_tests() -> Self {
949 let registry = Registry::new();
950 Self::new(®istry)
951 }
952}
953
/// Exposes the orchestrator through the generic `TransactionExecutor` interface.
#[async_trait::async_trait]
impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    /// Executes the transaction via `execute_transaction_v3`, i.e. without
    /// waiting for local execution.
    async fn execute_transaction(
        &self,
        request: ExecuteTransactionRequestV3,
        client_addr: Option<std::net::SocketAddr>,
    ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
        self.execute_transaction_v3(request, client_addr).await
    }

    /// Simulates the transaction on the local authority state with the given
    /// checks.
    fn simulate_transaction(
        &self,
        transaction: TransactionData,
        checks: TransactionChecks,
    ) -> Result<SimulateTransactionResult, SuiError> {
        self.validator_state
            .simulate_transaction(transaction, checks)
    }
}
976
/// Scope guard around one transaction submission: construction records the
/// transaction in the pending log, and dropping the guard removes it again.
struct TransactionSubmissionGuard {
    /// Log the transaction was written to and is removed from on drop.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Digest of the guarded transaction.
    tx_digest: TransactionDigest,
    /// Value returned by `write_pending_transaction_maybe` at construction;
    /// `false` means the transaction was already being processed.
    is_new_transaction: bool,
}
984
985impl TransactionSubmissionGuard {
986 pub fn new(
987 pending_tx_log: Arc<WritePathPendingTransactionLog>,
988 tx: &VerifiedTransaction,
989 ) -> Self {
990 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
991 let tx_digest = *tx.digest();
992 if is_new_transaction {
993 debug!(?tx_digest, "Added transaction to inflight set");
994 } else {
995 debug!(
996 ?tx_digest,
997 "Transaction already being processed, no new submission will be made"
998 );
999 };
1000 Self {
1001 pending_tx_log,
1002 tx_digest,
1003 is_new_transaction,
1004 }
1005 }
1006
1007 fn is_new_transaction(&self) -> bool {
1008 self.is_new_transaction
1009 }
1010}
1011
1012impl Drop for TransactionSubmissionGuard {
1013 fn drop(&mut self) {
1014 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1015 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1016 } else {
1017 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1018 }
1019 }
1020}