1use std::net::SocketAddr;
11use std::ops::Deref;
12use std::path::Path;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16
17use futures::FutureExt;
18use futures::future::{Either, Future, select};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::in_antithesis;
21use mysten_common::sync::notify_read::NotifyRead;
22use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
23use mysten_metrics::{add_server_timing, spawn_logged_monitored_task, spawn_monitored_task};
24use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
25use prometheus::{
26 HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
27 register_int_counter_vec_with_registry, register_int_counter_with_registry,
28 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
29};
30use rand::Rng;
31use sui_config::NodeConfig;
32use sui_protocol_config::Chain;
33use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
34use sui_types::base_types::TransactionDigest;
35use sui_types::effects::TransactionEffectsAPI;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
37use sui_types::messages_grpc::{SubmitTxRequest, TxType};
38use sui_types::quorum_driver_types::{
39 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
40 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
41 QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResult,
42};
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
45use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
46use tokio::sync::broadcast::Receiver;
47use tokio::sync::broadcast::error::RecvError;
48use tokio::task::JoinHandle;
49use tokio::time::{Instant, sleep, timeout};
50use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
51
52use crate::authority::AuthorityState;
53use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
54use crate::authority_aggregator::AuthorityAggregator;
55use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
56use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
57use crate::quorum_driver::{QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics};
58use crate::transaction_driver::{
59 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
60 TransactionDriverMetrics, choose_transaction_driver_percentage,
61};
62
63const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
66
67const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
69
70pub type QuorumTransactionEffectsResult =
71 Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
72pub struct TransactionOrchestrator<A: Clone> {
73 quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
74 validator_state: Arc<AuthorityState>,
75 _local_executor_handle: JoinHandle<()>,
76 pending_tx_log: Arc<WritePathPendingTransactionLog>,
77 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
78 metrics: Arc<TransactionOrchestratorMetrics>,
79 transaction_driver: Option<Arc<TransactionDriver<A>>>,
80 td_percentage: u8,
81 td_allowed_submission_list: Vec<String>,
82 td_blocked_submission_list: Vec<String>,
83 enable_early_validation: bool,
84}
85
86impl TransactionOrchestrator<NetworkAuthorityClient> {
87 pub fn new_with_auth_aggregator(
88 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
89 validator_state: Arc<AuthorityState>,
90 reconfig_channel: Receiver<SuiSystemState>,
91 parent_path: &Path,
92 prometheus_registry: &Registry,
93 node_config: &NodeConfig,
94 ) -> Self {
95 let observer = OnsiteReconfigObserver::new(
96 reconfig_channel,
97 validator_state.get_object_cache_reader().clone(),
98 validator_state.clone_committee_store(),
99 validators.safe_client_metrics_base.clone(),
100 validators.metrics.deref().clone(),
101 );
102 TransactionOrchestrator::new(
103 validators,
104 validator_state,
105 parent_path,
106 prometheus_registry,
107 observer,
108 node_config,
109 )
110 }
111}
112
113impl<A> TransactionOrchestrator<A>
114where
115 A: AuthorityAPI + Send + Sync + 'static + Clone,
116 OnsiteReconfigObserver: ReconfigObserver<A>,
117{
118 pub fn new(
119 validators: Arc<AuthorityAggregator<A>>,
120 validator_state: Arc<AuthorityState>,
121 parent_path: &Path,
122 prometheus_registry: &Registry,
123 reconfig_observer: OnsiteReconfigObserver,
124 node_config: &NodeConfig,
125 ) -> Self {
126 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
127 let notifier = Arc::new(NotifyRead::new());
128 let reconfig_observer = Arc::new(reconfig_observer);
129 let quorum_driver_handler = Arc::new(
130 QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
131 .with_notifier(notifier.clone())
132 .with_reconfig_observer(reconfig_observer.clone())
133 .start(),
134 );
135
136 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
137 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
138 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
139 parent_path.join("fullnode_pending_transactions"),
140 ));
141 let pending_tx_log_clone = pending_tx_log.clone();
142 let _local_executor_handle = {
143 spawn_monitored_task!(async move {
144 Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
145 })
146 };
147 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
148
149 let epoch_store = validator_state.load_epoch_store_one_call_per_task();
150 let td_percentage = if !epoch_store.protocol_config().mysticeti_fastpath() {
151 0
152 } else {
153 choose_transaction_driver_percentage(Some(epoch_store.get_chain_identifier()))
154 };
155
156 let transaction_driver = if td_percentage > 0 {
157 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
158 let client_metrics = Arc::new(
159 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
160 );
161 Some(TransactionDriver::new(
162 validators.clone(),
163 reconfig_observer.clone(),
164 td_metrics,
165 Some(node_config),
166 client_metrics,
167 ))
168 } else {
169 None
170 };
171
172 let td_allowed_submission_list = node_config
173 .transaction_driver_config
174 .as_ref()
175 .map(|config| config.allowed_submission_validators.clone())
176 .unwrap_or_default();
177
178 let td_blocked_submission_list = node_config
179 .transaction_driver_config
180 .as_ref()
181 .map(|config| config.blocked_submission_validators.clone())
182 .unwrap_or_default();
183
184 if !td_allowed_submission_list.is_empty() && !td_blocked_submission_list.is_empty() {
185 panic!(
186 "Both allowed and blocked submission lists are set, this is not allowed, {:?} {:?}",
187 td_allowed_submission_list, td_blocked_submission_list
188 );
189 }
190
191 let enable_early_validation = node_config
192 .transaction_driver_config
193 .as_ref()
194 .map(|config| config.enable_early_validation)
195 .unwrap_or(true);
196
197 Self {
198 quorum_driver_handler,
199 validator_state,
200 _local_executor_handle,
201 pending_tx_log,
202 notifier,
203 metrics,
204 transaction_driver,
205 td_percentage,
206 td_allowed_submission_list,
207 td_blocked_submission_list,
208 enable_early_validation,
209 }
210 }
211}
212
213impl<A> TransactionOrchestrator<A>
214where
215 A: AuthorityAPI + Send + Sync + 'static + Clone,
216{
217 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
218 fields(
219 tx_digest = ?request.transaction.digest(),
220 tx_type = ?request_type,
221 ))]
222 pub async fn execute_transaction_block(
223 &self,
224 request: ExecuteTransactionRequestV3,
225 request_type: ExecuteTransactionRequestType,
226 client_addr: Option<SocketAddr>,
227 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
228 {
229 let timer = Instant::now();
230 let tx_type = if request.transaction.is_consensus_tx() {
231 TxType::SharedObject
232 } else {
233 TxType::SingleWriter
234 };
235 let tx_digest = *request.transaction.digest();
236
237 let (response, mut executed_locally) = self
238 .execute_transaction_with_effects_waiting(request, client_addr)
239 .await?;
240
241 if !executed_locally {
242 executed_locally = if matches!(
243 request_type,
244 ExecuteTransactionRequestType::WaitForLocalExecution
245 ) {
246 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
247 &self.validator_state,
248 tx_digest,
249 tx_type,
250 &self.metrics,
251 )
252 .await
253 .is_ok();
254 add_server_timing("local_execution done");
255 executed_locally
256 } else {
257 false
258 };
259 }
260
261 let QuorumTransactionResponse {
262 effects,
263 events,
264 input_objects,
265 output_objects,
266 auxiliary_data,
267 } = response;
268
269 let response = ExecuteTransactionResponseV3 {
270 effects,
271 events,
272 input_objects,
273 output_objects,
274 auxiliary_data,
275 };
276
277 self.metrics
278 .request_latency
279 .with_label_values(&[
280 tx_type.as_str(),
281 "execute_transaction_block",
282 executed_locally.to_string().as_str(),
283 ])
284 .observe(timer.elapsed().as_secs_f64());
285
286 Ok((response, executed_locally))
287 }
288
289 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
291 fields(tx_digest = ?request.transaction.digest()))]
292 pub async fn execute_transaction_v3(
293 &self,
294 request: ExecuteTransactionRequestV3,
295 client_addr: Option<SocketAddr>,
296 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
297 let timer = Instant::now();
298 let tx_type = if request.transaction.is_consensus_tx() {
299 TxType::SharedObject
300 } else {
301 TxType::SingleWriter
302 };
303
304 let (response, _) = self
305 .execute_transaction_with_effects_waiting(request, client_addr)
306 .await?;
307
308 self.metrics
309 .request_latency
310 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
311 .observe(timer.elapsed().as_secs_f64());
312
313 let QuorumTransactionResponse {
314 effects,
315 events,
316 input_objects,
317 output_objects,
318 auxiliary_data,
319 } = response;
320
321 Ok(ExecuteTransactionResponseV3 {
322 effects,
323 events,
324 input_objects,
325 output_objects,
326 auxiliary_data,
327 })
328 }
329
330 async fn execute_transaction_with_effects_waiting(
332 &self,
333 request: ExecuteTransactionRequestV3,
334 client_addr: Option<SocketAddr>,
335 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
336 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
337 let verified_transaction = epoch_store
338 .verify_transaction(request.transaction.clone())
339 .map_err(QuorumDriverError::InvalidUserSignature)?;
340 let tx_digest = *verified_transaction.digest();
341
342 if self.enable_early_validation
345 && let Err(e) = self
346 .validator_state
347 .check_transaction_validity(&epoch_store, &verified_transaction)
348 {
349 let error_category = e.categorize();
350 if !error_category.is_submission_retriable() {
351 if !self.validator_state.is_tx_already_executed(&tx_digest) {
353 self.metrics
354 .early_validation_rejections
355 .with_label_values(&[e.to_variant_name()])
356 .inc();
357 debug!(
358 error = ?e,
359 "Transaction rejected during early validation"
360 );
361
362 return Err(QuorumDriverError::TransactionFailed {
363 category: error_category,
364 details: e.to_string(),
365 });
366 }
367 }
368 }
369
370 let is_new_transaction = self
372 .pending_tx_log
373 .write_pending_transaction_maybe(&verified_transaction)
374 .await
375 .map_err(|e| {
376 warn!("QuorumDriverInternalError: {e:?}");
377 QuorumDriverError::QuorumDriverInternalError(e)
378 })?;
379 if is_new_transaction {
380 debug!("Added transaction to WAL log for TransactionDriver");
381 } else {
382 debug!("Transaction already in pending_tx_log");
383 }
384
385 let include_events = request.include_events;
386 let include_input_objects = request.include_input_objects;
387 let include_output_objects = request.include_output_objects;
388 let include_auxiliary_data = request.include_auxiliary_data;
389
390 let using_td = Arc::new(AtomicBool::new(false));
392
393 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
394 .ok()
395 .and_then(|v| v.parse().ok())
396 .map(Duration::from_secs)
397 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
398
399 let num_submissions = if !is_new_transaction {
400 0
402 } else if cfg!(msim) || in_antithesis() {
403 let r = rand::thread_rng().gen_range(1..=100);
405 let n = if r <= 10 {
406 3
407 } else if r <= 30 {
408 2
409 } else {
410 1
411 };
412 if n > 1 {
413 debug!("Making {n} execution calls");
414 }
415 n
416 } else {
417 1
418 };
419
420 let mut execution_futures = FuturesUnordered::new();
422 for i in 0..num_submissions {
423 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
425 let delay_ms = if should_delay {
426 rand::thread_rng().gen_range(100..=500)
427 } else {
428 0
429 };
430
431 let epoch_store = epoch_store.clone();
432 let request = request.clone();
433 let verified_transaction = verified_transaction.clone();
434 let using_td = using_td.clone();
435
436 let future = async move {
437 if delay_ms > 0 {
438 sleep(Duration::from_millis(delay_ms)).await;
440 }
441 self.execute_transaction_impl(
442 &epoch_store,
443 request,
444 verified_transaction,
445 client_addr,
446 Some(finality_timeout),
447 using_td,
448 )
449 .await
450 }
451 .boxed();
452 execution_futures.push(future);
453 }
454
455 let mut last_execution_error: Option<QuorumDriverError> = None;
457
458 let digests = [tx_digest];
460 let mut local_effects_future = epoch_store
461 .within_alive_epoch(
462 self.validator_state
463 .get_transaction_cache_reader()
464 .notify_read_executed_effects(
465 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
466 &digests,
467 ),
468 )
469 .boxed();
470
471 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
473
474 let result = loop {
475 tokio::select! {
476 biased;
477
478 local_effects_result = &mut local_effects_future => {
480 match local_effects_result {
481 Ok(effects) => {
482 debug!(
483 "Effects became available while execution was running"
484 );
485 if let Some(effects) = effects.into_iter().next() {
486 self.metrics.concurrent_execution.inc();
487 let epoch = effects.executed_epoch();
488 let events = if include_events {
489 if effects.events_digest().is_some() {
490 Some(self.validator_state.get_transaction_events(effects.transaction_digest())
491 .map_err(QuorumDriverError::QuorumDriverInternalError)?)
492 } else {
493 None
494 }
495 } else {
496 None
497 };
498 let input_objects = include_input_objects
499 .then(|| self.validator_state.get_transaction_input_objects(&effects))
500 .transpose()
501 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
502 let output_objects = include_output_objects
503 .then(|| self.validator_state.get_transaction_output_objects(&effects))
504 .transpose()
505 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
506 let response = QuorumTransactionResponse {
507 effects: FinalizedEffects {
508 effects,
509 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
510 },
511 events,
512 input_objects,
513 output_objects,
514 auxiliary_data: None,
515 };
516 break Ok((response, true));
517 }
518 }
519 Err(_) => {
520 warn!("Epoch terminated before effects were available");
521 }
522 };
523
524 local_effects_future = futures::future::pending().boxed();
526 }
527
528 Some(result) = execution_futures.next() => {
530 match result {
531 Ok(resp) => {
532 debug!("Execution succeeded, returning response");
534 let QuorumTransactionResponse {
535 effects,
536 events,
537 input_objects,
538 output_objects,
539 auxiliary_data,
540 } = resp;
541 let resp = QuorumTransactionResponse {
543 effects,
544 events: if include_events { events } else { None },
545 input_objects: if include_input_objects { input_objects } else { None },
546 output_objects: if include_output_objects { output_objects } else { None },
547 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
548 };
549 break Ok((resp, false));
550 }
551 Err(QuorumDriverError::PendingExecutionInTransactionOrchestrator) => {
552 debug!(
553 "Transaction is already being processed"
554 );
555 if last_execution_error.is_none() {
557 last_execution_error = Some(QuorumDriverError::PendingExecutionInTransactionOrchestrator);
558 }
559 }
560 Err(e) => {
561 debug!(?e, "Execution attempt failed, wait for other attempts");
562 last_execution_error = Some(e);
563 }
564 };
565
566 if execution_futures.is_empty() {
568 break Err(last_execution_error.unwrap());
569 }
570 }
571
572 _ = &mut timeout_future => {
574 debug!("Timeout waiting for transaction finality.");
575 self.metrics.wait_for_finality_timeout.inc();
576
577 if using_td.load(Ordering::Acquire) {
580 debug!("Cleaning up TD transaction from WAL due to timeout");
581 if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
582 warn!(
583 "Failed to finish TD transaction in pending transaction log: {err}"
584 );
585 }
586 }
587
588 break Err(QuorumDriverError::TimeoutBeforeFinality);
589 }
590 }
591 };
592
593 if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
595 warn!("Failed to finish transaction in pending transaction log: {err}");
596 }
597
598 result
599 }
600
601 #[instrument(level = "error", skip_all)]
602 async fn execute_transaction_impl(
603 &self,
604 epoch_store: &Arc<AuthorityPerEpochStore>,
605 request: ExecuteTransactionRequestV3,
606 verified_transaction: VerifiedTransaction,
607 client_addr: Option<SocketAddr>,
608 finality_timeout: Option<Duration>,
609 using_td: Arc<AtomicBool>,
610 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
611 let tx_digest = *verified_transaction.digest();
612 debug!("TO Received transaction execution request.");
613
614 let timer = Instant::now();
615 let tx_type = if verified_transaction.is_consensus_tx() {
616 TxType::SharedObject
617 } else {
618 TxType::SingleWriter
619 };
620
621 let (_in_flight_metrics_guards, good_response_metrics) =
622 self.update_metrics(&request.transaction);
623
624 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
626 wait_for_finality_gauge.inc();
627 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
628 in_flight.dec();
629 });
630
631 let (response, driver_type) = if self.transaction_driver.is_some()
633 && self.should_use_transaction_driver(epoch_store, tx_digest)
634 {
635 using_td.store(true, Ordering::Release);
637
638 (
639 self.submit_with_transaction_driver(
640 self.transaction_driver.as_ref().unwrap(),
641 &request,
642 client_addr,
643 &verified_transaction,
644 good_response_metrics,
645 finality_timeout,
646 )
647 .await?,
648 "transaction_driver",
649 )
650 } else {
651 using_td.store(false, Ordering::Release);
653
654 let resp = self
655 .submit_with_quorum_driver(
656 epoch_store.clone(),
657 verified_transaction.clone(),
658 request,
659 client_addr,
660 )
661 .await
662 .map_err(|e| {
663 warn!("QuorumDriverInternalError: {e:?}");
664 QuorumDriverError::QuorumDriverInternalError(e)
665 })?
666 .await
667 .map_err(|e| {
668 warn!("QuorumDriverInternalError: {e:?}");
669 QuorumDriverError::QuorumDriverInternalError(e)
670 })??;
671
672 (
673 QuorumTransactionResponse {
674 effects: FinalizedEffects::new_from_effects_cert(resp.effects_cert.into()),
675 events: resp.events,
676 input_objects: resp.input_objects,
677 output_objects: resp.output_objects,
678 auxiliary_data: resp.auxiliary_data,
679 },
680 "quorum_driver",
681 )
682 };
683
684 add_server_timing("wait_for_finality done");
685
686 self.metrics.wait_for_finality_finished.inc();
687
688 let elapsed = timer.elapsed().as_secs_f64();
689 self.metrics
690 .settlement_finality_latency
691 .with_label_values(&[tx_type.as_str(), driver_type])
692 .observe(elapsed);
693 good_response_metrics.inc();
694
695 Ok(response)
696 }
697
698 #[instrument(level = "error", skip_all, err(level = "info"))]
699 async fn submit_with_transaction_driver(
700 &self,
701 td: &Arc<TransactionDriver<A>>,
702 request: &ExecuteTransactionRequestV3,
703 client_addr: Option<SocketAddr>,
704 verified_transaction: &VerifiedTransaction,
705 good_response_metrics: &GenericCounter<AtomicU64>,
706 timeout_duration: Option<Duration>,
707 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
708 let tx_digest = *verified_transaction.digest();
709 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
710
711 let td_response = td
712 .drive_transaction(
713 SubmitTxRequest::new_transaction(request.transaction.clone()),
714 SubmitTransactionOptions {
715 forwarded_client_addr: client_addr,
716 allowed_validators: self.td_allowed_submission_list.clone(),
717 blocked_validators: self.td_blocked_submission_list.clone(),
718 },
719 timeout_duration,
720 )
721 .await
722 .map_err(|e| match e {
723 TransactionDriverError::TimeoutWithLastRetriableError {
724 last_error,
725 attempts,
726 timeout,
727 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
728 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
729 attempts,
730 timeout,
731 },
732 other => QuorumDriverError::TransactionFailed {
733 category: other.categorize(),
734 details: other.to_string(),
735 },
736 });
737
738 match td_response {
739 Err(e) => {
740 warn!("TransactionDriver error: {e:?}");
741 Err(e)
742 }
743 Ok(quorum_transaction_response) => {
744 good_response_metrics.inc();
745 Ok(quorum_transaction_response)
746 }
747 }
748 }
749
750 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
753 async fn submit_with_quorum_driver(
754 &self,
755 epoch_store: Arc<AuthorityPerEpochStore>,
756 transaction: VerifiedTransaction,
757 request: ExecuteTransactionRequestV3,
758 client_addr: Option<SocketAddr>,
759 ) -> SuiResult<impl Future<Output = SuiResult<QuorumDriverResult>> + '_> {
760 let tx_digest = *transaction.digest();
761
762 let ticket = self.notifier.register_one(&tx_digest);
763 self.quorum_driver()
764 .submit_transaction_no_ticket(request.clone(), client_addr)
765 .await?;
766
767 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
772 let qd = self.clone_quorum_driver();
773 Ok(async move {
774 let digests = [tx_digest];
775 let effects_await =
776 epoch_store.within_alive_epoch(cache_reader.notify_read_executed_effects(
777 "TransactionOrchestrator::notify_read_submit_with_qd",
778 &digests,
779 ));
780 #[allow(clippy::let_and_return)]
782 let res = match select(ticket, effects_await.boxed()).await {
783 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
784 Either::Right((_, unfinished_quorum_driver_task)) => {
785 debug!("Effects are available in DB, use quorum driver to get a certificate");
786 qd.submit_transaction_no_ticket(request, client_addr)
787 .await?;
788 Ok(unfinished_quorum_driver_task.await)
789 }
790 };
791 res
792 })
793 }
794
795 #[instrument(
796 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
797 level = "debug",
798 skip_all,
799 err(level = "info")
800 )]
801 async fn wait_for_finalized_tx_executed_locally_with_timeout(
802 validator_state: &Arc<AuthorityState>,
803 tx_digest: TransactionDigest,
804 tx_type: TxType,
805 metrics: &TransactionOrchestratorMetrics,
806 ) -> SuiResult {
807 metrics.local_execution_in_flight.inc();
808 let _metrics_guard =
809 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
810 in_flight.dec();
811 });
812
813 let _latency_guard = metrics
814 .local_execution_latency
815 .with_label_values(&[tx_type.as_str()])
816 .start_timer();
817 debug!("Waiting for finalized tx to be executed locally.");
818 match timeout(
819 LOCAL_EXECUTION_TIMEOUT,
820 validator_state
821 .get_transaction_cache_reader()
822 .notify_read_executed_effects_digests(
823 "TransactionOrchestrator::notify_read_wait_for_local_execution",
824 &[tx_digest],
825 ),
826 )
827 .instrument(error_span!(
828 "transaction_orchestrator::local_execution",
829 ?tx_digest
830 ))
831 .await
832 {
833 Err(_elapsed) => {
834 debug!(
835 "Waiting for finalized tx to be executed locally timed out within {:?}.",
836 LOCAL_EXECUTION_TIMEOUT
837 );
838 metrics.local_execution_timeout.inc();
839 Err(SuiErrorKind::TimeoutError.into())
840 }
841 Ok(_) => {
842 metrics.local_execution_success.inc();
843 Ok(())
844 }
845 }
846 }
847
848 fn should_use_transaction_driver(
849 &self,
850 epoch_store: &Arc<AuthorityPerEpochStore>,
851 tx_digest: TransactionDigest,
852 ) -> bool {
853 const MAX_PERCENTAGE: u8 = 100;
854 let unknown_network = epoch_store.get_chain() == Chain::Unknown;
855 let v = if unknown_network {
856 rand::thread_rng().gen_range(1..=MAX_PERCENTAGE)
857 } else {
858 let v = u32::from_le_bytes(tx_digest.inner()[..4].try_into().unwrap());
859 (v % (MAX_PERCENTAGE as u32) + 1) as u8
860 };
861 debug!(
862 "Choosing whether to use transaction driver: {} vs {}",
863 v, self.td_percentage
864 );
865 v <= self.td_percentage
866 }
867
868 async fn loop_pending_transaction_log(
870 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
871 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
872 ) {
873 loop {
874 match effects_receiver.recv().await {
875 Ok(Ok((transaction, ..))) => {
876 let tx_digest = transaction.digest();
877 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
878 error!(
879 ?tx_digest,
880 "Failed to finish transaction in pending transaction log: {err}"
881 );
882 }
883 }
884 Ok(Err((tx_digest, _err))) => {
885 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
886 error!(
887 ?tx_digest,
888 "Failed to finish transaction in pending transaction log: {err}"
889 );
890 }
891 }
892 Err(RecvError::Closed) => {
893 error!("Sender of effects subscriber queue has been dropped!");
894 return;
895 }
896 Err(RecvError::Lagged(skipped_count)) => {
897 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
898 }
899 }
900 }
901 }
902
903 pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
904 &self.quorum_driver_handler
905 }
906
907 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
908 self.quorum_driver_handler.clone()
909 }
910
911 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
912 self.quorum_driver().authority_aggregator().load_full()
913 }
914
915 fn update_metrics<'a>(
916 &'a self,
917 transaction: &Transaction,
918 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
919 let (in_flight, good_response) = if transaction.is_consensus_tx() {
920 self.metrics.total_req_received_shared_object.inc();
921 (
922 self.metrics.req_in_flight_shared_object.clone(),
923 &self.metrics.good_response_shared_object,
924 )
925 } else {
926 self.metrics.total_req_received_single_writer.inc();
927 (
928 self.metrics.req_in_flight_single_writer.clone(),
929 &self.metrics.good_response_single_writer,
930 )
931 };
932 in_flight.inc();
933 (
934 scopeguard::guard(in_flight, |in_flight| {
935 in_flight.dec();
936 }),
937 good_response,
938 )
939 }
940
941 fn schedule_txes_in_log(
942 pending_tx_log: Arc<WritePathPendingTransactionLog>,
943 quorum_driver: Arc<QuorumDriverHandler<A>>,
944 ) {
945 spawn_logged_monitored_task!(async move {
946 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
947 info!("Skipping loading pending transactions from pending_tx_log.");
948 return;
949 }
950 let pending_txes = pending_tx_log
951 .load_all_pending_transactions()
952 .expect("failed to load all pending transactions");
953 info!(
954 "Recovering {} pending transactions from pending_tx_log.",
955 pending_txes.len()
956 );
957 for (i, tx) in pending_txes.into_iter().enumerate() {
958 let tx = tx.into_inner();
961 let tx_digest = *tx.digest();
962 if let Err(err) = quorum_driver
965 .submit_transaction_no_ticket(
966 ExecuteTransactionRequestV3 {
967 transaction: tx,
968 include_events: true,
969 include_input_objects: false,
970 include_output_objects: false,
971 include_auxiliary_data: false,
972 },
973 None,
974 )
975 .await
976 {
977 warn!(
978 ?tx_digest,
979 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
980 );
981 } else {
982 debug!("Enqueued transaction from pending_tx_log");
983 if (i + 1) % 1000 == 0 {
984 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
985 }
986 }
987 }
988 });
991 }
992
993 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
994 self.pending_tx_log.load_all_pending_transactions()
995 }
996}
997#[derive(Clone)]
999pub struct TransactionOrchestratorMetrics {
1000 total_req_received_single_writer: GenericCounter<AtomicU64>,
1001 total_req_received_shared_object: GenericCounter<AtomicU64>,
1002
1003 good_response_single_writer: GenericCounter<AtomicU64>,
1004 good_response_shared_object: GenericCounter<AtomicU64>,
1005
1006 req_in_flight_single_writer: GenericGauge<AtomicI64>,
1007 req_in_flight_shared_object: GenericGauge<AtomicI64>,
1008
1009 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
1010 wait_for_finality_finished: GenericCounter<AtomicU64>,
1011 wait_for_finality_timeout: GenericCounter<AtomicU64>,
1012
1013 local_execution_in_flight: GenericGauge<AtomicI64>,
1014 local_execution_success: GenericCounter<AtomicU64>,
1015 local_execution_timeout: GenericCounter<AtomicU64>,
1016
1017 concurrent_execution: IntCounter,
1018
1019 early_validation_rejections: IntCounterVec,
1020
1021 request_latency: HistogramVec,
1022 local_execution_latency: HistogramVec,
1023 settlement_finality_latency: HistogramVec,
1024}
1025
1026impl TransactionOrchestratorMetrics {
1030 pub fn new(registry: &Registry) -> Self {
1031 let total_req_received = register_int_counter_vec_with_registry!(
1032 "tx_orchestrator_total_req_received",
1033 "Total number of executions request Transaction Orchestrator receives, group by tx type",
1034 &["tx_type"],
1035 registry
1036 )
1037 .unwrap();
1038
1039 let total_req_received_single_writer =
1040 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1041 let total_req_received_shared_object =
1042 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1043
1044 let good_response = register_int_counter_vec_with_registry!(
1045 "tx_orchestrator_good_response",
1046 "Total number of good responses Transaction Orchestrator generates, group by tx type",
1047 &["tx_type"],
1048 registry
1049 )
1050 .unwrap();
1051
1052 let good_response_single_writer =
1053 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1054 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1055
1056 let req_in_flight = register_int_gauge_vec_with_registry!(
1057 "tx_orchestrator_req_in_flight",
1058 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1059 &["tx_type"],
1060 registry
1061 )
1062 .unwrap();
1063
1064 let req_in_flight_single_writer =
1065 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1066 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1067
1068 Self {
1069 total_req_received_single_writer,
1070 total_req_received_shared_object,
1071 good_response_single_writer,
1072 good_response_shared_object,
1073 req_in_flight_single_writer,
1074 req_in_flight_shared_object,
1075 wait_for_finality_in_flight: register_int_gauge_with_registry!(
1076 "tx_orchestrator_wait_for_finality_in_flight",
1077 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1078 registry,
1079 )
1080 .unwrap(),
1081 wait_for_finality_finished: register_int_counter_with_registry!(
1082 "tx_orchestrator_wait_for_finality_fnished",
1083 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1084 registry,
1085 )
1086 .unwrap(),
1087 wait_for_finality_timeout: register_int_counter_with_registry!(
1088 "tx_orchestrator_wait_for_finality_timeout",
1089 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1090 registry,
1091 )
1092 .unwrap(),
1093 local_execution_in_flight: register_int_gauge_with_registry!(
1094 "tx_orchestrator_local_execution_in_flight",
1095 "Number of local execution txns in flights Transaction Orchestrator handles",
1096 registry,
1097 )
1098 .unwrap(),
1099 local_execution_success: register_int_counter_with_registry!(
1100 "tx_orchestrator_local_execution_success",
1101 "Total number of successful local execution txns Transaction Orchestrator handles",
1102 registry,
1103 )
1104 .unwrap(),
1105 local_execution_timeout: register_int_counter_with_registry!(
1106 "tx_orchestrator_local_execution_timeout",
1107 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1108 registry,
1109 )
1110 .unwrap(),
1111 concurrent_execution: register_int_counter_with_registry!(
1112 "tx_orchestrator_concurrent_execution",
1113 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1114 registry,
1115 )
1116 .unwrap(),
1117 early_validation_rejections: register_int_counter_vec_with_registry!(
1118 "tx_orchestrator_early_validation_rejections",
1119 "Total number of transactions rejected during early validation before submission, by reason",
1120 &["reason"],
1121 registry,
1122 )
1123 .unwrap(),
1124 request_latency: register_histogram_vec_with_registry!(
1125 "tx_orchestrator_request_latency",
1126 "Time spent in processing one Transaction Orchestrator request",
1127 &["tx_type", "route", "wait_for_local_execution"],
1128 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1129 registry,
1130 )
1131 .unwrap(),
1132 local_execution_latency: register_histogram_vec_with_registry!(
1133 "tx_orchestrator_local_execution_latency",
1134 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1135 &["tx_type"],
1136 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1137 registry,
1138 )
1139 .unwrap(),
1140 settlement_finality_latency: register_histogram_vec_with_registry!(
1141 "tx_orchestrator_settlement_finality_latency",
1142 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1143 &["tx_type", "driver_type"],
1144 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1145 registry,
1146 )
1147 .unwrap(),
1148 }
1149 }
1150
1151 pub fn new_for_tests() -> Self {
1152 let registry = Registry::new();
1153 Self::new(®istry)
1154 }
1155}
1156
1157#[async_trait::async_trait]
1158impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1159where
1160 A: AuthorityAPI + Send + Sync + 'static + Clone,
1161{
1162 async fn execute_transaction(
1163 &self,
1164 request: ExecuteTransactionRequestV3,
1165 client_addr: Option<std::net::SocketAddr>,
1166 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1167 self.execute_transaction_v3(request, client_addr).await
1168 }
1169
1170 fn simulate_transaction(
1171 &self,
1172 transaction: TransactionData,
1173 checks: TransactionChecks,
1174 ) -> Result<SimulateTransactionResult, SuiError> {
1175 self.validator_state
1176 .simulate_transaction(transaction, checks)
1177 }
1178}