sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16 net::SocketAddr,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27 committee::EpochId,
28 error::{ErrorCategory, UserInputError},
29 messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30 transaction::TransactionDataAPI as _,
31};
32use tokio::{
33 task::JoinSet,
34 time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40 authority_aggregator::AuthorityAggregator,
41 authority_client::AuthorityAPI,
42 quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43 validator_client_monitor::{
44 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45 },
46};
47use sui_config::NodeConfig;
/// Per-request options passed alongside a request to `TransactionDriver::drive_transaction`.
///
/// `Default` yields no forwarded address and empty validator lists.
#[derive(Clone, Default, Debug)]
pub struct SubmitTransactionOptions {
    /// Socket address associated with the client, when one is known.
    // NOTE(review): presumably the address of the original client when the request
    // was forwarded by another service — confirm against the submitter.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validators the request may be sent to.
    ///
    /// The latency checker in this file fills this with a single validator display name
    /// to target exactly one validator.
    // NOTE(review): presumably an empty list means "no restriction" — confirm.
    pub allowed_validators: Vec<String>,

    /// Validators the request must not be sent to.
    // NOTE(review): presumably display names, like `allowed_validators` — confirm.
    pub blocked_validators: Vec<String>,
}
63
/// Value returned by a successful `TransactionDriver::drive_transaction` call.
#[derive(Clone, Debug)]
pub struct QuorumTransactionResponse {
    /// Finalized effects of the transaction.
    pub effects: sui_types::quorum_driver_types::FinalizedEffects,

    /// Events emitted by the transaction, when available.
    pub events: Option<sui_types::effects::TransactionEvents>,
    /// Input objects of the transaction, when available.
    pub input_objects: Option<Vec<sui_types::object::Object>>,
    /// Output objects of the transaction, when available.
    pub output_objects: Option<Vec<sui_types::object::Object>>,
    /// Opaque auxiliary bytes, when available.
    // NOTE(review): the encoding of these bytes is not visible from this file — confirm.
    pub auxiliary_data: Option<Vec<u8>>,
}
76
/// Submits requests to validators and waits for certified, finalized effects.
///
/// Instances are created through `TransactionDriver::new`, which returns an `Arc<Self>`
/// and spawns the background latency checks and the reconfiguration observer.
pub struct TransactionDriver<A: Clone> {
    /// Current authority aggregator; swapped atomically by `update_authority_aggregator`.
    authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
    /// Mutable driver state; holds the `JoinSet` of tasks spawned by the driver.
    state: Mutex<State>,
    /// Metrics recorded while driving transactions and running latency checks.
    metrics: Arc<TransactionDriverMetrics>,
    /// Component used to submit a request to validators.
    submitter: TransactionSubmitter,
    /// Component used to obtain certified finalized effects after submission.
    certifier: EffectsCertifier,
    /// Receives per-validator operation feedback (see `record_interaction_result`).
    client_monitor: Arc<ValidatorClientMonitor<A>>,
}
85
86impl<A> TransactionDriver<A>
87where
88 A: AuthorityAPI + Send + Sync + 'static + Clone,
89{
90 pub fn new(
92 authority_aggregator: Arc<AuthorityAggregator<A>>,
93 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
94 metrics: Arc<TransactionDriverMetrics>,
95 node_config: Option<&NodeConfig>,
96 client_metrics: Arc<ValidatorClientMetrics>,
97 ) -> Arc<Self> {
98 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
99
100 let monitor_config = node_config
102 .and_then(|nc| nc.validator_client_monitor_config.clone())
103 .unwrap_or_default();
104 let client_monitor =
105 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
106
107 let driver = Arc::new(Self {
108 authority_aggregator: shared_swap,
109 state: Mutex::new(State::new()),
110 metrics: metrics.clone(),
111 submitter: TransactionSubmitter::new(metrics.clone()),
112 certifier: EffectsCertifier::new(metrics),
113 client_monitor,
114 });
115
116 let driver_clone = driver.clone();
117
118 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
119
120 driver.enable_reconfig(reconfig_observer);
121 driver
122 }
123
/// Returns a reference to the shared, swappable authority aggregator handle.
pub fn authority_aggregator(&self) -> &Arc<ArcSwap<AuthorityAggregator<A>>> {
    &self.authority_aggregator
}
128
129 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
136 pub async fn drive_transaction(
137 &self,
138 request: SubmitTxRequest,
139 options: SubmitTransactionOptions,
140 timeout_duration: Option<Duration>,
141 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
142 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
143
144 let amplification_factor = if request.ping_type.is_some() {
146 1
147 } else {
148 let gas_price = request
149 .transaction
150 .as_ref()
151 .unwrap()
152 .transaction_data()
153 .gas_price();
154 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
155 let amplification_factor = gas_price / reference_gas_price.max(1);
156 if amplification_factor == 0 {
157 return Err(TransactionDriverError::ValidationFailed {
158 error: UserInputError::GasPriceUnderRGP {
159 gas_price,
160 reference_gas_price,
161 }
162 .to_string(),
163 });
164 }
165 amplification_factor
166 };
167
168 let tx_type = request.tx_type();
169 let ping_label = if request.ping_type.is_some() {
170 "true"
171 } else {
172 "false"
173 };
174 let timer = Instant::now();
175
176 self.metrics
177 .total_transactions_submitted
178 .with_label_values(&[tx_type.as_str(), ping_label])
179 .inc();
180
181 let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
182 let mut attempts = 0;
183 let mut latest_retriable_error = None;
184
185 let retry_loop = async {
186 loop {
187 match self
189 .drive_transaction_once(amplification_factor, request.clone(), &options)
190 .await
191 {
192 Ok(resp) => {
193 let settlement_finality_latency = timer.elapsed().as_secs_f64();
194 self.metrics
195 .settlement_finality_latency
196 .with_label_values(&[tx_type.as_str(), ping_label])
197 .observe(settlement_finality_latency);
198 self.metrics
200 .transaction_retries
201 .with_label_values(&["success", tx_type.as_str(), ping_label])
202 .observe(attempts as f64);
203 return Ok(resp);
204 }
205 Err(e) => {
206 self.metrics
207 .drive_transaction_errors
208 .with_label_values(&[
209 e.categorize().into(),
210 tx_type.as_str(),
211 ping_label,
212 ])
213 .inc();
214 if !e.is_submission_retriable() {
215 self.metrics
217 .transaction_retries
218 .with_label_values(&["failure", tx_type.as_str(), ping_label])
219 .observe(attempts as f64);
220 if request.transaction.is_some() {
221 tracing::info!(
222 "Failed to finalize transaction with non-retriable error after {} attempts: {}",
223 attempts,
224 e
225 );
226 }
227 return Err(e);
228 }
229 if request.transaction.is_some() {
230 tracing::info!(
231 "Failed to finalize transaction (attempt {}): {}. Retrying ...",
232 attempts,
233 e
234 );
235 }
236 latest_retriable_error = Some(e);
238 }
239 }
240
241 let overload = if let Some(e) = &latest_retriable_error {
242 e.categorize() == ErrorCategory::ValidatorOverloaded
243 } else {
244 false
245 };
246 let delay = if overload {
247 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
249 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
250 } else {
251 backoff.next().unwrap()
252 };
253 sleep(delay).await;
254
255 attempts += 1;
256 }
257 };
258
259 match timeout_duration {
260 Some(duration) => {
261 tokio::time::timeout(duration, retry_loop)
262 .await
263 .unwrap_or_else(|_| {
264 let e = TransactionDriverError::TimeoutWithLastRetriableError {
266 last_error: latest_retriable_error.map(Box::new),
267 attempts,
268 timeout: duration,
269 };
270 if request.transaction.is_some() {
271 tracing::info!(
272 "Transaction timed out after {} attempts. Last error: {}",
273 attempts,
274 e
275 );
276 }
277 Err(e)
278 })
279 }
280 None => retry_loop.await,
281 }
282 }
283
284 #[instrument(level = "error", skip_all, err(level = "debug"))]
285 async fn drive_transaction_once(
286 &self,
287 amplification_factor: u64,
288 request: SubmitTxRequest,
289 options: &SubmitTransactionOptions,
290 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
291 let auth_agg = self.authority_aggregator.load();
292 let amplification_factor =
293 amplification_factor.min(auth_agg.committee.num_members() as u64);
294 let start_time = Instant::now();
295 let tx_type = request.tx_type();
296 let tx_digest = request.tx_digest();
297 let ping_type = request.ping_type;
298
299 let (name, submit_txn_result) = self
300 .submitter
301 .submit_transaction(
302 &auth_agg,
303 &self.client_monitor,
304 tx_type,
305 amplification_factor,
306 request,
307 options,
308 )
309 .await?;
310 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
311 return Err(TransactionDriverError::ClientInternal {
312 error: format!(
313 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
314 error
315 ),
316 });
317 }
318
319 let result = self
321 .certifier
322 .get_certified_finalized_effects(
323 &auth_agg,
324 &self.client_monitor,
325 tx_digest,
326 tx_type,
327 name,
328 submit_txn_result,
329 options,
330 )
331 .await;
332
333 if result.is_ok() {
334 self.client_monitor
335 .record_interaction_result(OperationFeedback {
336 authority_name: name,
337 display_name: auth_agg.get_display_name(&name),
338 operation: if tx_type == TxType::SingleWriter {
339 OperationType::FastPath
340 } else {
341 OperationType::Consensus
342 },
343 ping_type,
344 result: Ok(start_time.elapsed()),
345 });
346 }
347 result
348 }
349
350 async fn run_latency_checks(self: Arc<Self>) {
352 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
353 const MAX_JITTER: Duration = Duration::from_secs(10);
354 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
355
356 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
357 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
358
359 loop {
360 interval.tick().await;
361
362 let mut tasks = JoinSet::new();
363
364 for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
365 Self::ping_for_tx_type(
366 self.clone(),
367 &mut tasks,
368 tx_type,
369 MAX_JITTER,
370 PING_REQUEST_TIMEOUT,
371 );
372 }
373
374 while let Some(result) = tasks.join_next().await {
375 if let Err(e) = result {
376 tracing::debug!("Error while driving ping transaction: {}", e);
377 }
378 }
379 }
380 }
381
382 fn ping_for_tx_type(
384 self: Arc<Self>,
385 tasks: &mut JoinSet<()>,
386 tx_type: TxType,
387 max_jitter: Duration,
388 ping_timeout: Duration,
389 ) {
390 let auth_agg = self.authority_aggregator.load().clone();
392 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
393
394 self.metrics
395 .latency_check_runs
396 .with_label_values(&[tx_type.as_str()])
397 .inc();
398
399 for name in validators {
400 let display_name = auth_agg.get_display_name(&name);
401 let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
402 let self_clone = self.clone();
403
404 let task = async move {
405 if delay_ms > 0 {
407 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
408 }
409 let start_time = Instant::now();
410
411 let ping_type = if tx_type == TxType::SingleWriter {
412 PingType::FastPath
413 } else {
414 PingType::Consensus
415 };
416
417 match self_clone
419 .drive_transaction(
420 SubmitTxRequest::new_ping(ping_type),
421 SubmitTransactionOptions {
422 allowed_validators: vec![display_name.clone()],
423 ..Default::default()
424 },
425 Some(ping_timeout),
426 )
427 .await
428 {
429 Ok(_) => {
430 tracing::debug!(
431 "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
432 display_name,
433 tx_type.as_str(),
434 start_time.elapsed().as_secs_f64()
435 );
436 }
437 Err(err) => {
438 tracing::debug!(
439 "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
440 tx_type.as_str(),
441 display_name,
442 err
443 );
444 }
445 }
446 };
447
448 tasks.spawn(task);
449 }
450 }
451
452 fn enable_reconfig(
453 self: &Arc<Self>,
454 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
455 ) {
456 let driver = self.clone();
457 self.state.lock().tasks.spawn(monitored_future!(async move {
458 let mut reconfig_observer = reconfig_observer.clone_boxed();
459 reconfig_observer.run(driver).await;
460 }));
461 }
462}
463
464impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
465where
466 A: AuthorityAPI + Send + Sync + 'static + Clone,
467{
468 fn epoch(&self) -> EpochId {
469 self.authority_aggregator.load().committee.epoch
470 }
471
472 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
473 self.authority_aggregator.load_full()
474 }
475
476 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
477 tracing::info!(
478 "Transaction Driver updating AuthorityAggregator with committee {}",
479 new_authorities.committee
480 );
481
482 self.authority_aggregator.store(new_authorities);
483 }
484}
485
486pub fn choose_transaction_driver_percentage(
488 chain_id: Option<sui_types::digests::ChainIdentifier>,
489) -> u8 {
490 if let Ok(v) = std::env::var("TRANSACTION_DRIVER")
491 && let Ok(tx_driver_percentage) = v.parse::<u8>()
492 && (0..=100).contains(&tx_driver_percentage)
493 {
494 return tx_driver_percentage;
495 }
496
497 if let Some(chain_identifier) = chain_id
498 && chain_identifier.chain() == sui_protocol_config::Chain::Unknown
499 {
500 return 50;
502 }
503
504 100
506}
507
/// Mutable state owned by a `TransactionDriver`, kept behind its `state` mutex.
struct State {
    /// Tasks spawned by the driver; `enable_reconfig` spawns the reconfig observer here.
    tasks: JoinSet<()>,
}
512
513impl State {
514 fn new() -> Self {
515 Self {
516 tasks: JoinSet::new(),
517 }
518 }
519}