1use std::net::SocketAddr;
11use std::ops::Deref;
12use std::path::Path;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16
17use futures::FutureExt;
18use futures::future::{Either, Future, select};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::in_antithesis;
21use mysten_common::sync::notify_read::NotifyRead;
22use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
23use mysten_metrics::{add_server_timing, spawn_logged_monitored_task};
24use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
25use prometheus::{
26 HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
27 register_int_counter_vec_with_registry, register_int_counter_with_registry,
28 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
29};
30use rand::Rng;
31use sui_config::NodeConfig;
32use sui_protocol_config::Chain;
33use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
34use sui_types::base_types::TransactionDigest;
35use sui_types::effects::TransactionEffectsAPI;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
37use sui_types::messages_grpc::{SubmitTxRequest, TxType};
38use sui_types::quorum_driver_types::{
39 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
40 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
41 QuorumDriverError, QuorumDriverResult,
42};
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
45use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
46use tokio::sync::broadcast::Receiver;
47use tokio::time::{Instant, sleep, timeout};
48use tracing::{Instrument, debug, error_span, info, instrument, warn};
49
50use crate::authority::AuthorityState;
51use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
52use crate::authority_aggregator::AuthorityAggregator;
53use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
54use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
55use crate::quorum_driver::{QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics};
56use crate::transaction_driver::{
57 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
58 TransactionDriverMetrics, choose_transaction_driver_percentage,
59};
60
/// Upper bound on how long `wait_for_finalized_tx_executed_locally_with_timeout`
/// waits for the executed effects digests of a finalized transaction.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default time budget for driving a transaction to finality. It applies when the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable is unset or does not parse.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);

/// Result shape pairing a transaction with its quorum response on success, or the
/// transaction digest with the driver error on failure.
pub type QuorumTransactionEffectsResult =
    Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
/// Entry point for submitting transactions and waiting for their finalized effects.
///
/// `A` is the authority client type shared by the quorum driver and the transaction driver.
pub struct TransactionOrchestrator<A: Clone> {
    /// Handle used to submit transactions through the quorum driver path.
    quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
    /// Local authority state: used for signature verification, early validation and
    /// for reading executed effects, events and objects.
    validator_state: Arc<AuthorityState>,
    /// Log of transactions with a submission in flight. Entries found at construction
    /// time are re-driven by the recovery task.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Notifier shared with the quorum driver handler; tickets registered on it resolve
    /// with the quorum driver result for a digest.
    notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
    /// Orchestrator-level counters, gauges and histograms.
    metrics: Arc<TransactionOrchestratorMetrics>,
    /// Driver used for the transaction driver submission path and for recovery.
    transaction_driver: Arc<TransactionDriver<A>>,
    /// Threshold compared against a per-transaction value in `1..=100` to decide
    /// whether the transaction driver is used; `0` never selects it.
    td_percentage: u8,
    /// Passed as `allowed_validators` on transaction driver submissions.
    td_allowed_submission_list: Vec<String>,
    /// Passed as `blocked_validators` on transaction driver submissions.
    td_blocked_submission_list: Vec<String>,
    /// When true, `check_transaction_validity` runs locally before any submission.
    enable_early_validation: bool,
}
82
83impl TransactionOrchestrator<NetworkAuthorityClient> {
84 pub fn new_with_auth_aggregator(
85 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
86 validator_state: Arc<AuthorityState>,
87 reconfig_channel: Receiver<SuiSystemState>,
88 parent_path: &Path,
89 prometheus_registry: &Registry,
90 node_config: &NodeConfig,
91 ) -> Self {
92 let observer = OnsiteReconfigObserver::new(
93 reconfig_channel,
94 validator_state.get_object_cache_reader().clone(),
95 validator_state.clone_committee_store(),
96 validators.safe_client_metrics_base.clone(),
97 validators.metrics.deref().clone(),
98 );
99 TransactionOrchestrator::new(
100 validators,
101 validator_state,
102 parent_path,
103 prometheus_registry,
104 observer,
105 node_config,
106 )
107 }
108}
109
110impl<A> TransactionOrchestrator<A>
111where
112 A: AuthorityAPI + Send + Sync + 'static + Clone,
113 OnsiteReconfigObserver: ReconfigObserver<A>,
114{
115 pub fn new(
116 validators: Arc<AuthorityAggregator<A>>,
117 validator_state: Arc<AuthorityState>,
118 parent_path: &Path,
119 prometheus_registry: &Registry,
120 reconfig_observer: OnsiteReconfigObserver,
121 node_config: &NodeConfig,
122 ) -> Self {
123 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
124
125 let qd_metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
126 let notifier = Arc::new(NotifyRead::new());
127 let reconfig_observer = Arc::new(reconfig_observer);
128 let quorum_driver_handler = Arc::new(
129 QuorumDriverHandlerBuilder::new(validators.clone(), qd_metrics.clone())
130 .with_notifier(notifier.clone())
131 .with_reconfig_observer(reconfig_observer.clone())
132 .start(),
133 );
134
135 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
136 let client_metrics = Arc::new(
137 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
138 );
139 let transaction_driver = TransactionDriver::new(
140 validators.clone(),
141 reconfig_observer.clone(),
142 td_metrics,
143 Some(node_config),
144 client_metrics,
145 );
146
147 let epoch_store = validator_state.load_epoch_store_one_call_per_task();
148 let td_percentage = if !epoch_store.protocol_config().mysticeti_fastpath() {
149 0
150 } else {
151 choose_transaction_driver_percentage(Some(epoch_store.get_chain_identifier()))
152 };
153
154 let td_allowed_submission_list = node_config
155 .transaction_driver_config
156 .as_ref()
157 .map(|config| config.allowed_submission_validators.clone())
158 .unwrap_or_default();
159
160 let td_blocked_submission_list = node_config
161 .transaction_driver_config
162 .as_ref()
163 .map(|config| config.blocked_submission_validators.clone())
164 .unwrap_or_default();
165
166 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
167 panic!(
168 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
169 td_allowed_submission_list, td_blocked_submission_list
170 );
171 }
172
173 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
174 parent_path.join("fullnode_pending_transactions"),
175 ));
176 Self::start_task_to_recover_txes_in_log(pending_tx_log.clone(), transaction_driver.clone());
177
178 let enable_early_validation = node_config
179 .transaction_driver_config
180 .as_ref()
181 .map(|config| config.enable_early_validation)
182 .unwrap_or(true);
183
184 Self {
185 quorum_driver_handler,
186 validator_state,
187 pending_tx_log,
188 notifier,
189 metrics,
190 transaction_driver,
191 td_percentage,
192 td_allowed_submission_list,
193 td_blocked_submission_list,
194 enable_early_validation,
195 }
196 }
197}
198
199impl<A> TransactionOrchestrator<A>
200where
201 A: AuthorityAPI + Send + Sync + 'static + Clone,
202{
203 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
204 fields(
205 tx_digest = ?request.transaction.digest(),
206 tx_type = ?request_type,
207 ))]
208 pub async fn execute_transaction_block(
209 &self,
210 request: ExecuteTransactionRequestV3,
211 request_type: ExecuteTransactionRequestType,
212 client_addr: Option<SocketAddr>,
213 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
214 {
215 let timer = Instant::now();
216 let tx_type = if request.transaction.is_consensus_tx() {
217 TxType::SharedObject
218 } else {
219 TxType::SingleWriter
220 };
221 let tx_digest = *request.transaction.digest();
222
223 let (response, mut executed_locally) = self
224 .execute_transaction_with_effects_waiting(request, client_addr)
225 .await?;
226
227 if !executed_locally {
228 executed_locally = if matches!(
229 request_type,
230 ExecuteTransactionRequestType::WaitForLocalExecution
231 ) {
232 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
233 &self.validator_state,
234 tx_digest,
235 tx_type,
236 &self.metrics,
237 )
238 .await
239 .is_ok();
240 add_server_timing("local_execution done");
241 executed_locally
242 } else {
243 false
244 };
245 }
246
247 let QuorumTransactionResponse {
248 effects,
249 events,
250 input_objects,
251 output_objects,
252 auxiliary_data,
253 } = response;
254
255 let response = ExecuteTransactionResponseV3 {
256 effects,
257 events,
258 input_objects,
259 output_objects,
260 auxiliary_data,
261 };
262
263 self.metrics
264 .request_latency
265 .with_label_values(&[
266 tx_type.as_str(),
267 "execute_transaction_block",
268 executed_locally.to_string().as_str(),
269 ])
270 .observe(timer.elapsed().as_secs_f64());
271
272 Ok((response, executed_locally))
273 }
274
275 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
277 fields(tx_digest = ?request.transaction.digest()))]
278 pub async fn execute_transaction_v3(
279 &self,
280 request: ExecuteTransactionRequestV3,
281 client_addr: Option<SocketAddr>,
282 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
283 let timer = Instant::now();
284 let tx_type = if request.transaction.is_consensus_tx() {
285 TxType::SharedObject
286 } else {
287 TxType::SingleWriter
288 };
289
290 let (response, _) = self
291 .execute_transaction_with_effects_waiting(request, client_addr)
292 .await?;
293
294 self.metrics
295 .request_latency
296 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
297 .observe(timer.elapsed().as_secs_f64());
298
299 let QuorumTransactionResponse {
300 effects,
301 events,
302 input_objects,
303 output_objects,
304 auxiliary_data,
305 } = response;
306
307 Ok(ExecuteTransactionResponseV3 {
308 effects,
309 events,
310 input_objects,
311 output_objects,
312 auxiliary_data,
313 })
314 }
315
/// Verifies the transaction, optionally rejects it early, then races three things:
/// locally executed effects becoming available, the submission attempt(s), and the
/// finality timeout.
///
/// Returns the response together with a flag that is `true` when the response was
/// built from locally executed effects and `false` when it came from a submission.
///
/// # Errors
///
/// - `InvalidUserSignature` when `verify_transaction` fails.
/// - `TransactionFailed` when early validation rejects a transaction that is not
///   already executed, with a non-retriable error category.
/// - `QuorumDriverInternalError` when reading events/objects for local effects fails.
/// - The last submission error once every submission attempt has failed.
/// - `TimeoutBeforeFinality` when the finality timeout fires first.
async fn execute_transaction_with_effects_waiting(
    &self,
    request: ExecuteTransactionRequestV3,
    client_addr: Option<SocketAddr>,
) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
    let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
    let verified_transaction = epoch_store
        .verify_transaction(request.transaction.clone())
        .map_err(QuorumDriverError::InvalidUserSignature)?;
    let tx_digest = *verified_transaction.digest();

    // Early validation: only reject when the error is not retriable for submission
    // AND the transaction has not already been executed. In every other case fall
    // through and continue with the normal flow.
    if self.enable_early_validation
        && let Err(e) = self
            .validator_state
            .check_transaction_validity(&epoch_store, &verified_transaction)
    {
        let error_category = e.categorize();
        if !error_category.is_submission_retriable() {
            if !self.validator_state.is_tx_already_executed(&tx_digest) {
                self.metrics
                    .early_validation_rejections
                    .with_label_values(&[e.to_variant_name()])
                    .inc();
                debug!(
                    error = ?e,
                    "Transaction rejected during early validation"
                );

                return Err(QuorumDriverError::TransactionFailed {
                    category: error_category,
                    details: e.to_string(),
                });
            }
        }
    }

    // The guard records the transaction in the pending log and stays alive until
    // this function returns; its `Drop` removes the entry again.
    let guard =
        TransactionSubmissionGuard::new(self.pending_tx_log.clone(), &verified_transaction);
    let is_new_transaction = guard.is_new_transaction();

    // Copy the response options out of `request` because `request` is cloned into
    // each submission future below.
    let include_events = request.include_events;
    let include_input_objects = request.include_input_objects;
    let include_output_objects = request.include_output_objects;
    let include_auxiliary_data = request.include_auxiliary_data;

    // Shared flag that `execute_transaction_impl` sets to whether the transaction
    // driver was chosen. It is not read back in this function.
    let using_td = Arc::new(AtomicBool::new(false));

    // The timeout can be overridden (in whole seconds) through the environment.
    let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .map(Duration::from_secs)
        .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);

    // A transaction that is already in the pending log is not submitted again; this
    // call then only waits for local effects or the timeout.
    let num_submissions = if !is_new_transaction {
        0
    } else if cfg!(msim) || in_antithesis() {
        // Under msim / Antithesis, issue duplicate submissions at random:
        // 10% -> 3 submissions, 20% -> 2 submissions, 70% -> 1 submission.
        let r = rand::thread_rng().gen_range(1..=100);
        let n = if r <= 10 {
            3
        } else if r <= 30 {
            2
        } else {
            1
        };
        if n > 1 {
            debug!("Making {n} execution calls");
        }
        n
    } else {
        1
    };

    let mut execution_futures = FuturesUnordered::new();
    for i in 0..num_submissions {
        // The first submission always starts immediately; each extra submission is
        // delayed by 100-500 ms with probability 0.8.
        let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
        let delay_ms = if should_delay {
            rand::thread_rng().gen_range(100..=500)
        } else {
            0
        };

        let epoch_store = epoch_store.clone();
        let request = request.clone();
        let verified_transaction = verified_transaction.clone();
        let using_td = using_td.clone();

        let future = async move {
            if delay_ms > 0 {
                sleep(Duration::from_millis(delay_ms)).await;
            }
            self.execute_transaction_impl(
                &epoch_store,
                request,
                verified_transaction,
                client_addr,
                Some(finality_timeout),
                using_td,
            )
            .await
        }
        .boxed();
        execution_futures.push(future);
    }

    // Remembered so that the error of the final failed attempt can be returned.
    let mut last_execution_error: Option<QuorumDriverError> = None;

    // Resolves when executed effects for the digest are readable locally, or with an
    // error when `within_alive_epoch` gives up first.
    let digests = [tx_digest];
    let mut local_effects_future = epoch_store
        .within_alive_epoch(
            self.validator_state
                .get_transaction_cache_reader()
                .notify_read_executed_effects(
                    "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
                    &digests,
                ),
        )
        .boxed();

    let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();

    loop {
        tokio::select! {
            // Poll branches in declaration order: local effects, then submission
            // results, then the timeout.
            biased;

            local_effects_result = &mut local_effects_future => {
                match local_effects_result {
                    Ok(effects) => {
                        debug!(
                            "Effects became available while execution was running"
                        );
                        if let Some(effects) = effects.into_iter().next() {
                            self.metrics.concurrent_execution.inc();
                            let epoch = effects.executed_epoch();
                            // Events are only looked up when requested and when the
                            // effects carry an events digest.
                            let events = if include_events {
                                if effects.events_digest().is_some() {
                                    Some(self.validator_state.get_transaction_events(effects.transaction_digest())
                                        .map_err(QuorumDriverError::QuorumDriverInternalError)?)
                                } else {
                                    None
                                }
                            } else {
                                None
                            };
                            let input_objects = include_input_objects
                                .then(|| self.validator_state.get_transaction_input_objects(&effects))
                                .transpose()
                                .map_err(QuorumDriverError::QuorumDriverInternalError)?;
                            let output_objects = include_output_objects
                                .then(|| self.validator_state.get_transaction_output_objects(&effects))
                                .transpose()
                                .map_err(QuorumDriverError::QuorumDriverInternalError)?;
                            // Auxiliary data is never populated on this path, even
                            // when the request asked for it.
                            let response = QuorumTransactionResponse {
                                effects: FinalizedEffects {
                                    effects,
                                    finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
                                },
                                events,
                                input_objects,
                                output_objects,
                                auxiliary_data: None,
                            };
                            break Ok((response, true));
                        }
                    }
                    Err(_) => {
                        warn!("Epoch terminated before effects were available");
                    }
                };

                // The future has completed without producing a response. Swap in a
                // never-resolving future so this branch is not taken again and the
                // remaining branches decide the outcome.
                local_effects_future = futures::future::pending().boxed();
            }

            // When `execution_futures` is empty, `next()` yields `None`, the pattern
            // does not match and this branch is skipped for the iteration.
            Some(result) = execution_futures.next() => {
                match result {
                    Ok(resp) => {
                        debug!("Execution succeeded, returning response");
                        let QuorumTransactionResponse {
                            effects,
                            events,
                            input_objects,
                            output_objects,
                            auxiliary_data,
                        } = resp;
                        // Strip every optional part the caller did not request.
                        let resp = QuorumTransactionResponse {
                            effects,
                            events: if include_events { events } else { None },
                            input_objects: if include_input_objects { input_objects } else { None },
                            output_objects: if include_output_objects { output_objects } else { None },
                            auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
                        };
                        break Ok((resp, false));
                    }
                    Err(QuorumDriverError::PendingExecutionInTransactionOrchestrator) => {
                        debug!(
                            "Transaction is already being processed"
                        );
                        // Do not overwrite an error recorded by an earlier attempt.
                        if last_execution_error.is_none() {
                            last_execution_error = Some(QuorumDriverError::PendingExecutionInTransactionOrchestrator);
                        }
                    }
                    Err(e) => {
                        debug!(?e, "Execution attempt failed, wait for other attempts");
                        last_execution_error = Some(e);
                    }
                };

                // Reaching here means the attempt failed, and both error arms leave
                // `last_execution_error` set, so the `unwrap` cannot panic.
                if execution_futures.is_empty() {
                    break Err(last_execution_error.unwrap());
                }
            }

            _ = &mut timeout_future => {
                debug!("Timeout waiting for transaction finality.");
                self.metrics.wait_for_finality_timeout.inc();

                break Err(QuorumDriverError::TimeoutBeforeFinality);
            }
        }
    }
}
558
559 #[instrument(level = "error", skip_all)]
560 async fn execute_transaction_impl(
561 &self,
562 epoch_store: &Arc<AuthorityPerEpochStore>,
563 request: ExecuteTransactionRequestV3,
564 verified_transaction: VerifiedTransaction,
565 client_addr: Option<SocketAddr>,
566 finality_timeout: Option<Duration>,
567 using_td: Arc<AtomicBool>,
568 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
569 let tx_digest = *verified_transaction.digest();
570 debug!("TO Received transaction execution request.");
571
572 let timer = Instant::now();
573 let tx_type = if verified_transaction.is_consensus_tx() {
574 TxType::SharedObject
575 } else {
576 TxType::SingleWriter
577 };
578
579 let (_in_flight_metrics_guards, good_response_metrics) =
580 self.update_metrics(&request.transaction);
581
582 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
584 wait_for_finality_gauge.inc();
585 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
586 in_flight.dec();
587 });
588
589 let (response, driver_type) = if self.should_use_transaction_driver(epoch_store, tx_digest)
591 {
592 using_td.store(true, Ordering::Release);
594
595 (
596 self.submit_with_transaction_driver(
597 &self.transaction_driver,
598 &request,
599 client_addr,
600 &verified_transaction,
601 good_response_metrics,
602 finality_timeout,
603 )
604 .await?,
605 "transaction_driver",
606 )
607 } else {
608 using_td.store(false, Ordering::Release);
610
611 let resp = self
612 .submit_with_quorum_driver(
613 epoch_store.clone(),
614 verified_transaction.clone(),
615 request,
616 client_addr,
617 )
618 .await
619 .map_err(|e| {
620 warn!("QuorumDriverInternalError: {e:?}");
621 QuorumDriverError::QuorumDriverInternalError(e)
622 })?
623 .await
624 .map_err(|e| {
625 warn!("QuorumDriverInternalError: {e:?}");
626 QuorumDriverError::QuorumDriverInternalError(e)
627 })??;
628
629 (
630 QuorumTransactionResponse {
631 effects: FinalizedEffects::new_from_effects_cert(resp.effects_cert.into()),
632 events: resp.events,
633 input_objects: resp.input_objects,
634 output_objects: resp.output_objects,
635 auxiliary_data: resp.auxiliary_data,
636 },
637 "quorum_driver",
638 )
639 };
640
641 add_server_timing("wait_for_finality done");
642
643 self.metrics.wait_for_finality_finished.inc();
644
645 let elapsed = timer.elapsed().as_secs_f64();
646 self.metrics
647 .settlement_finality_latency
648 .with_label_values(&[tx_type.as_str(), driver_type])
649 .observe(elapsed);
650 good_response_metrics.inc();
651
652 Ok(response)
653 }
654
655 #[instrument(level = "error", skip_all, err(level = "info"))]
656 async fn submit_with_transaction_driver(
657 &self,
658 td: &Arc<TransactionDriver<A>>,
659 request: &ExecuteTransactionRequestV3,
660 client_addr: Option<SocketAddr>,
661 verified_transaction: &VerifiedTransaction,
662 good_response_metrics: &GenericCounter<AtomicU64>,
663 timeout_duration: Option<Duration>,
664 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
665 let tx_digest = *verified_transaction.digest();
666 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
667
668 let td_response = td
669 .drive_transaction(
670 SubmitTxRequest::new_transaction(request.transaction.clone()),
671 SubmitTransactionOptions {
672 forwarded_client_addr: client_addr,
673 allowed_validators: self.td_allowed_submission_list.clone(),
674 blocked_validators: self.td_blocked_submission_list.clone(),
675 },
676 timeout_duration,
677 )
678 .await
679 .map_err(|e| match e {
680 TransactionDriverError::TimeoutWithLastRetriableError {
681 last_error,
682 attempts,
683 timeout,
684 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
685 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
686 attempts,
687 timeout,
688 },
689 other => QuorumDriverError::TransactionFailed {
690 category: other.categorize(),
691 details: other.to_string(),
692 },
693 });
694
695 match td_response {
696 Err(e) => {
697 warn!("TransactionDriver error: {e:?}");
698 Err(e)
699 }
700 Ok(quorum_transaction_response) => {
701 good_response_metrics.inc();
702 Ok(quorum_transaction_response)
703 }
704 }
705 }
706
707 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
710 async fn submit_with_quorum_driver(
711 &self,
712 epoch_store: Arc<AuthorityPerEpochStore>,
713 transaction: VerifiedTransaction,
714 request: ExecuteTransactionRequestV3,
715 client_addr: Option<SocketAddr>,
716 ) -> SuiResult<impl Future<Output = SuiResult<QuorumDriverResult>> + '_> {
717 let tx_digest = *transaction.digest();
718
719 let ticket = self.notifier.register_one(&tx_digest);
720 self.quorum_driver()
721 .submit_transaction_no_ticket(request.clone(), client_addr)
722 .await?;
723
724 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
729 let qd = self.clone_quorum_driver();
730 Ok(async move {
731 let digests = [tx_digest];
732 let effects_await =
733 epoch_store.within_alive_epoch(cache_reader.notify_read_executed_effects(
734 "TransactionOrchestrator::notify_read_submit_with_qd",
735 &digests,
736 ));
737 #[allow(clippy::let_and_return)]
739 let res = match select(ticket, effects_await.boxed()).await {
740 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
741 Either::Right((_, unfinished_quorum_driver_task)) => {
742 debug!("Effects are available in DB, use quorum driver to get a certificate");
743 qd.submit_transaction_no_ticket(request, client_addr)
744 .await?;
745 Ok(unfinished_quorum_driver_task.await)
746 }
747 };
748 res
749 })
750 }
751
752 #[instrument(
753 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
754 level = "debug",
755 skip_all,
756 err(level = "info")
757 )]
758 async fn wait_for_finalized_tx_executed_locally_with_timeout(
759 validator_state: &Arc<AuthorityState>,
760 tx_digest: TransactionDigest,
761 tx_type: TxType,
762 metrics: &TransactionOrchestratorMetrics,
763 ) -> SuiResult {
764 metrics.local_execution_in_flight.inc();
765 let _metrics_guard =
766 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
767 in_flight.dec();
768 });
769
770 let _latency_guard = metrics
771 .local_execution_latency
772 .with_label_values(&[tx_type.as_str()])
773 .start_timer();
774 debug!("Waiting for finalized tx to be executed locally.");
775 match timeout(
776 LOCAL_EXECUTION_TIMEOUT,
777 validator_state
778 .get_transaction_cache_reader()
779 .notify_read_executed_effects_digests(
780 "TransactionOrchestrator::notify_read_wait_for_local_execution",
781 &[tx_digest],
782 ),
783 )
784 .instrument(error_span!(
785 "transaction_orchestrator::local_execution",
786 ?tx_digest
787 ))
788 .await
789 {
790 Err(_elapsed) => {
791 debug!(
792 "Waiting for finalized tx to be executed locally timed out within {:?}.",
793 LOCAL_EXECUTION_TIMEOUT
794 );
795 metrics.local_execution_timeout.inc();
796 Err(SuiErrorKind::TimeoutError.into())
797 }
798 Ok(_) => {
799 metrics.local_execution_success.inc();
800 Ok(())
801 }
802 }
803 }
804
805 fn should_use_transaction_driver(
806 &self,
807 epoch_store: &Arc<AuthorityPerEpochStore>,
808 tx_digest: TransactionDigest,
809 ) -> bool {
810 const MAX_PERCENTAGE: u8 = 100;
811 let unknown_network = epoch_store.get_chain() == Chain::Unknown;
812 let v = if unknown_network {
813 rand::thread_rng().gen_range(1..=MAX_PERCENTAGE)
814 } else {
815 let v = u32::from_le_bytes(tx_digest.inner()[..4].try_into().unwrap());
816 (v % (MAX_PERCENTAGE as u32) + 1) as u8
817 };
818 debug!(
819 "Choosing whether to use transaction driver: {} vs {}",
820 v, self.td_percentage
821 );
822 v <= self.td_percentage
823 }
824
/// Returns the local authority state this orchestrator was built with.
pub fn authority_state(&self) -> &Arc<AuthorityState> {
    &self.validator_state
}
828
/// Returns the transaction driver owned by this orchestrator.
pub fn transaction_driver(&self) -> &Arc<TransactionDriver<A>> {
    &self.transaction_driver
}
832
/// Borrows the quorum driver handler.
fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
    &self.quorum_driver_handler
}
836
/// Returns an owned handle to the quorum driver handler, for use in futures that
/// must not borrow from `self`.
fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
    self.quorum_driver_handler.clone()
}
840
/// Returns the authority aggregator currently held by the transaction driver.
pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
    self.transaction_driver.authority_aggregator().load_full()
}
844
845 fn update_metrics<'a>(
846 &'a self,
847 transaction: &Transaction,
848 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
849 let (in_flight, good_response) = if transaction.is_consensus_tx() {
850 self.metrics.total_req_received_shared_object.inc();
851 (
852 self.metrics.req_in_flight_shared_object.clone(),
853 &self.metrics.good_response_shared_object,
854 )
855 } else {
856 self.metrics.total_req_received_single_writer.inc();
857 (
858 self.metrics.req_in_flight_single_writer.clone(),
859 &self.metrics.good_response_single_writer,
860 )
861 };
862 in_flight.inc();
863 (
864 scopeguard::guard(in_flight, |in_flight| {
865 in_flight.dec();
866 }),
867 good_response,
868 )
869 }
870
871 fn start_task_to_recover_txes_in_log(
872 pending_tx_log: Arc<WritePathPendingTransactionLog>,
873 transaction_driver: Arc<TransactionDriver<A>>,
874 ) {
875 spawn_logged_monitored_task!(async move {
876 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
877 info!("Skipping loading pending transactions from pending_tx_log.");
878 return;
879 }
880 let pending_txes = pending_tx_log
881 .load_all_pending_transactions()
882 .expect("failed to load all pending transactions");
883 let num_pending_txes = pending_txes.len();
884 info!(
885 "Recovering {} pending transactions from pending_tx_log.",
886 num_pending_txes
887 );
888 let mut recovery = pending_txes
889 .into_iter()
890 .map(|tx| {
891 let pending_tx_log = pending_tx_log.clone();
892 let transaction_driver = transaction_driver.clone();
893 async move {
894 let tx = tx.into_inner();
897 let tx_digest = *tx.digest();
898 if let Err(err) = transaction_driver
901 .drive_transaction(
902 SubmitTxRequest::new_transaction(tx),
903 SubmitTransactionOptions::default(),
904 Some(Duration::from_secs(60)),
905 )
906 .await
907 {
908 warn!(?tx_digest, "Failed to execute recovered transaction: {err}");
909 } else {
910 debug!(?tx_digest, "Executed recovered transaction");
911 }
912 if let Err(err) = pending_tx_log.finish_transaction(&tx_digest) {
913 warn!(
914 ?tx_digest,
915 "Failed to clean up transaction in pending log: {err}"
916 );
917 } else {
918 debug!(?tx_digest, "Cleaned up transaction in pending log");
919 }
920 }
921 })
922 .collect::<FuturesUnordered<_>>();
923
924 let mut num_recovered = 0;
925 while recovery.next().await.is_some() {
926 num_recovered += 1;
927 if num_recovered % 1000 == 0 {
928 info!(
929 "Recovered {} out of {} transactions from pending_tx_log.",
930 num_recovered, num_pending_txes
931 );
932 }
933 }
934 info!(
935 "Recovery finished. Recovered {} out of {} transactions from pending_tx_log.",
936 num_recovered, num_pending_txes
937 );
938 });
939 }
940
/// Test helper: returns every transaction currently in the pending transaction log.
pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
    self.pending_tx_log.load_all_pending_transactions()
}
944
/// Test helper: reports whether the pending transaction log is empty.
pub fn empty_pending_tx_log_in_test(&self) -> bool {
    self.pending_tx_log.is_empty()
}
948}
/// Prometheus metrics recorded by the transaction orchestrator.
#[derive(Clone)]
pub struct TransactionOrchestratorMetrics {
    // Requests received, split by transaction type.
    total_req_received_single_writer: GenericCounter<AtomicU64>,
    total_req_received_shared_object: GenericCounter<AtomicU64>,

    // Successful responses, split by transaction type.
    good_response_single_writer: GenericCounter<AtomicU64>,
    good_response_shared_object: GenericCounter<AtomicU64>,

    // Requests currently being processed, split by transaction type.
    req_in_flight_single_writer: GenericGauge<AtomicI64>,
    req_in_flight_shared_object: GenericGauge<AtomicI64>,

    // Waiting for finality: attempts in flight, attempts finished, and timeouts.
    wait_for_finality_in_flight: GenericGauge<AtomicI64>,
    wait_for_finality_finished: GenericCounter<AtomicU64>,
    wait_for_finality_timeout: GenericCounter<AtomicU64>,

    // Waiting for local execution: waits in flight, successes, and timeouts.
    local_execution_in_flight: GenericGauge<AtomicI64>,
    local_execution_success: GenericCounter<AtomicU64>,
    local_execution_timeout: GenericCounter<AtomicU64>,

    // Requests answered from locally available effects while a submission was racing.
    concurrent_execution: IntCounter,

    // Early-validation rejections, labelled by the error variant name.
    early_validation_rejections: IntCounterVec,

    // Latency histograms: whole request, local-execution wait, and finality wait.
    request_latency: HistogramVec,
    local_execution_latency: HistogramVec,
    settlement_finality_latency: HistogramVec,
}
977
978impl TransactionOrchestratorMetrics {
982 pub fn new(registry: &Registry) -> Self {
983 let total_req_received = register_int_counter_vec_with_registry!(
984 "tx_orchestrator_total_req_received",
985 "Total number of executions request Transaction Orchestrator receives, group by tx type",
986 &["tx_type"],
987 registry
988 )
989 .unwrap();
990
991 let total_req_received_single_writer =
992 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
993 let total_req_received_shared_object =
994 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
995
996 let good_response = register_int_counter_vec_with_registry!(
997 "tx_orchestrator_good_response",
998 "Total number of good responses Transaction Orchestrator generates, group by tx type",
999 &["tx_type"],
1000 registry
1001 )
1002 .unwrap();
1003
1004 let good_response_single_writer =
1005 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1006 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1007
1008 let req_in_flight = register_int_gauge_vec_with_registry!(
1009 "tx_orchestrator_req_in_flight",
1010 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1011 &["tx_type"],
1012 registry
1013 )
1014 .unwrap();
1015
1016 let req_in_flight_single_writer =
1017 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1018 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1019
1020 Self {
1021 total_req_received_single_writer,
1022 total_req_received_shared_object,
1023 good_response_single_writer,
1024 good_response_shared_object,
1025 req_in_flight_single_writer,
1026 req_in_flight_shared_object,
1027 wait_for_finality_in_flight: register_int_gauge_with_registry!(
1028 "tx_orchestrator_wait_for_finality_in_flight",
1029 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1030 registry,
1031 )
1032 .unwrap(),
1033 wait_for_finality_finished: register_int_counter_with_registry!(
1034 "tx_orchestrator_wait_for_finality_fnished",
1035 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1036 registry,
1037 )
1038 .unwrap(),
1039 wait_for_finality_timeout: register_int_counter_with_registry!(
1040 "tx_orchestrator_wait_for_finality_timeout",
1041 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1042 registry,
1043 )
1044 .unwrap(),
1045 local_execution_in_flight: register_int_gauge_with_registry!(
1046 "tx_orchestrator_local_execution_in_flight",
1047 "Number of local execution txns in flights Transaction Orchestrator handles",
1048 registry,
1049 )
1050 .unwrap(),
1051 local_execution_success: register_int_counter_with_registry!(
1052 "tx_orchestrator_local_execution_success",
1053 "Total number of successful local execution txns Transaction Orchestrator handles",
1054 registry,
1055 )
1056 .unwrap(),
1057 local_execution_timeout: register_int_counter_with_registry!(
1058 "tx_orchestrator_local_execution_timeout",
1059 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1060 registry,
1061 )
1062 .unwrap(),
1063 concurrent_execution: register_int_counter_with_registry!(
1064 "tx_orchestrator_concurrent_execution",
1065 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1066 registry,
1067 )
1068 .unwrap(),
1069 early_validation_rejections: register_int_counter_vec_with_registry!(
1070 "tx_orchestrator_early_validation_rejections",
1071 "Total number of transactions rejected during early validation before submission, by reason",
1072 &["reason"],
1073 registry,
1074 )
1075 .unwrap(),
1076 request_latency: register_histogram_vec_with_registry!(
1077 "tx_orchestrator_request_latency",
1078 "Time spent in processing one Transaction Orchestrator request",
1079 &["tx_type", "route", "wait_for_local_execution"],
1080 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1081 registry,
1082 )
1083 .unwrap(),
1084 local_execution_latency: register_histogram_vec_with_registry!(
1085 "tx_orchestrator_local_execution_latency",
1086 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1087 &["tx_type"],
1088 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1089 registry,
1090 )
1091 .unwrap(),
1092 settlement_finality_latency: register_histogram_vec_with_registry!(
1093 "tx_orchestrator_settlement_finality_latency",
1094 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1095 &["tx_type", "driver_type"],
1096 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1097 registry,
1098 )
1099 .unwrap(),
1100 }
1101 }
1102
1103 pub fn new_for_tests() -> Self {
1104 let registry = Registry::new();
1105 Self::new(®istry)
1106 }
1107}
1108
1109#[async_trait::async_trait]
1110impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1111where
1112 A: AuthorityAPI + Send + Sync + 'static + Clone,
1113{
1114 async fn execute_transaction(
1115 &self,
1116 request: ExecuteTransactionRequestV3,
1117 client_addr: Option<std::net::SocketAddr>,
1118 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1119 self.execute_transaction_v3(request, client_addr).await
1120 }
1121
1122 fn simulate_transaction(
1123 &self,
1124 transaction: TransactionData,
1125 checks: TransactionChecks,
1126 ) -> Result<SimulateTransactionResult, SuiError> {
1127 self.validator_state
1128 .simulate_transaction(transaction, checks)
1129 }
1130}
1131
/// Scope guard around one transaction's entry in the pending transaction log: the
/// entry is written (if absent) on construction and removed when the guard drops.
struct TransactionSubmissionGuard {
    /// Log the entry is written to and removed from.
    pending_tx_log: Arc<WritePathPendingTransactionLog>,
    /// Digest of the guarded transaction.
    tx_digest: TransactionDigest,
    /// Result of `write_pending_transaction_maybe` at construction time.
    is_new_transaction: bool,
}
1139
1140impl TransactionSubmissionGuard {
1141 pub fn new(
1142 pending_tx_log: Arc<WritePathPendingTransactionLog>,
1143 tx: &VerifiedTransaction,
1144 ) -> Self {
1145 let is_new_transaction = pending_tx_log.write_pending_transaction_maybe(tx);
1146 let tx_digest = *tx.digest();
1147 if is_new_transaction {
1148 debug!(?tx_digest, "Added transaction to inflight set");
1149 } else {
1150 debug!(
1151 ?tx_digest,
1152 "Transaction already being processed, no new submission will be made"
1153 );
1154 };
1155 Self {
1156 pending_tx_log,
1157 tx_digest,
1158 is_new_transaction,
1159 }
1160 }
1161
1162 fn is_new_transaction(&self) -> bool {
1163 self.is_new_transaction
1164 }
1165}
1166
1167impl Drop for TransactionSubmissionGuard {
1168 fn drop(&mut self) {
1169 if let Err(err) = self.pending_tx_log.finish_transaction(&self.tx_digest) {
1170 warn!(?self.tx_digest, "Failed to clean up transaction in pending log: {err}");
1171 } else {
1172 debug!(?self.tx_digest, "Cleaned up transaction in pending log");
1173 }
1174 }
1175}