sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16 net::SocketAddr,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27 committee::EpochId,
28 error::{ErrorCategory, UserInputError},
29 messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30 transaction::TransactionDataAPI as _,
31};
32use tokio::{
33 task::JoinSet,
34 time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40 authority_aggregator::AuthorityAggregator,
41 authority_client::AuthorityAPI,
42 validator_client_monitor::{
43 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
44 },
45};
46
47pub mod reconfig_observer;
48pub use reconfig_observer::ReconfigObserver;
49
50pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
53 fn epoch(&self) -> EpochId;
54 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
55 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
56}
57use sui_config::NodeConfig;
/// Per-request options accepted by `TransactionDriver::drive_transaction`.
///
/// The `Default` value carries no forwarded address and two empty validator lists.
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Socket address forwarded along with the request, if any.
    ///
    /// NOTE(review): presumably the address of the original client when the request is
    /// relayed on its behalf — confirm against the submitter.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validators the request may be sent to, identified by display name (latency pings
    /// set this to the single validator being measured).
    ///
    /// NOTE(review): an empty list presumably means "no restriction" — confirm.
    pub allowed_validators: Vec<String>,

    /// NOTE(review): presumably validators that must not be used for this request,
    /// identified the same way as `allowed_validators` — confirm against the submitter.
    pub blocked_validators: Vec<String>,
}
73
74#[derive(Clone, Debug)]
75pub struct QuorumTransactionResponse {
76 pub effects: sui_types::transaction_driver_types::FinalizedEffects,
77
78 pub events: Option<sui_types::effects::TransactionEvents>,
79 pub input_objects: Option<Vec<sui_types::object::Object>>,
81 pub output_objects: Option<Vec<sui_types::object::Object>>,
83 pub auxiliary_data: Option<Vec<u8>>,
84}
85
86pub struct TransactionDriver<A: Clone> {
87 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
88 state: Mutex<State>,
89 metrics: Arc<TransactionDriverMetrics>,
90 submitter: TransactionSubmitter,
91 certifier: EffectsCertifier,
92 client_monitor: Arc<ValidatorClientMonitor<A>>,
93}
94
95impl<A> TransactionDriver<A>
96where
97 A: AuthorityAPI + Send + Sync + 'static + Clone,
98{
99 pub fn new(
101 authority_aggregator: Arc<AuthorityAggregator<A>>,
102 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
103 metrics: Arc<TransactionDriverMetrics>,
104 node_config: Option<&NodeConfig>,
105 client_metrics: Arc<ValidatorClientMetrics>,
106 ) -> Arc<Self> {
107 if std::env::var("TRANSACTION_DRIVER").is_ok() {
108 tracing::warn!(
109 "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
110 );
111 }
112
113 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
114
115 let monitor_config = node_config
117 .and_then(|nc| nc.validator_client_monitor_config.clone())
118 .unwrap_or_default();
119 let client_monitor =
120 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
121
122 let driver = Arc::new(Self {
123 authority_aggregator: shared_swap,
124 state: Mutex::new(State::new()),
125 metrics: metrics.clone(),
126 submitter: TransactionSubmitter::new(metrics.clone()),
127 certifier: EffectsCertifier::new(metrics),
128 client_monitor,
129 });
130
131 let driver_clone = driver.clone();
132
133 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
134
135 driver.enable_reconfig(reconfig_observer);
136 driver
137 }
138
139 pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
141 &self.authority_aggregator
142 }
143
144 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
151 pub async fn drive_transaction(
152 &self,
153 request: SubmitTxRequest,
154 options: SubmitTransactionOptions,
155 timeout_duration: Option<Duration>,
156 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
157 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
158
159 let amplification_factor = if request.ping_type.is_some() {
161 1
162 } else {
163 let gas_price = request
164 .transaction
165 .as_ref()
166 .unwrap()
167 .transaction_data()
168 .gas_price();
169 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
170 let amplification_factor = gas_price / reference_gas_price.max(1);
171 if amplification_factor == 0 {
172 return Err(TransactionDriverError::ValidationFailed {
173 error: UserInputError::GasPriceUnderRGP {
174 gas_price,
175 reference_gas_price,
176 }
177 .to_string(),
178 });
179 }
180 amplification_factor
181 };
182
183 let tx_type = request.tx_type();
184 let ping_label = if request.ping_type.is_some() {
185 "true"
186 } else {
187 "false"
188 };
189 let timer = Instant::now();
190
191 self.metrics
192 .total_transactions_submitted
193 .with_label_values(&[tx_type.as_str(), ping_label])
194 .inc();
195
196 let mut backoff = ExponentialBackoff::new(
197 Duration::from_millis(100),
198 MAX_DRIVE_TRANSACTION_RETRY_DELAY,
199 );
200 let mut attempts = 0;
201 let mut latest_retriable_error = None;
202
203 let retry_loop = async {
204 loop {
205 match self
207 .drive_transaction_once(amplification_factor, request.clone(), &options)
208 .await
209 {
210 Ok(resp) => {
211 let settlement_finality_latency = timer.elapsed().as_secs_f64();
212 self.metrics
213 .settlement_finality_latency
214 .with_label_values(&[tx_type.as_str(), ping_label])
215 .observe(settlement_finality_latency);
216 self.metrics
218 .transaction_retries
219 .with_label_values(&["success", tx_type.as_str(), ping_label])
220 .observe(attempts as f64);
221 return Ok(resp);
222 }
223 Err(e) => {
224 self.metrics
225 .drive_transaction_errors
226 .with_label_values(&[
227 e.categorize().into(),
228 tx_type.as_str(),
229 ping_label,
230 ])
231 .inc();
232 if !e.is_submission_retriable() {
233 self.metrics
235 .transaction_retries
236 .with_label_values(&["failure", tx_type.as_str(), ping_label])
237 .observe(attempts as f64);
238 if request.transaction.is_some() {
239 tracing::info!(
240 "User transaction failed to finalize (attempt {}), with non-retriable error: {}",
241 attempts,
242 e
243 );
244 }
245 return Err(e);
246 }
247 if request.transaction.is_some() {
248 tracing::info!(
249 "User transaction failed to finalize (attempt {}): {}. Retrying ...",
250 attempts,
251 e
252 );
253 }
254 latest_retriable_error = Some(e);
256 }
257 }
258
259 let overload = if let Some(e) = &latest_retriable_error {
260 e.categorize() == ErrorCategory::ValidatorOverloaded
261 } else {
262 false
263 };
264 let delay = if overload {
265 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
267 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
268 } else {
269 backoff.next().unwrap()
270 };
271 sleep(delay).await;
272
273 attempts += 1;
274 }
275 };
276
277 match timeout_duration {
278 Some(duration) => {
279 tokio::time::timeout(duration, retry_loop)
280 .await
281 .unwrap_or_else(|_| {
282 let e = TransactionDriverError::TimeoutWithLastRetriableError {
284 last_error: latest_retriable_error.map(Box::new),
285 attempts,
286 timeout: duration,
287 };
288 if request.transaction.is_some() {
289 tracing::info!(
290 "User transaction timed out after {} attempts. Last error: {}",
291 attempts,
292 e
293 );
294 }
295 Err(e)
296 })
297 }
298 None => retry_loop.await,
299 }
300 }
301
302 #[instrument(level = "error", skip_all, err(level = "debug"))]
303 async fn drive_transaction_once(
304 &self,
305 amplification_factor: u64,
306 request: SubmitTxRequest,
307 options: &SubmitTransactionOptions,
308 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
309 let auth_agg = self.authority_aggregator.load();
310 let amplification_factor =
311 amplification_factor.min(auth_agg.committee.num_members() as u64);
312 let start_time = Instant::now();
313 let tx_type = request.tx_type();
314 let tx_digest = request.tx_digest();
315 let ping_type = request.ping_type;
316
317 let (name, submit_txn_result) = self
318 .submitter
319 .submit_transaction(
320 &auth_agg,
321 &self.client_monitor,
322 tx_type,
323 amplification_factor,
324 request,
325 options,
326 )
327 .await?;
328 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
329 return Err(TransactionDriverError::ClientInternal {
330 error: format!(
331 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
332 error
333 ),
334 });
335 }
336
337 let result = self
339 .certifier
340 .get_certified_finalized_effects(
341 &auth_agg,
342 &self.client_monitor,
343 tx_digest,
344 tx_type,
345 name,
346 submit_txn_result,
347 options,
348 )
349 .await;
350
351 if result.is_ok() {
352 self.client_monitor
353 .record_interaction_result(OperationFeedback {
354 authority_name: name,
355 display_name: auth_agg.get_display_name(&name),
356 operation: if tx_type == TxType::SingleWriter {
357 OperationType::FastPath
358 } else {
359 OperationType::Consensus
360 },
361 ping_type,
362 result: Ok(start_time.elapsed()),
363 });
364 }
365 result
366 }
367
368 async fn run_latency_checks(self: Arc<Self>) {
370 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
371 const MAX_JITTER: Duration = Duration::from_secs(10);
372 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
373
374 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
375 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
376
377 loop {
378 interval.tick().await;
379
380 let mut tasks = JoinSet::new();
381
382 for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
383 Self::ping_for_tx_type(
384 self.clone(),
385 &mut tasks,
386 tx_type,
387 MAX_JITTER,
388 PING_REQUEST_TIMEOUT,
389 );
390 }
391
392 while let Some(result) = tasks.join_next().await {
393 if let Err(e) = result {
394 tracing::debug!("Error while driving ping transaction: {}", e);
395 }
396 }
397 }
398 }
399
400 fn ping_for_tx_type(
402 self: Arc<Self>,
403 tasks: &mut JoinSet<()>,
404 tx_type: TxType,
405 max_jitter: Duration,
406 ping_timeout: Duration,
407 ) {
408 let auth_agg = self.authority_aggregator.load().clone();
410 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
411
412 self.metrics
413 .latency_check_runs
414 .with_label_values(&[tx_type.as_str()])
415 .inc();
416
417 for name in validators {
418 let display_name = auth_agg.get_display_name(&name);
419 let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
420 let self_clone = self.clone();
421
422 let task = async move {
423 if delay_ms > 0 {
425 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
426 }
427 let start_time = Instant::now();
428
429 let ping_type = if tx_type == TxType::SingleWriter {
430 PingType::FastPath
431 } else {
432 PingType::Consensus
433 };
434
435 match self_clone
437 .drive_transaction(
438 SubmitTxRequest::new_ping(ping_type),
439 SubmitTransactionOptions {
440 allowed_validators: vec![display_name.clone()],
441 ..Default::default()
442 },
443 Some(ping_timeout),
444 )
445 .await
446 {
447 Ok(_) => {
448 tracing::debug!(
449 "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
450 display_name,
451 tx_type.as_str(),
452 start_time.elapsed().as_secs_f64()
453 );
454 }
455 Err(err) => {
456 tracing::debug!(
457 "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
458 tx_type.as_str(),
459 display_name,
460 err
461 );
462 }
463 }
464 };
465
466 tasks.spawn(task);
467 }
468 }
469
470 fn enable_reconfig(
471 self: &Arc<Self>,
472 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
473 ) {
474 let driver = self.clone();
475 self.state.lock().tasks.spawn(monitored_future!(async move {
476 let mut reconfig_observer = reconfig_observer.clone_boxed();
477 reconfig_observer.run(driver).await;
478 }));
479 }
480}
481
482impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
483where
484 A: AuthorityAPI + Send + Sync + 'static + Clone,
485{
486 fn epoch(&self) -> EpochId {
487 self.authority_aggregator.load().committee.epoch
488 }
489
490 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
491 self.authority_aggregator.load_full()
492 }
493
494 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
495 tracing::info!(
496 "Transaction Driver updating AuthorityAggregator with committee {}",
497 new_authorities.committee
498 );
499
500 self.authority_aggregator.store(new_authorities);
501 }
502}
503
504struct State {
506 tasks: JoinSet<()>,
507}
508
509impl State {
510 fn new() -> Self {
511 Self {
512 tasks: JoinSet::new(),
513 }
514 }
515}