1use std::net::SocketAddr;
11use std::ops::Deref;
12use std::path::Path;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16
17use futures::FutureExt;
18use futures::future::{Either, Future, select};
19use futures::stream::{FuturesUnordered, StreamExt};
20use mysten_common::in_antithesis;
21use mysten_common::sync::notify_read::NotifyRead;
22use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
23use mysten_metrics::{add_server_timing, spawn_logged_monitored_task, spawn_monitored_task};
24use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
25use prometheus::{
26 HistogramVec, IntCounter, IntCounterVec, Registry, register_histogram_vec_with_registry,
27 register_int_counter_vec_with_registry, register_int_counter_with_registry,
28 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
29};
30use rand::Rng;
31use sui_config::NodeConfig;
32use sui_protocol_config::Chain;
33use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
34use sui_types::base_types::TransactionDigest;
35use sui_types::effects::TransactionEffectsAPI;
36use sui_types::error::{SuiError, SuiErrorKind, SuiResult};
37use sui_types::messages_grpc::{SubmitTxRequest, TxType};
38use sui_types::quorum_driver_types::{
39 EffectsFinalityInfo, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
40 ExecuteTransactionResponseV3, FinalizedEffects, IsTransactionExecutedLocally,
41 QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResult,
42};
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::transaction::{Transaction, TransactionData, VerifiedTransaction};
45use sui_types::transaction_executor::{SimulateTransactionResult, TransactionChecks};
46use tokio::sync::broadcast::Receiver;
47use tokio::sync::broadcast::error::RecvError;
48use tokio::task::JoinHandle;
49use tokio::time::{Instant, sleep, timeout};
50use tracing::{Instrument, debug, error, error_span, info, instrument, warn};
51
52use crate::authority::AuthorityState;
53use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
54use crate::authority_aggregator::AuthorityAggregator;
55use crate::authority_client::{AuthorityAPI, NetworkAuthorityClient};
56use crate::quorum_driver::reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
57use crate::quorum_driver::{QuorumDriverHandler, QuorumDriverHandlerBuilder, QuorumDriverMetrics};
58use crate::transaction_driver::{
59 QuorumTransactionResponse, SubmitTransactionOptions, TransactionDriver, TransactionDriverError,
60 TransactionDriverMetrics, choose_transaction_driver_percentage,
61};
62
/// Upper bound on how long the orchestrator waits for a finalized transaction
/// to be executed locally before reporting it as not executed locally.
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Default upper bound on waiting for transaction finality. It is used when the
/// `WAIT_FOR_FINALITY_TIMEOUT_SECS` environment variable is unset or unparsable.
const WAIT_FOR_FINALITY_TIMEOUT: Duration = Duration::from_secs(90);
69
70pub type QuorumTransactionEffectsResult =
71 Result<(Transaction, QuorumTransactionResponse), (TransactionDigest, QuorumDriverError)>;
72pub struct TransactionOrchestrator<A: Clone> {
73 quorum_driver_handler: Arc<QuorumDriverHandler<A>>,
74 validator_state: Arc<AuthorityState>,
75 _local_executor_handle: JoinHandle<()>,
76 pending_tx_log: Arc<WritePathPendingTransactionLog>,
77 notifier: Arc<NotifyRead<TransactionDigest, QuorumDriverResult>>,
78 metrics: Arc<TransactionOrchestratorMetrics>,
79 transaction_driver: Option<Arc<TransactionDriver<A>>>,
80 td_percentage: u8,
81 td_allowed_submission_list: Vec<String>,
82 enable_early_validation: bool,
83}
84
85impl TransactionOrchestrator<NetworkAuthorityClient> {
86 pub fn new_with_auth_aggregator(
87 validators: Arc<AuthorityAggregator<NetworkAuthorityClient>>,
88 validator_state: Arc<AuthorityState>,
89 reconfig_channel: Receiver<SuiSystemState>,
90 parent_path: &Path,
91 prometheus_registry: &Registry,
92 node_config: &NodeConfig,
93 ) -> Self {
94 let observer = OnsiteReconfigObserver::new(
95 reconfig_channel,
96 validator_state.get_object_cache_reader().clone(),
97 validator_state.clone_committee_store(),
98 validators.safe_client_metrics_base.clone(),
99 validators.metrics.deref().clone(),
100 );
101 TransactionOrchestrator::new(
102 validators,
103 validator_state,
104 parent_path,
105 prometheus_registry,
106 observer,
107 node_config,
108 )
109 }
110}
111
112impl<A> TransactionOrchestrator<A>
113where
114 A: AuthorityAPI + Send + Sync + 'static + Clone,
115 OnsiteReconfigObserver: ReconfigObserver<A>,
116{
117 pub fn new(
118 validators: Arc<AuthorityAggregator<A>>,
119 validator_state: Arc<AuthorityState>,
120 parent_path: &Path,
121 prometheus_registry: &Registry,
122 reconfig_observer: OnsiteReconfigObserver,
123 node_config: &NodeConfig,
124 ) -> Self {
125 let metrics = Arc::new(QuorumDriverMetrics::new(prometheus_registry));
126 let notifier = Arc::new(NotifyRead::new());
127 let reconfig_observer = Arc::new(reconfig_observer);
128 let quorum_driver_handler = Arc::new(
129 QuorumDriverHandlerBuilder::new(validators.clone(), metrics.clone())
130 .with_notifier(notifier.clone())
131 .with_reconfig_observer(reconfig_observer.clone())
132 .start(),
133 );
134
135 let effects_receiver = quorum_driver_handler.subscribe_to_effects();
136 let metrics = Arc::new(TransactionOrchestratorMetrics::new(prometheus_registry));
137 let pending_tx_log = Arc::new(WritePathPendingTransactionLog::new(
138 parent_path.join("fullnode_pending_transactions"),
139 ));
140 let pending_tx_log_clone = pending_tx_log.clone();
141 let _local_executor_handle = {
142 spawn_monitored_task!(async move {
143 Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
144 })
145 };
146 Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
147
148 let epoch_store = validator_state.load_epoch_store_one_call_per_task();
149 let td_percentage = if !epoch_store.protocol_config().mysticeti_fastpath() {
150 0
151 } else {
152 choose_transaction_driver_percentage(Some(epoch_store.get_chain_identifier()))
153 };
154
155 let transaction_driver = if td_percentage > 0 {
156 let td_metrics = Arc::new(TransactionDriverMetrics::new(prometheus_registry));
157 let client_metrics = Arc::new(
158 crate::validator_client_monitor::ValidatorClientMetrics::new(prometheus_registry),
159 );
160 Some(TransactionDriver::new(
161 validators.clone(),
162 reconfig_observer.clone(),
163 td_metrics,
164 Some(node_config),
165 client_metrics,
166 ))
167 } else {
168 None
169 };
170
171 let td_allowed_submission_list = node_config
172 .transaction_driver_config
173 .as_ref()
174 .map(|config| config.allowed_submission_validators.clone())
175 .unwrap_or_default();
176
177 let enable_early_validation = node_config
178 .transaction_driver_config
179 .as_ref()
180 .map(|config| config.enable_early_validation)
181 .unwrap_or(true);
182
183 Self {
184 quorum_driver_handler,
185 validator_state,
186 _local_executor_handle,
187 pending_tx_log,
188 notifier,
189 metrics,
190 transaction_driver,
191 td_percentage,
192 td_allowed_submission_list,
193 enable_early_validation,
194 }
195 }
196}
197
198impl<A> TransactionOrchestrator<A>
199where
200 A: AuthorityAPI + Send + Sync + 'static + Clone,
201{
202 #[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
203 fields(
204 tx_digest = ?request.transaction.digest(),
205 tx_type = ?request_type,
206 ))]
207 pub async fn execute_transaction_block(
208 &self,
209 request: ExecuteTransactionRequestV3,
210 request_type: ExecuteTransactionRequestType,
211 client_addr: Option<SocketAddr>,
212 ) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
213 {
214 let timer = Instant::now();
215 let tx_type = if request.transaction.is_consensus_tx() {
216 TxType::SharedObject
217 } else {
218 TxType::SingleWriter
219 };
220 let tx_digest = *request.transaction.digest();
221
222 let (response, mut executed_locally) = self
223 .execute_transaction_with_effects_waiting(request, client_addr)
224 .await?;
225
226 if !executed_locally {
227 executed_locally = if matches!(
228 request_type,
229 ExecuteTransactionRequestType::WaitForLocalExecution
230 ) {
231 let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
232 &self.validator_state,
233 tx_digest,
234 tx_type,
235 &self.metrics,
236 )
237 .await
238 .is_ok();
239 add_server_timing("local_execution done");
240 executed_locally
241 } else {
242 false
243 };
244 }
245
246 let QuorumTransactionResponse {
247 effects,
248 events,
249 input_objects,
250 output_objects,
251 auxiliary_data,
252 } = response;
253
254 let response = ExecuteTransactionResponseV3 {
255 effects,
256 events,
257 input_objects,
258 output_objects,
259 auxiliary_data,
260 };
261
262 self.metrics
263 .request_latency
264 .with_label_values(&[
265 tx_type.as_str(),
266 "execute_transaction_block",
267 executed_locally.to_string().as_str(),
268 ])
269 .observe(timer.elapsed().as_secs_f64());
270
271 Ok((response, executed_locally))
272 }
273
274 #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "debug", skip_all,
276 fields(tx_digest = ?request.transaction.digest()))]
277 pub async fn execute_transaction_v3(
278 &self,
279 request: ExecuteTransactionRequestV3,
280 client_addr: Option<SocketAddr>,
281 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
282 let timer = Instant::now();
283 let tx_type = if request.transaction.is_consensus_tx() {
284 TxType::SharedObject
285 } else {
286 TxType::SingleWriter
287 };
288
289 let (response, _) = self
290 .execute_transaction_with_effects_waiting(request, client_addr)
291 .await?;
292
293 self.metrics
294 .request_latency
295 .with_label_values(&[tx_type.as_str(), "execute_transaction_v3", "false"])
296 .observe(timer.elapsed().as_secs_f64());
297
298 let QuorumTransactionResponse {
299 effects,
300 events,
301 input_objects,
302 output_objects,
303 auxiliary_data,
304 } = response;
305
306 Ok(ExecuteTransactionResponseV3 {
307 effects,
308 events,
309 input_objects,
310 output_objects,
311 auxiliary_data,
312 })
313 }
314
315 async fn execute_transaction_with_effects_waiting(
317 &self,
318 request: ExecuteTransactionRequestV3,
319 client_addr: Option<SocketAddr>,
320 ) -> Result<(QuorumTransactionResponse, IsTransactionExecutedLocally), QuorumDriverError> {
321 let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();
322 let verified_transaction = epoch_store
323 .verify_transaction(request.transaction.clone())
324 .map_err(QuorumDriverError::InvalidUserSignature)?;
325 let tx_digest = *verified_transaction.digest();
326
327 if self.enable_early_validation
330 && let Err(e) = self
331 .validator_state
332 .check_transaction_validity(&epoch_store, &verified_transaction)
333 {
334 let error_category = e.categorize();
335 if !error_category.is_submission_retriable() {
336 if !self.validator_state.is_tx_already_executed(&tx_digest) {
338 self.metrics
339 .early_validation_rejections
340 .with_label_values(&[e.as_ref()])
341 .inc();
342 debug!(
343 error = ?e,
344 "Transaction rejected during early validation"
345 );
346
347 return Err(QuorumDriverError::TransactionFailed {
348 category: error_category,
349 details: e.to_string(),
350 });
351 }
352 }
353 }
354
355 let is_new_transaction = self
357 .pending_tx_log
358 .write_pending_transaction_maybe(&verified_transaction)
359 .await
360 .map_err(|e| {
361 warn!("QuorumDriverInternalError: {e:?}");
362 QuorumDriverError::QuorumDriverInternalError(e)
363 })?;
364 if is_new_transaction {
365 debug!("Added transaction to WAL log for TransactionDriver");
366 } else {
367 debug!("Transaction already in pending_tx_log");
368 }
369
370 let include_events = request.include_events;
371 let include_input_objects = request.include_input_objects;
372 let include_output_objects = request.include_output_objects;
373 let include_auxiliary_data = request.include_auxiliary_data;
374
375 let using_td = Arc::new(AtomicBool::new(false));
377
378 let finality_timeout = std::env::var("WAIT_FOR_FINALITY_TIMEOUT_SECS")
379 .ok()
380 .and_then(|v| v.parse().ok())
381 .map(Duration::from_secs)
382 .unwrap_or(WAIT_FOR_FINALITY_TIMEOUT);
383
384 let num_submissions = if !is_new_transaction {
385 0
387 } else if cfg!(msim) || in_antithesis() {
388 let r = rand::thread_rng().gen_range(1..=100);
390 let n = if r <= 10 {
391 3
392 } else if r <= 30 {
393 2
394 } else {
395 1
396 };
397 if n > 1 {
398 debug!("Making {n} execution calls");
399 }
400 n
401 } else {
402 1
403 };
404
405 let mut execution_futures = FuturesUnordered::new();
407 for i in 0..num_submissions {
408 let should_delay = i > 0 && rand::thread_rng().gen_bool(0.8);
410 let delay_ms = if should_delay {
411 rand::thread_rng().gen_range(100..=500)
412 } else {
413 0
414 };
415
416 let epoch_store = epoch_store.clone();
417 let request = request.clone();
418 let verified_transaction = verified_transaction.clone();
419 let using_td = using_td.clone();
420
421 let future = async move {
422 if delay_ms > 0 {
423 sleep(Duration::from_millis(delay_ms)).await;
425 }
426 self.execute_transaction_impl(
427 &epoch_store,
428 request,
429 verified_transaction,
430 client_addr,
431 Some(finality_timeout),
432 using_td,
433 )
434 .await
435 }
436 .boxed();
437 execution_futures.push(future);
438 }
439
440 let mut last_execution_error: Option<QuorumDriverError> = None;
442
443 let digests = [tx_digest];
445 let mut local_effects_future = epoch_store
446 .within_alive_epoch(
447 self.validator_state
448 .get_transaction_cache_reader()
449 .notify_read_executed_effects(
450 "TransactionOrchestrator::notify_read_execute_transaction_with_effects_waiting",
451 &digests,
452 ),
453 )
454 .boxed();
455
456 let mut timeout_future = tokio::time::sleep(finality_timeout).boxed();
458
459 let result = loop {
460 tokio::select! {
461 biased;
462
463 local_effects_result = &mut local_effects_future => {
465 match local_effects_result {
466 Ok(effects) => {
467 debug!(
468 "Effects became available while execution was running"
469 );
470 if let Some(effects) = effects.into_iter().next() {
471 self.metrics.concurrent_execution.inc();
472 let epoch = effects.executed_epoch();
473 let events = if include_events {
474 if effects.events_digest().is_some() {
475 Some(self.validator_state.get_transaction_events(effects.transaction_digest())
476 .map_err(QuorumDriverError::QuorumDriverInternalError)?)
477 } else {
478 None
479 }
480 } else {
481 None
482 };
483 let input_objects = include_input_objects
484 .then(|| self.validator_state.get_transaction_input_objects(&effects))
485 .transpose()
486 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
487 let output_objects = include_output_objects
488 .then(|| self.validator_state.get_transaction_output_objects(&effects))
489 .transpose()
490 .map_err(QuorumDriverError::QuorumDriverInternalError)?;
491 let response = QuorumTransactionResponse {
492 effects: FinalizedEffects {
493 effects,
494 finality_info: EffectsFinalityInfo::QuorumExecuted(epoch),
495 },
496 events,
497 input_objects,
498 output_objects,
499 auxiliary_data: None,
500 };
501 break Ok((response, true));
502 }
503 }
504 Err(_) => {
505 warn!("Epoch terminated before effects were available");
506 }
507 };
508
509 local_effects_future = futures::future::pending().boxed();
511 }
512
513 Some(result) = execution_futures.next() => {
515 match result {
516 Ok(resp) => {
517 debug!("Execution succeeded, returning response");
519 let QuorumTransactionResponse {
520 effects,
521 events,
522 input_objects,
523 output_objects,
524 auxiliary_data,
525 } = resp;
526 let resp = QuorumTransactionResponse {
528 effects,
529 events: if include_events { events } else { None },
530 input_objects: if include_input_objects { input_objects } else { None },
531 output_objects: if include_output_objects { output_objects } else { None },
532 auxiliary_data: if include_auxiliary_data { auxiliary_data } else { None },
533 };
534 break Ok((resp, false));
535 }
536 Err(QuorumDriverError::PendingExecutionInTransactionOrchestrator) => {
537 debug!(
538 "Transaction is already being processed"
539 );
540 if last_execution_error.is_none() {
542 last_execution_error = Some(QuorumDriverError::PendingExecutionInTransactionOrchestrator);
543 }
544 }
545 Err(e) => {
546 debug!(?e, "Execution attempt failed, wait for other attempts");
547 last_execution_error = Some(e);
548 }
549 };
550
551 if execution_futures.is_empty() {
553 break Err(last_execution_error.unwrap());
554 }
555 }
556
557 _ = &mut timeout_future => {
559 debug!("Timeout waiting for transaction finality.");
560 self.metrics.wait_for_finality_timeout.inc();
561
562 if using_td.load(Ordering::Acquire) {
565 debug!("Cleaning up TD transaction from WAL due to timeout");
566 if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
567 warn!(
568 "Failed to finish TD transaction in pending transaction log: {err}"
569 );
570 }
571 }
572
573 break Err(QuorumDriverError::TimeoutBeforeFinality);
574 }
575 }
576 };
577
578 if let Err(err) = self.pending_tx_log.finish_transaction(&tx_digest) {
580 warn!("Failed to finish transaction in pending transaction log: {err}");
581 }
582
583 result
584 }
585
586 #[instrument(level = "error", skip_all)]
587 async fn execute_transaction_impl(
588 &self,
589 epoch_store: &Arc<AuthorityPerEpochStore>,
590 request: ExecuteTransactionRequestV3,
591 verified_transaction: VerifiedTransaction,
592 client_addr: Option<SocketAddr>,
593 finality_timeout: Option<Duration>,
594 using_td: Arc<AtomicBool>,
595 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
596 let tx_digest = *verified_transaction.digest();
597 debug!("TO Received transaction execution request.");
598
599 let timer = Instant::now();
600 let tx_type = if verified_transaction.is_consensus_tx() {
601 TxType::SharedObject
602 } else {
603 TxType::SingleWriter
604 };
605
606 let (_in_flight_metrics_guards, good_response_metrics) =
607 self.update_metrics(&request.transaction);
608
609 let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
611 wait_for_finality_gauge.inc();
612 let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
613 in_flight.dec();
614 });
615
616 let (response, driver_type) = if self.transaction_driver.is_some()
618 && self.should_use_transaction_driver(epoch_store, tx_digest)
619 {
620 using_td.store(true, Ordering::Release);
622
623 (
624 self.submit_with_transaction_driver(
625 self.transaction_driver.as_ref().unwrap(),
626 &request,
627 client_addr,
628 &verified_transaction,
629 good_response_metrics,
630 finality_timeout,
631 )
632 .await?,
633 "transaction_driver",
634 )
635 } else {
636 using_td.store(false, Ordering::Release);
638
639 let resp = self
640 .submit_with_quorum_driver(
641 epoch_store.clone(),
642 verified_transaction.clone(),
643 request,
644 client_addr,
645 )
646 .await
647 .map_err(|e| {
648 warn!("QuorumDriverInternalError: {e:?}");
649 QuorumDriverError::QuorumDriverInternalError(e)
650 })?
651 .await
652 .map_err(|e| {
653 warn!("QuorumDriverInternalError: {e:?}");
654 QuorumDriverError::QuorumDriverInternalError(e)
655 })??;
656
657 (
658 QuorumTransactionResponse {
659 effects: FinalizedEffects::new_from_effects_cert(resp.effects_cert.into()),
660 events: resp.events,
661 input_objects: resp.input_objects,
662 output_objects: resp.output_objects,
663 auxiliary_data: resp.auxiliary_data,
664 },
665 "quorum_driver",
666 )
667 };
668
669 add_server_timing("wait_for_finality done");
670
671 self.metrics.wait_for_finality_finished.inc();
672
673 let elapsed = timer.elapsed().as_secs_f64();
674 self.metrics
675 .settlement_finality_latency
676 .with_label_values(&[tx_type.as_str(), driver_type])
677 .observe(elapsed);
678 good_response_metrics.inc();
679
680 Ok(response)
681 }
682
683 #[instrument(level = "error", skip_all, err(level = "info"))]
684 async fn submit_with_transaction_driver(
685 &self,
686 td: &Arc<TransactionDriver<A>>,
687 request: &ExecuteTransactionRequestV3,
688 client_addr: Option<SocketAddr>,
689 verified_transaction: &VerifiedTransaction,
690 good_response_metrics: &GenericCounter<AtomicU64>,
691 timeout_duration: Option<Duration>,
692 ) -> Result<QuorumTransactionResponse, QuorumDriverError> {
693 let tx_digest = *verified_transaction.digest();
694 debug!("Using TransactionDriver for transaction {:?}", tx_digest);
695
696 let td_response = td
697 .drive_transaction(
698 SubmitTxRequest::new_transaction(request.transaction.clone()),
699 SubmitTransactionOptions {
700 forwarded_client_addr: client_addr,
701 allowed_validators: self.td_allowed_submission_list.clone(),
702 },
703 timeout_duration,
704 )
705 .await
706 .map_err(|e| match e {
707 TransactionDriverError::TimeoutWithLastRetriableError {
708 last_error,
709 attempts,
710 timeout,
711 } => QuorumDriverError::TimeoutBeforeFinalityWithErrors {
712 last_error: last_error.map(|e| e.to_string()).unwrap_or_default(),
713 attempts,
714 timeout,
715 },
716 other => QuorumDriverError::TransactionFailed {
717 category: other.categorize(),
718 details: other.to_string(),
719 },
720 });
721
722 match td_response {
723 Err(e) => {
724 warn!("TransactionDriver error: {e:?}");
725 Err(e)
726 }
727 Ok(quorum_transaction_response) => {
728 good_response_metrics.inc();
729 Ok(quorum_transaction_response)
730 }
731 }
732 }
733
734 #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)]
737 async fn submit_with_quorum_driver(
738 &self,
739 epoch_store: Arc<AuthorityPerEpochStore>,
740 transaction: VerifiedTransaction,
741 request: ExecuteTransactionRequestV3,
742 client_addr: Option<SocketAddr>,
743 ) -> SuiResult<impl Future<Output = SuiResult<QuorumDriverResult>> + '_> {
744 let tx_digest = *transaction.digest();
745
746 let ticket = self.notifier.register_one(&tx_digest);
747 self.quorum_driver()
748 .submit_transaction_no_ticket(request.clone(), client_addr)
749 .await?;
750
751 let cache_reader = self.validator_state.get_transaction_cache_reader().clone();
756 let qd = self.clone_quorum_driver();
757 Ok(async move {
758 let digests = [tx_digest];
759 let effects_await =
760 epoch_store.within_alive_epoch(cache_reader.notify_read_executed_effects(
761 "TransactionOrchestrator::notify_read_submit_with_qd",
762 &digests,
763 ));
764 #[allow(clippy::let_and_return)]
766 let res = match select(ticket, effects_await.boxed()).await {
767 Either::Left((quorum_driver_response, _)) => Ok(quorum_driver_response),
768 Either::Right((_, unfinished_quorum_driver_task)) => {
769 debug!("Effects are available in DB, use quorum driver to get a certificate");
770 qd.submit_transaction_no_ticket(request, client_addr)
771 .await?;
772 Ok(unfinished_quorum_driver_task.await)
773 }
774 };
775 res
776 })
777 }
778
779 #[instrument(
780 name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout",
781 level = "debug",
782 skip_all,
783 err(level = "info")
784 )]
785 async fn wait_for_finalized_tx_executed_locally_with_timeout(
786 validator_state: &Arc<AuthorityState>,
787 tx_digest: TransactionDigest,
788 tx_type: TxType,
789 metrics: &TransactionOrchestratorMetrics,
790 ) -> SuiResult {
791 metrics.local_execution_in_flight.inc();
792 let _metrics_guard =
793 scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
794 in_flight.dec();
795 });
796
797 let _latency_guard = metrics
798 .local_execution_latency
799 .with_label_values(&[tx_type.as_str()])
800 .start_timer();
801 debug!("Waiting for finalized tx to be executed locally.");
802 match timeout(
803 LOCAL_EXECUTION_TIMEOUT,
804 validator_state
805 .get_transaction_cache_reader()
806 .notify_read_executed_effects_digests(
807 "TransactionOrchestrator::notify_read_wait_for_local_execution",
808 &[tx_digest],
809 ),
810 )
811 .instrument(error_span!(
812 "transaction_orchestrator::local_execution",
813 ?tx_digest
814 ))
815 .await
816 {
817 Err(_elapsed) => {
818 debug!(
819 "Waiting for finalized tx to be executed locally timed out within {:?}.",
820 LOCAL_EXECUTION_TIMEOUT
821 );
822 metrics.local_execution_timeout.inc();
823 Err(SuiErrorKind::TimeoutError.into())
824 }
825 Ok(_) => {
826 metrics.local_execution_success.inc();
827 Ok(())
828 }
829 }
830 }
831
832 fn should_use_transaction_driver(
833 &self,
834 epoch_store: &Arc<AuthorityPerEpochStore>,
835 tx_digest: TransactionDigest,
836 ) -> bool {
837 const MAX_PERCENTAGE: u8 = 100;
838 let unknown_network = epoch_store.get_chain() == Chain::Unknown;
839 let v = if unknown_network {
840 rand::thread_rng().gen_range(1..=MAX_PERCENTAGE)
841 } else {
842 let v = u32::from_le_bytes(tx_digest.inner()[..4].try_into().unwrap());
843 (v % (MAX_PERCENTAGE as u32) + 1) as u8
844 };
845 debug!(
846 "Choosing whether to use transaction driver: {} vs {}",
847 v, self.td_percentage
848 );
849 v <= self.td_percentage
850 }
851
852 async fn loop_pending_transaction_log(
854 mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
855 pending_transaction_log: Arc<WritePathPendingTransactionLog>,
856 ) {
857 loop {
858 match effects_receiver.recv().await {
859 Ok(Ok((transaction, ..))) => {
860 let tx_digest = transaction.digest();
861 if let Err(err) = pending_transaction_log.finish_transaction(tx_digest) {
862 error!(
863 ?tx_digest,
864 "Failed to finish transaction in pending transaction log: {err}"
865 );
866 }
867 }
868 Ok(Err((tx_digest, _err))) => {
869 if let Err(err) = pending_transaction_log.finish_transaction(&tx_digest) {
870 error!(
871 ?tx_digest,
872 "Failed to finish transaction in pending transaction log: {err}"
873 );
874 }
875 }
876 Err(RecvError::Closed) => {
877 error!("Sender of effects subscriber queue has been dropped!");
878 return;
879 }
880 Err(RecvError::Lagged(skipped_count)) => {
881 warn!("Skipped {skipped_count} transasctions in effects subscriber queue.");
882 }
883 }
884 }
885 }
886
887 pub fn quorum_driver(&self) -> &Arc<QuorumDriverHandler<A>> {
888 &self.quorum_driver_handler
889 }
890
891 pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
892 self.quorum_driver_handler.clone()
893 }
894
895 pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
896 self.quorum_driver().authority_aggregator().load_full()
897 }
898
899 fn update_metrics<'a>(
900 &'a self,
901 transaction: &Transaction,
902 ) -> (impl Drop + use<A>, &'a GenericCounter<AtomicU64>) {
903 let (in_flight, good_response) = if transaction.is_consensus_tx() {
904 self.metrics.total_req_received_shared_object.inc();
905 (
906 self.metrics.req_in_flight_shared_object.clone(),
907 &self.metrics.good_response_shared_object,
908 )
909 } else {
910 self.metrics.total_req_received_single_writer.inc();
911 (
912 self.metrics.req_in_flight_single_writer.clone(),
913 &self.metrics.good_response_single_writer,
914 )
915 };
916 in_flight.inc();
917 (
918 scopeguard::guard(in_flight, |in_flight| {
919 in_flight.dec();
920 }),
921 good_response,
922 )
923 }
924
925 fn schedule_txes_in_log(
926 pending_tx_log: Arc<WritePathPendingTransactionLog>,
927 quorum_driver: Arc<QuorumDriverHandler<A>>,
928 ) {
929 spawn_logged_monitored_task!(async move {
930 if std::env::var("SKIP_LOADING_FROM_PENDING_TX_LOG").is_ok() {
931 info!("Skipping loading pending transactions from pending_tx_log.");
932 return;
933 }
934 let pending_txes = pending_tx_log
935 .load_all_pending_transactions()
936 .expect("failed to load all pending transactions");
937 info!(
938 "Recovering {} pending transactions from pending_tx_log.",
939 pending_txes.len()
940 );
941 for (i, tx) in pending_txes.into_iter().enumerate() {
942 let tx = tx.into_inner();
945 let tx_digest = *tx.digest();
946 if let Err(err) = quorum_driver
949 .submit_transaction_no_ticket(
950 ExecuteTransactionRequestV3 {
951 transaction: tx,
952 include_events: true,
953 include_input_objects: false,
954 include_output_objects: false,
955 include_auxiliary_data: false,
956 },
957 None,
958 )
959 .await
960 {
961 warn!(
962 ?tx_digest,
963 "Failed to enqueue transaction from pending_tx_log, err: {err:?}"
964 );
965 } else {
966 debug!("Enqueued transaction from pending_tx_log");
967 if (i + 1) % 1000 == 0 {
968 info!("Enqueued {} transactions from pending_tx_log.", i + 1);
969 }
970 }
971 }
972 });
975 }
976
977 pub fn load_all_pending_transactions_in_test(&self) -> SuiResult<Vec<VerifiedTransaction>> {
978 self.pending_tx_log.load_all_pending_transactions()
979 }
980}
981#[derive(Clone)]
983pub struct TransactionOrchestratorMetrics {
984 total_req_received_single_writer: GenericCounter<AtomicU64>,
985 total_req_received_shared_object: GenericCounter<AtomicU64>,
986
987 good_response_single_writer: GenericCounter<AtomicU64>,
988 good_response_shared_object: GenericCounter<AtomicU64>,
989
990 req_in_flight_single_writer: GenericGauge<AtomicI64>,
991 req_in_flight_shared_object: GenericGauge<AtomicI64>,
992
993 wait_for_finality_in_flight: GenericGauge<AtomicI64>,
994 wait_for_finality_finished: GenericCounter<AtomicU64>,
995 wait_for_finality_timeout: GenericCounter<AtomicU64>,
996
997 local_execution_in_flight: GenericGauge<AtomicI64>,
998 local_execution_success: GenericCounter<AtomicU64>,
999 local_execution_timeout: GenericCounter<AtomicU64>,
1000
1001 concurrent_execution: IntCounter,
1002
1003 early_validation_rejections: IntCounterVec,
1004
1005 request_latency: HistogramVec,
1006 local_execution_latency: HistogramVec,
1007 settlement_finality_latency: HistogramVec,
1008}
1009
1010impl TransactionOrchestratorMetrics {
1014 pub fn new(registry: &Registry) -> Self {
1015 let total_req_received = register_int_counter_vec_with_registry!(
1016 "tx_orchestrator_total_req_received",
1017 "Total number of executions request Transaction Orchestrator receives, group by tx type",
1018 &["tx_type"],
1019 registry
1020 )
1021 .unwrap();
1022
1023 let total_req_received_single_writer =
1024 total_req_received.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1025 let total_req_received_shared_object =
1026 total_req_received.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1027
1028 let good_response = register_int_counter_vec_with_registry!(
1029 "tx_orchestrator_good_response",
1030 "Total number of good responses Transaction Orchestrator generates, group by tx type",
1031 &["tx_type"],
1032 registry
1033 )
1034 .unwrap();
1035
1036 let good_response_single_writer =
1037 good_response.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1038 let good_response_shared_object = good_response.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1039
1040 let req_in_flight = register_int_gauge_vec_with_registry!(
1041 "tx_orchestrator_req_in_flight",
1042 "Number of requests in flights Transaction Orchestrator processes, group by tx type",
1043 &["tx_type"],
1044 registry
1045 )
1046 .unwrap();
1047
1048 let req_in_flight_single_writer =
1049 req_in_flight.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
1050 let req_in_flight_shared_object = req_in_flight.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
1051
1052 Self {
1053 total_req_received_single_writer,
1054 total_req_received_shared_object,
1055 good_response_single_writer,
1056 good_response_shared_object,
1057 req_in_flight_single_writer,
1058 req_in_flight_shared_object,
1059 wait_for_finality_in_flight: register_int_gauge_with_registry!(
1060 "tx_orchestrator_wait_for_finality_in_flight",
1061 "Number of in flight txns Transaction Orchestrator are waiting for finality for",
1062 registry,
1063 )
1064 .unwrap(),
1065 wait_for_finality_finished: register_int_counter_with_registry!(
1066 "tx_orchestrator_wait_for_finality_fnished",
1067 "Total number of txns Transaction Orchestrator gets responses from Quorum Driver before timeout, either success or failure",
1068 registry,
1069 )
1070 .unwrap(),
1071 wait_for_finality_timeout: register_int_counter_with_registry!(
1072 "tx_orchestrator_wait_for_finality_timeout",
1073 "Total number of txns timing out in waiting for finality Transaction Orchestrator handles",
1074 registry,
1075 )
1076 .unwrap(),
1077 local_execution_in_flight: register_int_gauge_with_registry!(
1078 "tx_orchestrator_local_execution_in_flight",
1079 "Number of local execution txns in flights Transaction Orchestrator handles",
1080 registry,
1081 )
1082 .unwrap(),
1083 local_execution_success: register_int_counter_with_registry!(
1084 "tx_orchestrator_local_execution_success",
1085 "Total number of successful local execution txns Transaction Orchestrator handles",
1086 registry,
1087 )
1088 .unwrap(),
1089 local_execution_timeout: register_int_counter_with_registry!(
1090 "tx_orchestrator_local_execution_timeout",
1091 "Total number of timed-out local execution txns Transaction Orchestrator handles",
1092 registry,
1093 )
1094 .unwrap(),
1095 concurrent_execution: register_int_counter_with_registry!(
1096 "tx_orchestrator_concurrent_execution",
1097 "Total number of concurrent execution where effects are available locally finishing driving the transaction to finality",
1098 registry,
1099 )
1100 .unwrap(),
1101 early_validation_rejections: register_int_counter_vec_with_registry!(
1102 "tx_orchestrator_early_validation_rejections",
1103 "Total number of transactions rejected during early validation before submission, by error type",
1104 &["error_type"],
1105 registry,
1106 )
1107 .unwrap(),
1108 request_latency: register_histogram_vec_with_registry!(
1109 "tx_orchestrator_request_latency",
1110 "Time spent in processing one Transaction Orchestrator request",
1111 &["tx_type", "route", "wait_for_local_execution"],
1112 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1113 registry,
1114 )
1115 .unwrap(),
1116 local_execution_latency: register_histogram_vec_with_registry!(
1117 "tx_orchestrator_local_execution_latency",
1118 "Time spent in waiting for one Transaction Orchestrator gets locally executed",
1119 &["tx_type"],
1120 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
1121 registry,
1122 )
1123 .unwrap(),
1124 settlement_finality_latency: register_histogram_vec_with_registry!(
1125 "tx_orchestrator_settlement_finality_latency",
1126 "Time spent in waiting for one Transaction Orchestrator gets settled and finalized",
1127 &["tx_type", "driver_type"],
1128 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
1129 registry,
1130 )
1131 .unwrap(),
1132 }
1133 }
1134
1135 pub fn new_for_tests() -> Self {
1136 let registry = Registry::new();
1137 Self::new(®istry)
1138 }
1139}
1140
1141#[async_trait::async_trait]
1142impl<A> sui_types::transaction_executor::TransactionExecutor for TransactionOrchestrator<A>
1143where
1144 A: AuthorityAPI + Send + Sync + 'static + Clone,
1145{
1146 async fn execute_transaction(
1147 &self,
1148 request: ExecuteTransactionRequestV3,
1149 client_addr: Option<std::net::SocketAddr>,
1150 ) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
1151 self.execute_transaction_v3(request, client_addr).await
1152 }
1153
1154 fn simulate_transaction(
1155 &self,
1156 transaction: TransactionData,
1157 checks: TransactionChecks,
1158 ) -> Result<SimulateTransactionResult, SuiError> {
1159 self.validator_state
1160 .simulate_transaction(transaction, checks)
1161 }
1162}