sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16 net::SocketAddr,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27 committee::EpochId,
28 error::{ErrorCategory, UserInputError},
29 messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30 transaction::TransactionDataAPI as _,
31};
32use tokio::{
33 task::JoinSet,
34 time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40 authority_aggregator::AuthorityAggregator,
41 authority_client::AuthorityAPI,
42 quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43 validator_client_monitor::{
44 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45 },
46};
47use sui_config::NodeConfig;
/// Per-call knobs for [`TransactionDriver::drive_transaction`].
///
/// The default value (no forwarded address, empty allow/block lists) leaves validator
/// selection unconstrained by the caller.
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Address of the client on whose behalf the request is forwarded, if known.
    /// NOTE(review): consumed by the submitter/certifier, not in this file — confirm semantics there.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validator display names the request may be sent to; latency pings put exactly one
    /// validator's display name here to target it.
    pub allowed_validators: Vec<String>,

    /// Validators the request must not be sent to.
    /// NOTE(review): presumably display names like `allowed_validators` — confirm in the submitter.
    pub blocked_validators: Vec<String>,
}
63
64#[derive(Clone, Debug)]
65pub struct QuorumTransactionResponse {
66 pub effects: sui_types::quorum_driver_types::FinalizedEffects,
68
69 pub events: Option<sui_types::effects::TransactionEvents>,
70 pub input_objects: Option<Vec<sui_types::object::Object>>,
72 pub output_objects: Option<Vec<sui_types::object::Object>>,
74 pub auxiliary_data: Option<Vec<u8>>,
75}
76
77pub struct TransactionDriver<A: Clone> {
78 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
79 state: Mutex<State>,
80 metrics: Arc<TransactionDriverMetrics>,
81 submitter: TransactionSubmitter,
82 certifier: EffectsCertifier,
83 client_monitor: Arc<ValidatorClientMonitor<A>>,
84}
85
86impl<A> TransactionDriver<A>
87where
88 A: AuthorityAPI + Send + Sync + 'static + Clone,
89{
90 pub fn new(
92 authority_aggregator: Arc<AuthorityAggregator<A>>,
93 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
94 metrics: Arc<TransactionDriverMetrics>,
95 node_config: Option<&NodeConfig>,
96 client_metrics: Arc<ValidatorClientMetrics>,
97 ) -> Arc<Self> {
98 if std::env::var("TRANSACTION_DRIVER").is_ok() {
99 tracing::warn!(
100 "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
101 );
102 }
103
104 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
105
106 let monitor_config = node_config
108 .and_then(|nc| nc.validator_client_monitor_config.clone())
109 .unwrap_or_default();
110 let client_monitor =
111 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
112
113 let driver = Arc::new(Self {
114 authority_aggregator: shared_swap,
115 state: Mutex::new(State::new()),
116 metrics: metrics.clone(),
117 submitter: TransactionSubmitter::new(metrics.clone()),
118 certifier: EffectsCertifier::new(metrics),
119 client_monitor,
120 });
121
122 let driver_clone = driver.clone();
123
124 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
125
126 driver.enable_reconfig(reconfig_observer);
127 driver
128 }
129
130 pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
132 &self.authority_aggregator
133 }
134
135 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
142 pub async fn drive_transaction(
143 &self,
144 request: SubmitTxRequest,
145 options: SubmitTransactionOptions,
146 timeout_duration: Option<Duration>,
147 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
148 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
149
150 let amplification_factor = if request.ping_type.is_some() {
152 1
153 } else {
154 let gas_price = request
155 .transaction
156 .as_ref()
157 .unwrap()
158 .transaction_data()
159 .gas_price();
160 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
161 let amplification_factor = gas_price / reference_gas_price.max(1);
162 if amplification_factor == 0 {
163 return Err(TransactionDriverError::ValidationFailed {
164 error: UserInputError::GasPriceUnderRGP {
165 gas_price,
166 reference_gas_price,
167 }
168 .to_string(),
169 });
170 }
171 amplification_factor
172 };
173
174 let tx_type = request.tx_type();
175 let ping_label = if request.ping_type.is_some() {
176 "true"
177 } else {
178 "false"
179 };
180 let timer = Instant::now();
181
182 self.metrics
183 .total_transactions_submitted
184 .with_label_values(&[tx_type.as_str(), ping_label])
185 .inc();
186
187 let mut backoff = ExponentialBackoff::new(
188 Duration::from_millis(100),
189 MAX_DRIVE_TRANSACTION_RETRY_DELAY,
190 );
191 let mut attempts = 0;
192 let mut latest_retriable_error = None;
193
194 let retry_loop = async {
195 loop {
196 match self
198 .drive_transaction_once(amplification_factor, request.clone(), &options)
199 .await
200 {
201 Ok(resp) => {
202 let settlement_finality_latency = timer.elapsed().as_secs_f64();
203 self.metrics
204 .settlement_finality_latency
205 .with_label_values(&[tx_type.as_str(), ping_label])
206 .observe(settlement_finality_latency);
207 self.metrics
209 .transaction_retries
210 .with_label_values(&["success", tx_type.as_str(), ping_label])
211 .observe(attempts as f64);
212 return Ok(resp);
213 }
214 Err(e) => {
215 self.metrics
216 .drive_transaction_errors
217 .with_label_values(&[
218 e.categorize().into(),
219 tx_type.as_str(),
220 ping_label,
221 ])
222 .inc();
223 if !e.is_submission_retriable() {
224 self.metrics
226 .transaction_retries
227 .with_label_values(&["failure", tx_type.as_str(), ping_label])
228 .observe(attempts as f64);
229 if request.transaction.is_some() {
230 tracing::info!(
231 "User transaction failed to finalize (attempt {}), with non-retriable error: {}",
232 attempts,
233 e
234 );
235 }
236 return Err(e);
237 }
238 if request.transaction.is_some() {
239 tracing::info!(
240 "User transaction failed to finalize (attempt {}): {}. Retrying ...",
241 attempts,
242 e
243 );
244 }
245 latest_retriable_error = Some(e);
247 }
248 }
249
250 let overload = if let Some(e) = &latest_retriable_error {
251 e.categorize() == ErrorCategory::ValidatorOverloaded
252 } else {
253 false
254 };
255 let delay = if overload {
256 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
258 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
259 } else {
260 backoff.next().unwrap()
261 };
262 sleep(delay).await;
263
264 attempts += 1;
265 }
266 };
267
268 match timeout_duration {
269 Some(duration) => {
270 tokio::time::timeout(duration, retry_loop)
271 .await
272 .unwrap_or_else(|_| {
273 let e = TransactionDriverError::TimeoutWithLastRetriableError {
275 last_error: latest_retriable_error.map(Box::new),
276 attempts,
277 timeout: duration,
278 };
279 if request.transaction.is_some() {
280 tracing::info!(
281 "User transaction timed out after {} attempts. Last error: {}",
282 attempts,
283 e
284 );
285 }
286 Err(e)
287 })
288 }
289 None => retry_loop.await,
290 }
291 }
292
293 #[instrument(level = "error", skip_all, err(level = "debug"))]
294 async fn drive_transaction_once(
295 &self,
296 amplification_factor: u64,
297 request: SubmitTxRequest,
298 options: &SubmitTransactionOptions,
299 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
300 let auth_agg = self.authority_aggregator.load();
301 let amplification_factor =
302 amplification_factor.min(auth_agg.committee.num_members() as u64);
303 let start_time = Instant::now();
304 let tx_type = request.tx_type();
305 let tx_digest = request.tx_digest();
306 let ping_type = request.ping_type;
307
308 let (name, submit_txn_result) = self
309 .submitter
310 .submit_transaction(
311 &auth_agg,
312 &self.client_monitor,
313 tx_type,
314 amplification_factor,
315 request,
316 options,
317 )
318 .await?;
319 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
320 return Err(TransactionDriverError::ClientInternal {
321 error: format!(
322 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
323 error
324 ),
325 });
326 }
327
328 let result = self
330 .certifier
331 .get_certified_finalized_effects(
332 &auth_agg,
333 &self.client_monitor,
334 tx_digest,
335 tx_type,
336 name,
337 submit_txn_result,
338 options,
339 )
340 .await;
341
342 if result.is_ok() {
343 self.client_monitor
344 .record_interaction_result(OperationFeedback {
345 authority_name: name,
346 display_name: auth_agg.get_display_name(&name),
347 operation: if tx_type == TxType::SingleWriter {
348 OperationType::FastPath
349 } else {
350 OperationType::Consensus
351 },
352 ping_type,
353 result: Ok(start_time.elapsed()),
354 });
355 }
356 result
357 }
358
359 async fn run_latency_checks(self: Arc<Self>) {
361 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
362 const MAX_JITTER: Duration = Duration::from_secs(10);
363 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
364
365 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
366 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
367
368 loop {
369 interval.tick().await;
370
371 let mut tasks = JoinSet::new();
372
373 for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
374 Self::ping_for_tx_type(
375 self.clone(),
376 &mut tasks,
377 tx_type,
378 MAX_JITTER,
379 PING_REQUEST_TIMEOUT,
380 );
381 }
382
383 while let Some(result) = tasks.join_next().await {
384 if let Err(e) = result {
385 tracing::debug!("Error while driving ping transaction: {}", e);
386 }
387 }
388 }
389 }
390
391 fn ping_for_tx_type(
393 self: Arc<Self>,
394 tasks: &mut JoinSet<()>,
395 tx_type: TxType,
396 max_jitter: Duration,
397 ping_timeout: Duration,
398 ) {
399 let auth_agg = self.authority_aggregator.load().clone();
401 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
402
403 self.metrics
404 .latency_check_runs
405 .with_label_values(&[tx_type.as_str()])
406 .inc();
407
408 for name in validators {
409 let display_name = auth_agg.get_display_name(&name);
410 let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
411 let self_clone = self.clone();
412
413 let task = async move {
414 if delay_ms > 0 {
416 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
417 }
418 let start_time = Instant::now();
419
420 let ping_type = if tx_type == TxType::SingleWriter {
421 PingType::FastPath
422 } else {
423 PingType::Consensus
424 };
425
426 match self_clone
428 .drive_transaction(
429 SubmitTxRequest::new_ping(ping_type),
430 SubmitTransactionOptions {
431 allowed_validators: vec![display_name.clone()],
432 ..Default::default()
433 },
434 Some(ping_timeout),
435 )
436 .await
437 {
438 Ok(_) => {
439 tracing::debug!(
440 "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
441 display_name,
442 tx_type.as_str(),
443 start_time.elapsed().as_secs_f64()
444 );
445 }
446 Err(err) => {
447 tracing::debug!(
448 "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
449 tx_type.as_str(),
450 display_name,
451 err
452 );
453 }
454 }
455 };
456
457 tasks.spawn(task);
458 }
459 }
460
461 fn enable_reconfig(
462 self: &Arc<Self>,
463 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
464 ) {
465 let driver = self.clone();
466 self.state.lock().tasks.spawn(monitored_future!(async move {
467 let mut reconfig_observer = reconfig_observer.clone_boxed();
468 reconfig_observer.run(driver).await;
469 }));
470 }
471}
472
473impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
474where
475 A: AuthorityAPI + Send + Sync + 'static + Clone,
476{
477 fn epoch(&self) -> EpochId {
478 self.authority_aggregator.load().committee.epoch
479 }
480
481 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
482 self.authority_aggregator.load_full()
483 }
484
485 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
486 tracing::info!(
487 "Transaction Driver updating AuthorityAggregator with committee {}",
488 new_authorities.committee
489 );
490
491 self.authority_aggregator.store(new_authorities);
492 }
493}
494
495struct State {
497 tasks: JoinSet<()>,
498}
499
500impl State {
501 fn new() -> Self {
502 Self {
503 tasks: JoinSet::new(),
504 }
505 }
506}