1use std::net::SocketAddr;
5use std::ops::Deref;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::FutureExt;
11use futures::stream::{FuturesUnordered, StreamExt};
12use mysten_common::in_antithesis;
13use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
14use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
15use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
16use prometheus::{
17 HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
18 register_int_counter_vec_with_registry, register_int_counter_with_registry,
19 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
20};
21use rand::Rng;
22use sui_config::NodeConfig;
23use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
24use sui_types::base_types::TransactionDigest;
25use sui_types::effects::TransactionEffectsAPI;
26use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
27use sui_types::messages_grpc::{SubmitTxRequest, TxType};
28use sui_types::quorum_driver_types::{
29 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
30 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
31 QuorumDriverError,
32};
33use sui_types::sui_system_state::SuiSystemState;
34use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
35use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
36use tokio::sync::broadcast::Receiver;
37use tokio::time::{Instant, sleep, timeout};
38use tracing::{Instrument, debug, error_span, info, instrument, warn};
39
40use crate::authority::AuthorityState;
41use crate::authority_aggregator::AuthorityAggregator;
42use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
43use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
44use crate::transaction_driver::{
45 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
46 TransactionDriverMetrics,
47};
48
/// Upper bound on how long `execute_transaction_block` waits for the local node to have
/// executed a finalized transaction when `WaitForLocalExecution` is requested.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default upper bound on waiting for a transaction to reach finality. Can be overridden at
/// runtime through the `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_millis(90_000);
55
/// Outcome of driving a transaction to finality.
///
/// On success, carries the transaction together with its quorum response. On failure,
/// carries the transaction digest together with the error.
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
58
/// Accepts transaction execution requests and drives them to finality.
///
/// Submission goes through a [`TransactionDriver`]. While that is in flight, the
/// orchestrator also watches the local node for the transaction's effects.
pub struct TransactionOrchestrator<A: Clone> {
    /// Local node state. Used for signature verification, early validity checks, reading
    /// locally executed effects/events/objects, and simulation.
    validator_state: Arc<AuthorityState>,
    /// Persistent log of in-flight transactions. Its entries are re-driven on startup by
    /// the recovery task.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Orchestrator-level Prometheus metrics.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Driver that submits transactions to validators and waits for finality.
    transaction_driver: Arc<TransactionDriver<A>>,
    /// Validators submissions are restricted to (empty = no restriction). Mutually exclusive
    /// with `td_blocked_submission_list`.
    td_allowed_submission_list: Vec<String>,
    /// Validators submissions must avoid (empty = none). Mutually exclusive with
    /// `td_allowed_submission_list`.
    td_blocked_submission_list: Vec<String>,
    /// Whether to run `check_transaction_validity` before submitting, rejecting
    /// non-retriable failures early.
    enable_early_validation: bool,
}
73
74impl TransactionOrchestrator<NetworkAuthorityClient> {
75 pub fn new_with_auth_aggregator(
76 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
77 validator_state: Arc<AuthorityState>,
78 reconfig_channel: Receiver<SuiSystemState>,
79 parent_path: &Path,
80 prometheus_registry: &Registry,
81 node_config: &NodeConfig,
82 ) -> Self {
83 let observer = OnsiteReconfigObserver::new(
84 reconfig_channel,
85 validator_state.get_object_cache_reader().clone(),
86 validator_state.clone_committee_store(),
87 validators.safe_client_metrics_base.clone(),
88 validators.metrics.deref().clone(),
89 );
90 TransactionOrchestrator::new(
91 validators,
92 validator_state,
93 parent_path,
94 prometheus_registry,
95 observer,
96 node_config,
97 )
98 }
99}
100
101impl<A> TransactionOrchestrator<A>
102where
103 A: AuthorityAPI + Send + Sync + 'static + Clone,
104 OnsiteReconfigObserver: ReconfigObserver<A>,
105{
106 pub fn new(
107 validators: Arc<AuthorityAggregator<A>>,
108 validator_state: Arc<AuthorityState>,
109 parent_path: &Path,
110 prometheus_registry: &Registry,
111 reconfig_observer: OnsiteReconfigObserver,
112 node_config: &NodeConfig,
113 ) -> Self {
114 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
115 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
116 let client_metrics = Arc::new(
117 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
118 );
119 let reconfig_observer = Arc::new(reconfig_observer);
120
121 let transaction_driver = TransactionDriver::new(
122 validators.clone(),
123 reconfig_observer.clone(),
124 td_metrics,
125 Some(node_config),
126 client_metrics,
127 );
128
129 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
130 parent_path.join("fullnode_pending_transactions"),
131 ));
132 Self::start_task_to_recover_txes_in_log(pending_tx_log.clone(), transaction_driver.clone());
133
134 let td_allowed_submission_list = node_config
135 .transaction_driver_config
136 .as_ref()
137 .map(|config| config.allowed_submission_validators.clone())
138 .unwrap_or_default();
139
140 let td_blocked_submission_list = node_config
141 .transaction_driver_config
142 .as_ref()
143 .map(|config| config.blocked_submission_validators.clone())
144 .unwrap_or_default();
145
146 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
147 panic!(
148 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
149 td_allowed_submission_list, td_blocked_submission_list
150 );
151 }
152
153 let enable_early_validation = node_config
154 .transaction_driver_config
155 .as_ref()
156 .map(|config| config.enable_early_validation)
157 .unwrap_or(true);
158
159 Self {
160 validator_state,
161 pending_tx_log,
162 metrics,
163 transaction_driver,
164 td_allowed_submission_list,
165 td_blocked_submission_list,
166 enable_early_validation,
167 }
168 }
169}
170
171impl<A> TransactionOrchestrator<A>
172where
173 A: AuthorityAPI + Send + Sync + 'static + Clone,
174{
175 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
176 fields(
177 tx_digest = ?request.transaction.digest(),
178 tx_type = ?request_type,
179 ))]
180 pub async fn execute_transaction_block(
181 &self,
182 request: ExecuteTransactionRequestV3,
183 request_type: ExecuteTransactionRequestType,
184 client_addr: Option<SocketAddr>,
185 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
186 {
187 let timer = Instant::now();
188 let tx_type = if request.transaction.is_consensus_tx() {
189 TxType::SharedObject
190 } else {
191 TxType::SingleWriter
192 };
193 let tx_digest = *request.transaction.digest();
194
195 let (response, mut executed_locally) = self
196 .execute_transaction_with_effects_waiting(request, client_addr)
197 .await?;
198
199 if !executed_locally {
200 executed_locally = if matches!(
201 request_type,
202 ExecuteTransactionRequestType::WaitForLocalExecution
203 ) {
204 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
205 &self.validator_state,
206 tx_digest,
207 tx_type,
208 &self.metrics,
209 )
210 .await
211 .is_ok();
212 add_server_timing("local_execution done");
213 executed_locally
214 } else {
215 false
216 };
217 }
218
219 let QuorumTransactionResponse {
220 effects,
221 events,
222 input_objects,
223 output_objects,
224 auxiliary_data,
225 } = response;
226
227 let response = ExecuteTransactionResponseV3 {
228 effects,
229 events,
230 input_objects,
231 output_objects,
232 auxiliary_data,
233 };
234
235 self.metrics
236 .request_latency
237 .with_label_values(&[
238 tx_type.as_str(),
239 "execute_transaction_block",
240 executed_locally.to_string().as_str(),
241 ])
242 .observe(timer.elapsed().as_secs_f64());
243
244 Ok((response, executed_locally))
245 }
246
247 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
249 fields(tx_digest = ?request.transaction.digest()))]
250 pub async fn execute_transaction_v3(
251 &self,
252 request: ExecuteTransactionRequestV3,
253 client_addr: Option<SocketAddr>,
254 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
255 let timer = Instant::now();
256 let tx_type = if request.transaction.is_consensus_tx() {
257 TxType::SharedObject
258 } else {
259 TxType::SingleWriter
260 };
261
262 let (response, _) = self
263 .execute_transaction_with_effects_waiting(request, client_addr)
264 .await?;
265
266 self.metrics
267 .request_latency
268 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
269 .observe(timer.elapsed().as_secs_f64());
270
271 let QuorumTransactionResponse {
272 effects,
273 events,
274 input_objects,
275 output_objects,
276 auxiliary_data,
277 } = response;
278
279 Ok(ExecuteTransactionResponseV3 {
280 effects,
281 events,
282 input_objects,
283 output_objects,
284 auxiliary_data,
285 })
286 }
287
288 async fn execute_transaction_with_effects_waiting(
290 &self,
291 request: ExecuteTransactionRequestV3,
292 client_addr: Option<SocketAddr>,
293 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
294 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
295 let verified_transaction = epoch_store
296 .verify_transaction_with_current_aliases(request.transaction.clone())
297 .map_err(QuorumDriverError::InvalidUserSignature)?
298 .into_tx();
299 let tx_digest = *verified_transaction.digest();
300
301 if self.enable_early_validation
304 && let Err(e) = self
305 .validator_state
306 .check_transaction_validity(&epoch_store, &verified_transaction)
307 {
308 let error_category = e.categorize();
309 if !error_category.is_submission_retriable() {
310 if !self.validator_state.is_tx_already_executed(&tx_digest) {
312 self.metrics
313 .early_validation_rejections
314 .with_label_values(&[e.to_variant_name()])
315 .inc();
316 debug!(
317 error = ?e,
318 "Transaction rejected during early validation"
319 );
320
321 return Err(QuorumDriverError::TransactionFailed {
322 category: error_category,
323 details: e.to_string(),
324 });
325 }
326 }
327 }
328
329 let guard =
331 TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
332 let is_new_transaction = guard.is_new_transaction();
333
334 let include_events = request.include_events;
335 let include_input_objects = request.include_input_objects;
336 let include_output_objects = request.include_output_objects;
337 let include_auxiliary_data = request.include_auxiliary_data;
338
339 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
340 .ok()
341 .and_then(|v| v.parse().ok())
342 .map(Duration::from_secs)
343 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
344
345 let num_submissions = if !is_new_transaction {
346 0
348 } else if cfg!(msim) || in_antithesis() {
349 let r = rand::thread_rng().gen_range(1..=100);
351 let n = if r <= 10 {
352 3
353 } else if r <= 30 {
354 2
355 } else {
356 1
357 };
358 if n > 1 {
359 debug!("Making {n} execution calls");
360 }
361 n
362 } else {
363 1
364 };
365
366 let mut execution_futures = FuturesUnordered::new();
368 for i in 0..num_submissions {
369 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
371 let delay_ms = if should_delay {
372 rand::thread_rng().gen_range(100..=500)
373 } else {
374 0
375 };
376
377 let request = request.clone();
378 let verified_transaction = verified_transaction.clone();
379
380 let future = async move {
381 if delay_ms > 0 {
382 sleep(Duration::from_millis(delay_ms)).await;
384 }
385 self.execute_transaction_impl(
386 request,
387 verified_transaction,
388 client_addr,
389 Some(finality_timeout),
390 )
391 .await
392 }
393 .boxed();
394 execution_futures.push(future);
395 }
396
397 let mut last_execution_error: Option<QuorumDriverError> = None;
399
400 let digests = [tx_digest];
402 let mut local_effects_future = epoch_store
403 .within_alive_epoch(
404 self.validator_state
405 .get_transaction_cache_reader()
406 .notify_read_executed_effects(
407 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
408 &digests,
409 ),
410 )
411 .boxed();
412
413 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
415
416 loop {
417 tokio::select! {
418 biased;
419
420 local_effects_result = &mut local_effects_future => {
422 match local_effects_result {
423 Ok(effects) => {
424 debug!(
425 "Effects became available while execution was running"
426 );
427 if let Some(effects) = effects.into_iter().next() {
428 self.metrics.concurrent_execution.inc();
429 let epoch = effects.executed_epoch();
430 let events = if include_events {
431 if effects.events_digest().is_some() {
432 Some(self.validator_state.get_transaction_events(effects.transaction_digest())
433 .map_err(QuorumDriverError::QuorumDriverInternalError)?)
434 } else {
435 None
436 }
437 } else {
438 None
439 };
440 let input_objects = include_input_objects
441 .then(|| self.validator_state.get_transaction_input_objects(&effects))
442 .transpose()
443 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
444 let output_objects = include_output_objects
445 .then(|| self.validator_state.get_transaction_output_objects(&effects))
446 .transpose()
447 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
448 let response = QuorumTransactionResponse {
449 effects: FinalizedEffects {
450 effects,
451 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
452 },
453 events,
454 input_objects,
455 output_objects,
456 auxiliary_data: None,
457 };
458 break Ok((response, true));
459 }
460 }
461 Err(_) => {
462 warn!("Epoch terminated before effects were available");
463 }
464 };
465
466 local_effects_future = futures::future::pending().boxed();
468 }
469
470 Some(result) = execution_futures.next() => {
472 match result {
473 Ok(resp) => {
474 debug!("Execution succeeded, returning response");
476 let QuorumTransactionResponse {
477 effects,
478 events,
479 input_objects,
480 output_objects,
481 auxiliary_data,
482 } = resp;
483 let resp = QuorumTransactionResponse {
485 effects,
486 events: if include_events { events } else { None },
487 input_objects: if include_input_objects { input_objects } else { None },
488 output_objects: if include_output_objects { output_objects } else { None },
489 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
490 };
491 break Ok((resp, false));
492 }
493 Err(e) => {
494 debug!(?e, "Execution attempt failed, wait for other attempts");
495 last_execution_error = Some(e);
496 }
497 };
498
499 if execution_futures.is_empty() {
501 break Err(last_execution_error.unwrap());
502 }
503 }
504
505 _ = &mut timeout_future => {
507 if let Some(e) = last_execution_error {
508 debug!("Timeout waiting for transaction finality. Last execution error: {e}");
509 } else {
510 debug!("Timeout waiting for transaction finality.");
511 }
512 self.metrics.wait_for_finality_timeout.inc();
513
514 break Err(QuorumDriverError::TimeoutBeforeFinality);
516 }
517 }
518 }
519 }
520
521 #[instrument(level = "error", skip_all)]
522 async fn execute_transaction_impl(
523 &self,
524 request: ExecuteTransactionRequestV3,
525 verified_transaction: VerifiedTransaction,
526 client_addr: Option<SocketAddr>,
527 finality_timeout: Option<Duration>,
528 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
529 debug!("TO Received transaction execution request.");
530
531 let timer = Instant::now();
532 let tx_type = if verified_transaction.is_consensus_tx() {
533 TxType::SharedObject
534 } else {
535 TxType::SingleWriter
536 };
537
538 let (_in_flight_metrics_guards, good_response_metrics) =
539 self.update_metrics(&request.transaction);
540
541 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
543 wait_for_finality_gauge.inc();
544 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
545 in_flight.dec();
546 });
547
548 let response = self
549 .submit_with_transaction_driver(
550 &self.transaction_driver,
551 &request,
552 client_addr,
553 &verified_transaction,
554 good_response_metrics,
555 finality_timeout,
556 )
557 .await?;
558 let driver_type = "transaction_driver";
559
560 add_server_timing("wait_for_finality done");
561
562 self.metrics.wait_for_finality_finished.inc();
563
564 let elapsed = timer.elapsed().as_secs_f64();
565 self.metrics
566 .settlement_finality_latency
567 .with_label_values(&[tx_type.as_str(), driver_type])
568 .observe(elapsed);
569 good_response_metrics.inc();
570
571 Ok(response)
572 }
573
574 #[instrument(level = "error", skip_all, err(level = "info"))]
575 async fn submit_with_transaction_driver(
576 &self,
577 td: &Arc<TransactionDriver<A>>,
578 request: &ExecuteTransactionRequestV3,
579 client_addr: Option<SocketAddr>,
580 verified_transaction: &VerifiedTransaction,
581 good_response_metrics: &GenericCounter<AtomicU64>,
582 timeout_duration: Option<Duration>,
583 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
584 let tx_digest = *verified_transaction.digest();
585 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
586
587 let td_response = td
588 .drive_transaction(
589 SubmitTxRequest::new_transaction(request.transaction.clone()),
590 SubmitTransactionOptions {
591 forwarded_client_addr: client_addr,
592 allowed_validators: self.td_allowed_submission_list.clone(),
593 blocked_validators: self.td_blocked_submission_list.clone(),
594 },
595 timeout_duration,
596 )
597 .await
598 .map_err(|e| match e {
599 TransactionDriverError::TimeoutWithLastRetriableError {
600 last_error,
601 attempts,
602 timeout,
603 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
604 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
605 attempts,
606 timeout,
607 },
608 other => QuorumDriverError::TransactionFailed {
609 category: other.categorize(),
610 details: other.to_string(),
611 },
612 });
613
614 match td_response {
615 Err(e) => {
616 warn!("TransactionDriver error: {e:?}");
617 Err(e)
618 }
619 Ok(quorum_transaction_response) => {
620 good_response_metrics.inc();
621 Ok(quorum_transaction_response)
622 }
623 }
624 }
625
626 #[instrument(
627 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
628 level = "debug",
629 skip_all,
630 err(level = "info")
631 )]
632 async fn wait_for_finalized_tx_executed_locally_with_timeout(
633 validator_state: &Arc<AuthorityState>,
634 tx_digest: TransactionDigest,
635 tx_type: TxType,
636 metrics: &TransactionOrchestratorMetrics,
637 ) -> SuiResult {
638 metrics.local_execution_in_flight.inc();
639 let _metrics_guard =
640 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
641 in_flight.dec();
642 });
643
644 let _latency_guard = metrics
645 .local_execution_latency
646 .with_label_values(&[tx_type.as_str()])
647 .start_timer();
648 debug!("Waiting for finalized tx to be executed locally.");
649 match timeout(
650 LOCAL_EXECUTION_TIMEOUT,
651 validator_state
652 .get_transaction_cache_reader()
653 .notify_read_executed_effects_digests(
654 "TransactionOrchestrator::notify_read_wait_for_local_execution",
655 &[tx_digest],
656 ),
657 )
658 .instrument(error_span!(
659 "transaction_orchestrator::local_execution",
660 ?tx_digest
661 ))
662 .await
663 {
664 Err(_elapsed) => {
665 debug!(
666 "Waiting for finalized tx to be executed locally timed out within {:?}.",
667 LOCAL_EXECUTION_TIMEOUT
668 );
669 metrics.local_execution_timeout.inc();
670 Err(SuiErrorKind::TimeoutError.into())
671 }
672 Ok(_) => {
673 metrics.local_execution_success.inc();
674 Ok(())
675 }
676 }
677 }
678
679 pub fn authority_state(&self) -> &Arc<AuthorityState> {
680 &self.validator_state
681 }
682
683 pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
684 &self.transaction_driver
685 }
686
687 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
688 self.transaction_driver.authority_aggregator().load_full()
689 }
690
691 fn update_metrics<'a>(
692 &'a self,
693 transaction: &Transaction,
694 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
695 let (in_flight, good_response) = if transaction.is_consensus_tx() {
696 self.metrics.total_req_received_shared_object.inc();
697 (
698 self.metrics.req_in_flight_shared_object.clone(),
699 &self.metrics.good_response_shared_object,
700 )
701 } else {
702 self.metrics.total_req_received_single_writer.inc();
703 (
704 self.metrics.req_in_flight_single_writer.clone(),
705 &self.metrics.good_response_single_writer,
706 )
707 };
708 in_flight.inc();
709 (
710 scopeguard::guard(in_flight, |in_flight| {
711 in_flight.dec();
712 }),
713 good_response,
714 )
715 }
716
717 fn start_task_to_recover_txes_in_log(
718 pending_tx_log: Arc<WritePathPendingTransactionLog>,
719 transaction_driver: Arc<TransactionDriver<A>>,
720 ) {
721 spawn_logged_monitored_task!(async move {
722 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
723 info!("Skipping loading pending transactions from pending_tx_log.");
724 return;
725 }
726 let pending_txes = pending_tx_log
727 .load_all_pending_transactions()
728 .expect("failed to load all pending transactions");
729 let num_pending_txes = pending_txes.len();
730 info!(
731 "Recovering {} pending transactions from pending_tx_log.",
732 num_pending_txes
733 );
734 let mut recovery = pending_txes
735 .into_iter()
736 .map(|tx| {
737 let pending_tx_log = pending_tx_log.clone();
738 let transaction_driver = transaction_driver.clone();
739 async move {
740 let tx = tx.into_inner();
743 let tx_digest = *tx.digest();
744 if let Err(err) = transaction_driver
747 .drive_transaction(
748 SubmitTxRequest::new_transaction(tx),
749 SubmitTransactionOptions::default(),
750 Some(Duration::from_secs(60)),
751 )
752 .await
753 {
754 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
755 } else {
756 debug!(?tx_digest, "Executed recovered transaction");
757 }
758 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
759 warn!(
760 ?tx_digest,
761 "Failed to clean up transaction in pending log: {err}"
762 );
763 } else {
764 debug!(?tx_digest, "Cleaned up transaction in pending log");
765 }
766 }
767 })
768 .collect::<FuturesUnordered<_>>();
769
770 let mut num_recovered = 0;
771 while recovery.next().await.is_some() {
772 num_recovered += 1;
773 if num_recovered % 1000 == 0 {
774 info!(
775 "Recovered {} out of {} transactions from pending_tx_log.",
776 num_recovered, num_pending_txes
777 );
778 }
779 }
780 info!(
781 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
782 num_recovered, num_pending_txes
783 );
784 });
785 }
786
787 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
788 self.pending_tx_log.load_all_pending_transactions()
789 }
790
791 pub fn empty_pending_tx_log_in_test(&self) -> bool {
792 self.pending_tx_log.is_empty()
793 }
794}
/// Prometheus metrics exported by the transaction orchestrator.
///
/// Per-transaction-type counters and gauges are pre-resolved label children of vector
/// metrics registered in `TransactionOrchestratorMetrics::new`.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    /// Requests received for single-writer (non-consensus) transactions.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    /// Requests received for consensus (shared-object) transactions.
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    /// Successful responses for single-writer transactions.
    good_response_single_writer: GenericCounter<AtomicU64>,
    /// Successful responses for consensus transactions.
    good_response_shared_object: GenericCounter<AtomicU64>,

    /// Single-writer requests currently being processed.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    /// Consensus requests currently being processed.
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    /// Submissions currently waiting for finality.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    /// Submissions that reached finality through the driver.
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    /// Requests that hit the overall finality timeout.
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    /// Requests currently waiting for local execution.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    /// Local-execution waits that completed in time.
    local_execution_success: GenericCounter<AtomicU64>,
    /// Local-execution waits that timed out.
    local_execution_timeout: GenericCounter<AtomicU64>,

    /// Requests answered from locally available effects while submission was still running.
    concurrent_execution: IntCounter,

    /// Early-validation rejections, labelled by error variant.
    early_validation_rejections: IntCounterVec,

    /// End-to-end request latency by tx type, route and local-execution outcome.
    request_latency: HistogramVec,
    /// Time spent waiting for local execution, by tx type.
    local_execution_latency: HistogramVec,
    /// Time for a single submission to reach finality, by tx type and driver.
    settlement_finality_latency: HistogramVec,
}
823
824impl TransactionOrchestratorMetrics {
828 pub fn new(registry: &Registry) -> Self {
829 let total_req_received = register_int_counter_vec_with_registry!(
830 "tx_orchestrator_total_req_received",
831 "Total number of executions request Transaction Orchestrator receives, group by tx type",
832 &["tx_type"],
833 registry
834 )
835 .unwrap();
836
837 let total_req_received_single_writer =
838 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
839 let total_req_received_shared_object =
840 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
841
842 let good_response = register_int_counter_vec_with_registry!(
843 "tx_orchestrator_good_response",
844 "Total number of good responses Transaction Orchestrator generates, group by tx type",
845 &["tx_type"],
846 registry
847 )
848 .unwrap();
849
850 let good_response_single_writer =
851 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
852 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
853
854 let req_in_flight = register_int_gauge_vec_with_registry!(
855 "tx_orchestrator_req_in_flight",
856 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
857 &["tx_type"],
858 registry
859 )
860 .unwrap();
861
862 let req_in_flight_single_writer =
863 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
864 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
865
866 Self {
867 total_req_received_single_writer,
868 total_req_received_shared_object,
869 good_response_single_writer,
870 good_response_shared_object,
871 req_in_flight_single_writer,
872 req_in_flight_shared_object,
873 wait_for_finality_in_flight: register_int_gauge_with_registry!(
874 "tx_orchestrator_wait_for_finality_in_flight",
875 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
876 registry,
877 )
878 .unwrap(),
879 wait_for_finality_finished: register_int_counter_with_registry!(
880 "tx_orchestrator_wait_for_finality_fnished",
881 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
882 registry,
883 )
884 .unwrap(),
885 wait_for_finality_timeout: register_int_counter_with_registry!(
886 "tx_orchestrator_wait_for_finality_timeout",
887 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
888 registry,
889 )
890 .unwrap(),
891 local_execution_in_flight: register_int_gauge_with_registry!(
892 "tx_orchestrator_local_execution_in_flight",
893 "Number of local execution txns in flights Transaction Orchestrator handles",
894 registry,
895 )
896 .unwrap(),
897 local_execution_success: register_int_counter_with_registry!(
898 "tx_orchestrator_local_execution_success",
899 "Total number of successful local execution txns Transaction Orchestrator handles",
900 registry,
901 )
902 .unwrap(),
903 local_execution_timeout: register_int_counter_with_registry!(
904 "tx_orchestrator_local_execution_timeout",
905 "Total number of timed-out local execution txns Transaction Orchestrator handles",
906 registry,
907 )
908 .unwrap(),
909 concurrent_execution: register_int_counter_with_registry!(
910 "tx_orchestrator_concurrent_execution",
911 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
912 registry,
913 )
914 .unwrap(),
915 early_validation_rejections: register_int_counter_vec_with_registry!(
916 "tx_orchestrator_early_validation_rejections",
917 "Total number of transactions rejected during early validation before submission, by reason",
918 &["reason"],
919 registry,
920 )
921 .unwrap(),
922 request_latency: register_histogram_vec_with_registry!(
923 "tx_orchestrator_request_latency",
924 "Time spent in processing one Transaction Orchestrator request",
925 &["tx_type", "route", "wait_for_local_execution"],
926 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
927 registry,
928 )
929 .unwrap(),
930 local_execution_latency: register_histogram_vec_with_registry!(
931 "tx_orchestrator_local_execution_latency",
932 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
933 &["tx_type"],
934 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
935 registry,
936 )
937 .unwrap(),
938 settlement_finality_latency: register_histogram_vec_with_registry!(
939 "tx_orchestrator_settlement_finality_latency",
940 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
941 &["tx_type", "driver_type"],
942 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
943 registry,
944 )
945 .unwrap(),
946 }
947 }
948
949 pub fn new_for_tests() -> Self {
950 let registry = Registry::new();
951 Self::new(®istry)
952 }
953}
954
955#[async_trait::async_trait]
956impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
957where
958 A: AuthorityAPI + Send + Sync + 'static + Clone,
959{
960 async fn execute_transaction(
961 &self,
962 request: ExecuteTransactionRequestV3,
963 client_addr: Option<std::net::SocketAddr>,
964 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
965 self.execute_transaction_v3(request, client_addr).await
966 }
967
968 fn simulate_transaction(
969 &self,
970 transaction: TransactionData,
971 checks: TransactionChecks,
972 ) -> Result<SimulateTransactionResult, SuiError> {
973 self.validator_state
974 .simulate_transaction(transaction, checks)
975 }
976}
977
/// Ties a transaction's entry in the pending-transaction log to the lifetime of one request.
///
/// Construction records the transaction in the log (if not already present). Dropping the
/// guard calls `finish_transaction` for the digest.
struct TransactionSubmissionGuard {
    /// Log the transaction was (possibly) written to on construction.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Digest passed to `finish_transaction` on drop.
    tx_digest: TransactionDigest,
    /// `true` when `write_pending_transaction_maybe` reported the transaction as newly added.
    is_new_transaction: bool,
}
985
986impl TransactionSubmissionGuard {
987 pub fn new(
988 pending_tx_log: Arc<WritePathPendingTransactionLog>,
989 tx: &VerifiedTransaction,
990 ) -> Self {
991 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
992 let tx_digest = *tx.digest();
993 if is_new_transaction {
994 debug!(?tx_digest, "Added transaction to inflight set");
995 } else {
996 debug!(
997 ?tx_digest,
998 "Transaction already being processed, no new submission will be made"
999 );
1000 };
1001 Self {
1002 pending_tx_log,
1003 tx_digest,
1004 is_new_transaction,
1005 }
1006 }
1007
1008 fn is_new_transaction(&self) -> bool {
1009 self.is_new_transaction
1010 }
1011}
1012
1013impl Drop for TransactionSubmissionGuard {
1014 fn drop(&mut self) {
1015 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1016 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1017 } else {
1018 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1019 }
1020 }
1021}