sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod reconfig_observer;
8mod request_retrier;
9mod transaction_submitter;
10
11pub use error::TransactionDriverError;
13pub use metrics::*;
14pub use reconfig_observer::{OnsiteReconfigObserver, ReconfigObserver};
15
16use std::{
17 net::SocketAddr,
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use arc_swap::ArcSwap;
23use effects_certifier::*;
24use mysten_common::backoff::ExponentialBackoff;
25use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
26use parking_lot::Mutex;
27use rand::Rng;
28use sui_config::NodeConfig;
29use sui_types::{
30 committee::EpochId,
31 error::{ErrorCategory, UserInputError},
32 messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
33 transaction::TransactionDataAPI as _,
34};
35use tokio::{
36 task::JoinSet,
37 time::{interval, sleep},
38};
39use tracing::instrument;
40use transaction_submitter::*;
41
42use crate::{
43 authority_aggregator::AuthorityAggregator,
44 authority_client::AuthorityAPI,
45 validator_client_monitor::{
46 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
47 },
48};
49
50pub trait AuthorityAggregatorUpdatable<A: Clone>: Send + Sync + 'static {
53 fn epoch(&self) -> EpochId;
54 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>>;
55 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>);
56}
57
/// Per-call options accepted by `TransactionDriver::drive_transaction`.
///
/// The default value has no forwarded address and empty validator lists.
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Socket address of the client on whose behalf the request is forwarded,
    /// if any.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validators that may be used for this submission. The latency checker in
    /// this file fills it with validator display names.
    // NOTE(review): presumably an empty list means "no restriction" — confirm
    // in the transaction submitter.
    pub allowed_validators: Vec<String>,

    /// Validators that must not be used for this submission.
    // NOTE(review): presumably display names as well, with an empty list
    // meaning "no restriction" — confirm in the transaction submitter.
    pub blocked_validators: Vec<String>,
}
73
74#[derive(Clone, Debug)]
75pub struct QuorumTransactionResponse {
76 pub effects: sui_types::transaction_driver_types::FinalizedEffects,
77
78 pub events: Option<sui_types::effects::TransactionEvents>,
79 pub input_objects: Option<Vec<sui_types::object::Object>>,
81 pub output_objects: Option<Vec<sui_types::object::Object>>,
83 pub auxiliary_data: Option<Vec<u8>>,
84}
85
86pub struct TransactionDriver<A: Clone> {
87 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
88 state: Mutex<State>,
89 metrics: Arc<TransactionDriverMetrics>,
90 submitter: TransactionSubmitter,
91 certifier: EffectsCertifier,
92 client_monitor: Arc<ValidatorClientMonitor<A>>,
93}
94
95impl<A> TransactionDriver<A>
96where
97 A: AuthorityAPI + Send + Sync + 'static + Clone,
98{
99 pub fn new(
101 authority_aggregator: Arc<AuthorityAggregator<A>>,
102 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
103 metrics: Arc<TransactionDriverMetrics>,
104 node_config: Option<&NodeConfig>,
105 client_metrics: Arc<ValidatorClientMetrics>,
106 ) -> Arc<Self> {
107 if std::env::var("TRANSACTION_DRIVER").is_ok() {
108 tracing::warn!(
109 "Transaction Driver is the only supported driver for transaction submission. Setting TRANSACTION_DRIVER is a no-op."
110 );
111 }
112
113 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
114
115 let monitor_config = node_config
117 .and_then(|nc| nc.validator_client_monitor_config.clone())
118 .unwrap_or_default();
119 let client_monitor =
120 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
121
122 let driver = Arc::new(Self {
123 authority_aggregator: shared_swap,
124 state: Mutex::new(State::new()),
125 metrics: metrics.clone(),
126 submitter: TransactionSubmitter::new(metrics.clone()),
127 certifier: EffectsCertifier::new(metrics),
128 client_monitor,
129 });
130
131 let driver_clone = driver.clone();
132
133 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
134
135 driver.enable_reconfig(reconfig_observer);
136 driver
137 }
138
139 pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
141 &self.authority_aggregator
142 }
143
144 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
151 pub async fn drive_transaction(
152 &self,
153 request: SubmitTxRequest,
154 options: SubmitTransactionOptions,
155 timeout_duration: Option<Duration>,
156 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
157 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
158
159 let amplification_factor = if request.ping_type.is_some() {
161 1
162 } else {
163 let gas_price = request
164 .transaction
165 .as_ref()
166 .unwrap()
167 .transaction_data()
168 .gas_price();
169 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
170 let amplification_factor = gas_price / reference_gas_price.max(1);
171 if amplification_factor == 0 {
172 return Err(TransactionDriverError::ValidationFailed {
173 error: UserInputError::GasPriceUnderRGP {
174 gas_price,
175 reference_gas_price,
176 }
177 .to_string(),
178 });
179 }
180 amplification_factor
181 };
182
183 let tx_type = request.tx_type();
184 let ping_label = if request.ping_type.is_some() {
185 "true"
186 } else {
187 "false"
188 };
189 let timer = Instant::now();
190
191 self.metrics
192 .total_transactions_submitted
193 .with_label_values(&[tx_type.as_str(), ping_label])
194 .inc();
195
196 let mut backoff = ExponentialBackoff::new(
197 Duration::from_millis(100),
198 MAX_DRIVE_TRANSACTION_RETRY_DELAY,
199 );
200 let mut attempts = 0;
201 let mut latest_retriable_error = None;
202
203 let retry_loop = async {
204 loop {
205 match self
207 .drive_transaction_once(amplification_factor, request.clone(), &options)
208 .await
209 {
210 Ok(resp) => {
211 let settlement_finality_latency = timer.elapsed().as_secs_f64();
212 self.metrics
213 .settlement_finality_latency
214 .with_label_values(&[tx_type.as_str(), ping_label])
215 .observe(settlement_finality_latency);
216 let is_out_of_expected_range = settlement_finality_latency >= 8.0
217 || settlement_finality_latency <= 0.1;
218 tracing::debug!(
219 ?tx_type,
220 ?is_out_of_expected_range,
221 "Settlement finality latency: {:.3} seconds",
222 settlement_finality_latency
223 );
224 self.metrics
226 .transaction_retries
227 .with_label_values(&["success", tx_type.as_str(), ping_label])
228 .observe(attempts as f64);
229 return Ok(resp);
230 }
231 Err(e) => {
232 self.metrics
233 .drive_transaction_errors
234 .with_label_values(&[
235 e.categorize().into(),
236 tx_type.as_str(),
237 ping_label,
238 ])
239 .inc();
240 if !e.is_submission_retriable() {
241 self.metrics
243 .transaction_retries
244 .with_label_values(&["failure", tx_type.as_str(), ping_label])
245 .observe(attempts as f64);
246 if request.transaction.is_some() {
247 tracing::info!(
248 "User transaction failed to finalize (attempt {}), with non-retriable error: {} ({})",
249 attempts,
250 e,
251 Into::<&str>::into(e.categorize())
252 );
253 }
254 return Err(e);
255 }
256 if request.transaction.is_some() {
257 tracing::info!(
258 "User transaction failed to finalize (attempt {}): {} ({}). Retrying ...",
259 attempts,
260 e,
261 Into::<&str>::into(e.categorize())
262 );
263 }
264 latest_retriable_error = Some(e);
266 }
267 }
268
269 let overload = if let Some(e) = &latest_retriable_error {
270 e.categorize() == ErrorCategory::ValidatorOverloaded
271 } else {
272 false
273 };
274 let delay = if overload {
275 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
277 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
278 } else {
279 backoff.next().unwrap()
280 };
281
282 tracing::debug!("Retrying after {:.3}s", delay.as_secs_f32());
283 sleep(delay).await;
284
285 attempts += 1;
286 }
287 };
288
289 match timeout_duration {
290 Some(duration) => {
291 tokio::time::timeout(duration, retry_loop)
292 .await
293 .unwrap_or_else(|_| {
294 let e = TransactionDriverError::TimeoutWithLastRetriableError {
296 last_error: latest_retriable_error.map(Box::new),
297 attempts,
298 timeout: duration,
299 };
300 if request.transaction.is_some() {
301 tracing::info!(
302 "User transaction timed out after {} attempts. Last error: {}",
303 attempts,
304 e
305 );
306 }
307 Err(e)
308 })
309 }
310 None => retry_loop.await,
311 }
312 }
313
314 #[instrument(level = "error", skip_all, err(level = "debug"))]
315 async fn drive_transaction_once(
316 &self,
317 amplification_factor: u64,
318 request: SubmitTxRequest,
319 options: &SubmitTransactionOptions,
320 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
321 let auth_agg = self.authority_aggregator.load();
322 let start_time = Instant::now();
323 let tx_type = request.tx_type();
324 let tx_digest = request.tx_digest();
325 let ping_type = request.ping_type;
326
327 let (name, submit_txn_result) = self
328 .submitter
329 .submit_transaction(
330 &auth_agg,
331 &self.client_monitor,
332 tx_type,
333 amplification_factor,
334 request,
335 options,
336 )
337 .await?;
338 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
339 return Err(TransactionDriverError::ClientInternal {
340 error: format!(
341 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
342 error
343 ),
344 });
345 }
346
347 let result = self
349 .certifier
350 .get_certified_finalized_effects(
351 &auth_agg,
352 &self.client_monitor,
353 tx_digest,
354 tx_type,
355 name,
356 submit_txn_result,
357 options,
358 )
359 .await;
360
361 if result.is_ok() {
362 self.client_monitor
363 .record_interaction_result(OperationFeedback {
364 authority_name: name,
365 display_name: auth_agg.get_display_name(&name),
366 operation: if tx_type == TxType::SingleWriter {
367 OperationType::FastPath
368 } else {
369 OperationType::Consensus
370 },
371 ping_type,
372 result: Ok(start_time.elapsed()),
373 });
374 }
375 result
376 }
377
378 async fn run_latency_checks(self: Arc<Self>) {
380 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
381 const MAX_JITTER: Duration = Duration::from_secs(10);
382 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
383
384 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
385 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
386
387 loop {
388 interval.tick().await;
389
390 let auth_agg = self.authority_aggregator.load().clone();
393 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
394
395 self.metrics.latency_check_runs.inc();
396
397 let mut tasks = JoinSet::new();
398
399 for name in validators {
400 let display_name = auth_agg.get_display_name(&name);
401 let delay_ms = rand::thread_rng().gen_range(0..MAX_JITTER.as_millis()) as u64;
402 let self_clone = self.clone();
403
404 let task = async move {
405 if delay_ms > 0 {
407 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
408 }
409 let start_time = Instant::now();
410
411 match self_clone
413 .drive_transaction(
414 SubmitTxRequest::new_ping(PingType::Consensus),
415 SubmitTransactionOptions {
416 allowed_validators: vec![display_name.clone()],
417 ..Default::default()
418 },
419 Some(PING_REQUEST_TIMEOUT),
420 )
421 .await
422 {
423 Ok(_) => {
424 tracing::debug!(
425 "Ping transaction to validator {} completed end to end in {} seconds",
426 display_name,
427 start_time.elapsed().as_secs_f64()
428 );
429 }
430 Err(err) => {
431 tracing::debug!(
432 "Failed to get certified finalized effects for ping transaction to validator {}: {}",
433 display_name,
434 err
435 );
436 }
437 }
438 };
439
440 tasks.spawn(task);
441 }
442
443 while let Some(result) = tasks.join_next().await {
444 if let Err(e) = result {
445 tracing::debug!("Error while driving ping transaction: {}", e);
446 }
447 }
448 }
449 }
450
451 fn enable_reconfig(
452 self: &Arc<Self>,
453 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
454 ) {
455 let driver = self.clone();
456 self.state.lock().tasks.spawn(monitored_future!(async move {
457 let mut reconfig_observer = reconfig_observer.clone_boxed();
458 reconfig_observer.run(driver).await;
459 }));
460 }
461}
462
463impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
464where
465 A: AuthorityAPI + Send + Sync + 'static + Clone,
466{
467 fn epoch(&self) -> EpochId {
468 self.authority_aggregator.load().committee.epoch
469 }
470
471 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
472 self.authority_aggregator.load_full()
473 }
474
475 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
476 tracing::info!(
477 "Transaction Driver updating AuthorityAggregator with committee {}",
478 new_authorities.committee
479 );
480
481 self.authority_aggregator.store(new_authorities);
482 }
483}
484
485struct State {
487 tasks: JoinSet<()>,
488}
489
490impl State {
491 fn new() -> Self {
492 Self {
493 tasks: JoinSet::new(),
494 }
495 }
496}