sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod reconfig_observer;
8mod request_retrier;
9mod transaction_submitter;
10
11pub use error::TransactionDriverError;
13pub use metrics::*;
14pub use reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
15
16use std::{
17 net::SocketAddr,
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use arc_swap::ArcSwap;
23use effects_certifier::*;
24use mysten_common::backoff::ExponentialBackoff;
25use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
26use parking_lot::Mutex;
27use rand::Rng;
28use sui_config::NodeConfig;
29use sui_types::{
30 committee::EpochId,
31 error::{ErrorCategory, UserInputError},
32 messages_grpc::{SubmitTxRequest, SubmitTxResult, TxType},
33 transaction::TransactionDataAPI as _,
34};
35use tokio::{
36 task::JoinSet,
37 time::{interval, sleep},
38};
39use tracing::instrument;
40use transaction_submitter::*;
41
42use crate::{
43 authority_aggregator::AuthorityAggregator,
44 authority_client::AuthorityAPI,
45 validator_client_monitor::{
46 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
47 },
48};
49
50pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
53 fn epoch(&self) -> EpochId;
54 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
55 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
56}
57
/// Per-request options for [`TransactionDriver::drive_transaction`].
///
/// The `Default` value sets no forwarded address and no validator allow/block lists.
#[derive(Clone, Default, Debug)]
pub struct SubmitTransactionOptions {
    /// Socket address of the originating client, when the request was forwarded on its
    /// behalf (NOTE(review): inferred from the name — confirm against callers).
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validator display names that submission should be restricted to.
    /// The latency checker sets this to a single validator's display name to target
    /// that validator; an empty list presumably means "no restriction" — confirm in
    /// the submitter.
    pub allowed_validators: Vec<String>,

    /// Validator display names that submission should avoid (presumably ignored when
    /// empty — confirm in the submitter).
    pub blocked_validators: Vec<String>,
}
73
74#[derive(Clone, Debug)]
75pub struct QuorumTransactionResponse {
76 pub effects: sui_types::transaction_driver_types::FinalizedEffects,
77
78 pub events: Option<sui_types::effects::TransactionEvents>,
79 pub input_objects: Option<Vec<sui_types::object::Object>>,
81 pub output_objects: Option<Vec<sui_types::object::Object>>,
83 pub auxiliary_data: Option<Vec<u8>>,
84}
85
86pub struct TransactionDriver<A: Clone> {
87 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
88 state: Mutex<State>,
89 metrics: Arc<TransactionDriverMetrics>,
90 submitter: TransactionSubmitter,
91 certifier: EffectsCertifier,
92 client_monitor: Arc<ValidatorClientMonitor<A>>,
93}
94
95impl<A> TransactionDriver<A>
96where
97 A: AuthorityAPI + Send + Sync + 'static + Clone,
98{
99 pub fn new(
101 authority_aggregator: Arc<AuthorityAggregator<A>>,
102 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
103 metrics: Arc<TransactionDriverMetrics>,
104 node_config: Option<&NodeConfig>,
105 client_metrics: Arc<ValidatorClientMetrics>,
106 ) -> Arc<Self> {
107 if std::env::var("TRANSACTION_DRIVER").is_ok() {
108 tracing::warn!(
109 "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
110 );
111 }
112
113 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
114
115 let monitor_config = node_config
117 .and_then(|nc| nc.validator_client_monitor_config.clone())
118 .unwrap_or_default();
119 let client_monitor =
120 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
121
122 let driver = Arc::new(Self {
123 authority_aggregator: shared_swap,
124 state: Mutex::new(State::new()),
125 metrics: metrics.clone(),
126 submitter: TransactionSubmitter::new(metrics.clone()),
127 certifier: EffectsCertifier::new(metrics),
128 client_monitor,
129 });
130
131 let driver_clone = driver.clone();
132
133 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
134
135 driver.enable_reconfig(reconfig_observer);
136 driver
137 }
138
139 pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
141 &self.authority_aggregator
142 }
143
144 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
151 pub async fn drive_transaction(
152 &self,
153 request: SubmitTxRequest,
154 options: SubmitTransactionOptions,
155 timeout_duration: Option<Duration>,
156 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
157 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
158
159 let tx_data = request.transaction.as_ref().map(|t| t.transaction_data());
160 let amplification_factor =
162 if request.ping_type.is_some() || tx_data.is_some_and(|d| d.is_gasless_transaction()) {
163 1
164 } else {
165 let tx_data = tx_data.unwrap();
166 let gas_price = tx_data.gas_price();
167 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
168 let amplification_factor = gas_price / reference_gas_price.max(1);
169 if amplification_factor == 0 {
170 return Err(TransactionDriverError::ValidationFailed {
171 error: UserInputError::GasPriceUnderRGP {
172 gas_price,
173 reference_gas_price,
174 }
175 .to_string(),
176 });
177 }
178 amplification_factor
179 };
180
181 let tx_type = request.tx_type();
182 let ping_label = if request.ping_type.is_some() {
183 "true"
184 } else {
185 "false"
186 };
187 let timer = Instant::now();
188
189 self.metrics
190 .total_transactions_submitted
191 .with_label_values(&[tx_type.as_str(), ping_label])
192 .inc();
193
194 let mut backoff = ExponentialBackoff::new(
195 Duration::from_millis(100),
196 MAX_DRIVE_TRANSACTION_RETRY_DELAY,
197 );
198 let mut attempts = 0;
199 let mut latest_retriable_error = None;
200
201 let retry_loop = async {
202 loop {
203 match self
205 .drive_transaction_once(amplification_factor, request.clone(), &options)
206 .await
207 {
208 Ok(resp) => {
209 let settlement_finality_latency = timer.elapsed().as_secs_f64();
210 self.metrics
211 .settlement_finality_latency
212 .with_label_values(&[tx_type.as_str(), ping_label])
213 .observe(settlement_finality_latency);
214 let is_out_of_expected_range = settlement_finality_latency >= 8.0
215 || settlement_finality_latency <= 0.1;
216 tracing::debug!(
217 ?tx_type,
218 ?is_out_of_expected_range,
219 "Settlement finality latency: {:.3} seconds",
220 settlement_finality_latency
221 );
222 self.metrics
224 .transaction_retries
225 .with_label_values(&["success", tx_type.as_str(), ping_label])
226 .observe(attempts as f64);
227 return Ok(resp);
228 }
229 Err(e) => {
230 self.metrics
231 .drive_transaction_errors
232 .with_label_values(&[
233 e.categorize().into(),
234 tx_type.as_str(),
235 ping_label,
236 ])
237 .inc();
238 if !e.is_submission_retriable() {
239 self.metrics
241 .transaction_retries
242 .with_label_values(&["failure", tx_type.as_str(), ping_label])
243 .observe(attempts as f64);
244 if request.transaction.is_some() {
245 tracing::info!(
246 "User transaction failed to finalize (attempt {}), with non-retriable error: {} ({})",
247 attempts,
248 e,
249 Into::<&str>::into(e.categorize())
250 );
251 }
252 return Err(e);
253 }
254 if request.transaction.is_some() {
255 tracing::info!(
256 "User transaction failed to finalize (attempt {}): {} ({}). Retrying ...",
257 attempts,
258 e,
259 Into::<&str>::into(e.categorize())
260 );
261 }
262 latest_retriable_error = Some(e);
264 }
265 }
266
267 let overload = if let Some(e) = &latest_retriable_error {
268 e.categorize() == ErrorCategory::ValidatorOverloaded
269 } else {
270 false
271 };
272 let delay = if overload {
273 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
275 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
276 } else {
277 backoff.next().unwrap()
278 };
279
280 tracing::debug!("Retrying after {:.3}s", delay.as_secs_f32());
281 sleep(delay).await;
282
283 attempts += 1;
284 }
285 };
286
287 match timeout_duration {
288 Some(duration) => {
289 tokio::time::timeout(duration, retry_loop)
290 .await
291 .unwrap_or_else(|_| {
292 let e = TransactionDriverError::TimeoutWithLastRetriableError {
294 last_error: latest_retriable_error.map(Box::new),
295 attempts,
296 timeout: duration,
297 };
298 if request.transaction.is_some() {
299 tracing::info!(
300 "User transaction timed out after {} attempts. Last error: {}",
301 attempts,
302 e
303 );
304 }
305 Err(e)
306 })
307 }
308 None => retry_loop.await,
309 }
310 }
311
312 #[instrument(level = "error", skip_all, err(level = "debug"))]
313 async fn drive_transaction_once(
314 &self,
315 amplification_factor: u64,
316 request: SubmitTxRequest,
317 options: &SubmitTransactionOptions,
318 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
319 let auth_agg = self.authority_aggregator.load();
320 let start_time = Instant::now();
321 let tx_type = request.tx_type();
322 let tx_digest = request.tx_digest();
323 let ping_type = request.ping_type;
324
325 let (name, submit_txn_result) = self
326 .submitter
327 .submit_transaction(
328 &auth_agg,
329 &self.client_monitor,
330 tx_type,
331 amplification_factor,
332 request,
333 options,
334 )
335 .await?;
336 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
337 return Err(TransactionDriverError::ClientInternal {
338 error: format!(
339 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
340 error
341 ),
342 });
343 }
344
345 let result = self
347 .certifier
348 .get_certified_finalized_effects(
349 &auth_agg,
350 &self.client_monitor,
351 tx_digest,
352 tx_type,
353 name,
354 submit_txn_result,
355 options,
356 )
357 .await;
358
359 if result.is_ok() {
360 self.client_monitor
361 .record_interaction_result(OperationFeedback {
362 authority_name: name,
363 display_name: auth_agg.get_display_name(&name),
364 operation: if tx_type == TxType::SingleWriter {
365 OperationType::SingleWriterFinality
366 } else {
367 OperationType::SharedObjectFinality
368 },
369 ping_type,
370 result: Ok(start_time.elapsed()),
371 });
372 }
373 result
374 }
375
376 async fn run_latency_checks(self: Arc<Self>) {
378 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
379 const MAX_JITTER: Duration = Duration::from_secs(10);
380 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
381
382 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
383 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
384
385 loop {
386 interval.tick().await;
387
388 let auth_agg = self.authority_aggregator.load().clone();
391 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
392
393 self.metrics.latency_check_runs.inc();
394
395 let mut tasks = JoinSet::new();
396
397 for name in validators {
398 let display_name = auth_agg.get_display_name(&name);
399 let delay_ms = rand::thread_rng().gen_range(0..MAX_JITTER.as_millis()) as u64;
400 let self_clone = self.clone();
401
402 let task = async move {
403 if delay_ms > 0 {
405 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
406 }
407 let start_time = Instant::now();
408
409 match self_clone
411 .drive_transaction(
412 SubmitTxRequest::new_ping(),
413 SubmitTransactionOptions {
414 allowed_validators: vec![display_name.clone()],
415 ..Default::default()
416 },
417 Some(PING_REQUEST_TIMEOUT),
418 )
419 .await
420 {
421 Ok(_) => {
422 tracing::debug!(
423 "Ping transaction to validator {} completed end to end in {} seconds",
424 display_name,
425 start_time.elapsed().as_secs_f64()
426 );
427 }
428 Err(err) => {
429 tracing::debug!(
430 "Failed to get certified finalized effects for ping transaction to validator {}: {}",
431 display_name,
432 err
433 );
434 }
435 }
436 };
437
438 tasks.spawn(task);
439 }
440
441 while let Some(result) = tasks.join_next().await {
442 if let Err(e) = result {
443 tracing::debug!("Error while driving ping transaction: {}", e);
444 }
445 }
446 }
447 }
448
449 fn enable_reconfig(
450 self: &Arc<Self>,
451 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
452 ) {
453 let driver = self.clone();
454 self.state.lock().tasks.spawn(monitored_future!(async move {
455 let mut reconfig_observer = reconfig_observer.clone_boxed();
456 reconfig_observer.run(driver).await;
457 }));
458 }
459}
460
461impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
462where
463 A: AuthorityAPI + Send + Sync + 'static + Clone,
464{
465 fn epoch(&self) -> EpochId {
466 self.authority_aggregator.load().committee.epoch
467 }
468
469 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
470 self.authority_aggregator.load_full()
471 }
472
473 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
474 tracing::info!(
475 "Transaction Driver updating AuthorityAggregator with committee {}",
476 new_authorities.committee
477 );
478
479 self.authority_aggregator.store(new_authorities);
480 }
481}
482
483struct State {
485 tasks: JoinSet<()>,
486}
487
488impl State {
489 fn new() -> Self {
490 Self {
491 tasks: JoinSet::new(),
492 }
493 }
494}