sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16 net::SocketAddr,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27 committee::EpochId,
28 error::{ErrorCategory, UserInputError},
29 messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30 transaction::TransactionDataAPI as _,
31};
32use tokio::{
33 task::JoinSet,
34 time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40 authority_aggregator::AuthorityAggregator,
41 authority_client::AuthorityAPI,
42 quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43 validator_client_monitor::{
44 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45 },
46};
47use sui_config::NodeConfig;
/// Per-request options that callers pass to `TransactionDriver::drive_transaction`.
#[derive(Clone, Default, Debug)]
pub struct SubmitTransactionOptions {
    /// Address of the originating client when the request was forwarded on its behalf, if any.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validator display names that submission may target.
    /// NOTE(review): the latency pings fill this with a single display name; the exact
    /// restriction semantics live in the submitter — confirm there.
    pub allowed_validators: Vec<String>,

    /// Validator display names to avoid.
    /// NOTE(review): enforcement happens outside this file — confirm in the submitter.
    pub blocked_validators: Vec<String>,
}
63
64#[derive(Clone, Debug)]
65pub struct QuorumTransactionResponse {
66 pub effects: sui_types::quorum_driver_types::FinalizedEffects,
68
69 pub events: Option<sui_types::effects::TransactionEvents>,
70 pub input_objects: Option<Vec<sui_types::object::Object>>,
72 pub output_objects: Option<Vec<sui_types::object::Object>>,
74 pub auxiliary_data: Option<Vec<u8>>,
75}
76
77pub struct TransactionDriver<A: Clone> {
78 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
79 state: Mutex<State>,
80 metrics: Arc<TransactionDriverMetrics>,
81 submitter: TransactionSubmitter,
82 certifier: EffectsCertifier,
83 client_monitor: Arc<ValidatorClientMonitor<A>>,
84}
85
86impl<A> TransactionDriver<A>
87where
88 A: AuthorityAPI + Send + Sync + 'static + Clone,
89{
90 pub fn new(
91 authority_aggregator: Arc<AuthorityAggregator<A>>,
92 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
93 metrics: Arc<TransactionDriverMetrics>,
94 node_config: Option<&NodeConfig>,
95 client_metrics: Arc<ValidatorClientMetrics>,
96 ) -> Arc<Self> {
97 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
98
99 let monitor_config = node_config
101 .and_then(|nc| nc.validator_client_monitor_config.clone())
102 .unwrap_or_default();
103 let client_monitor =
104 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
105
106 let driver = Arc::new(Self {
107 authority_aggregator: shared_swap,
108 state: Mutex::new(State::new()),
109 metrics: metrics.clone(),
110 submitter: TransactionSubmitter::new(metrics.clone()),
111 certifier: EffectsCertifier::new(metrics),
112 client_monitor,
113 });
114
115 let driver_clone = driver.clone();
116
117 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
118
119 driver.enable_reconfig(reconfig_observer);
120 driver
121 }
122
123 async fn run_latency_checks(self: Arc<Self>) {
125 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
126 const MAX_JITTER: Duration = Duration::from_secs(10);
127 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
128
129 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
130 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
131
132 loop {
133 interval.tick().await;
134
135 let mut tasks = JoinSet::new();
136
137 for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
138 Self::ping_for_tx_type(
139 self.clone(),
140 &mut tasks,
141 tx_type,
142 MAX_JITTER,
143 PING_REQUEST_TIMEOUT,
144 );
145 }
146
147 while let Some(result) = tasks.join_next().await {
148 if let Err(e) = result {
149 tracing::debug!("Error while driving ping transaction: {}", e);
150 }
151 }
152 }
153 }
154
155 fn ping_for_tx_type(
157 self: Arc<Self>,
158 tasks: &mut JoinSet<()>,
159 tx_type: TxType,
160 max_jitter: Duration,
161 ping_timeout: Duration,
162 ) {
163 let auth_agg = self.authority_aggregator.load().clone();
165 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
166
167 self.metrics
168 .latency_check_runs
169 .with_label_values(&[tx_type.as_str()])
170 .inc();
171
172 for name in validators {
173 let display_name = auth_agg.get_display_name(&name);
174 let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
175 let self_clone = self.clone();
176
177 let task = async move {
178 if delay_ms > 0 {
180 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
181 }
182 let start_time = Instant::now();
183
184 let ping_type = if tx_type == TxType::SingleWriter {
185 PingType::FastPath
186 } else {
187 PingType::Consensus
188 };
189
190 match self_clone
192 .drive_transaction(
193 SubmitTxRequest::new_ping(ping_type),
194 SubmitTransactionOptions {
195 allowed_validators: vec![display_name.clone()],
196 ..Default::default()
197 },
198 Some(ping_timeout),
199 )
200 .await
201 {
202 Ok(_) => {
203 tracing::debug!(
204 "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
205 display_name,
206 tx_type.as_str(),
207 start_time.elapsed().as_secs_f64()
208 );
209 }
210 Err(err) => {
211 tracing::debug!(
212 "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
213 tx_type.as_str(),
214 display_name,
215 err
216 );
217 }
218 }
219 };
220
221 tasks.spawn(task);
222 }
223 }
224
225 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
227 pub async fn drive_transaction(
228 &self,
229 request: SubmitTxRequest,
230 options: SubmitTransactionOptions,
231 timeout_duration: Option<Duration>,
232 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
233 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
234
235 let amplification_factor = if request.ping_type.is_some() {
237 1
238 } else {
239 let gas_price = request
240 .transaction
241 .as_ref()
242 .unwrap()
243 .transaction_data()
244 .gas_price();
245 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
246 let amplification_factor = gas_price / reference_gas_price.max(1);
247 if amplification_factor == 0 {
248 return Err(TransactionDriverError::ValidationFailed {
249 error: UserInputError::GasPriceUnderRGP {
250 gas_price,
251 reference_gas_price,
252 }
253 .to_string(),
254 });
255 }
256 amplification_factor
257 };
258
259 let tx_type = request.tx_type();
260 let ping_label = if request.ping_type.is_some() {
261 "true"
262 } else {
263 "false"
264 };
265 let timer = Instant::now();
266
267 self.metrics
268 .total_transactions_submitted
269 .with_label_values(&[tx_type.as_str(), ping_label])
270 .inc();
271
272 let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
273 let mut attempts = 0;
274 let mut latest_retriable_error = None;
275
276 let retry_loop = async {
277 loop {
278 match self
280 .drive_transaction_once(amplification_factor, request.clone(), &options)
281 .await
282 {
283 Ok(resp) => {
284 let settlement_finality_latency = timer.elapsed().as_secs_f64();
285 self.metrics
286 .settlement_finality_latency
287 .with_label_values(&[tx_type.as_str(), ping_label])
288 .observe(settlement_finality_latency);
289 self.metrics
291 .transaction_retries
292 .with_label_values(&["success", tx_type.as_str(), ping_label])
293 .observe(attempts as f64);
294 return Ok(resp);
295 }
296 Err(e) => {
297 self.metrics
298 .drive_transaction_errors
299 .with_label_values(&[
300 e.categorize().into(),
301 tx_type.as_str(),
302 ping_label,
303 ])
304 .inc();
305 if !e.is_submission_retriable() {
306 self.metrics
308 .transaction_retries
309 .with_label_values(&["failure", tx_type.as_str(), ping_label])
310 .observe(attempts as f64);
311 if request.transaction.is_some() {
312 tracing::info!(
313 "Failed to finalize transaction with non-retriable error after {} attempts: {}",
314 attempts,
315 e
316 );
317 }
318 return Err(e);
319 }
320 if request.transaction.is_some() {
321 tracing::info!(
322 "Failed to finalize transaction (attempt {}): {}. Retrying ...",
323 attempts,
324 e
325 );
326 }
327 latest_retriable_error = Some(e);
329 }
330 }
331
332 let overload = if let Some(e) = &latest_retriable_error {
333 e.categorize() == ErrorCategory::ValidatorOverloaded
334 } else {
335 false
336 };
337 let delay = if overload {
338 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
340 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
341 } else {
342 backoff.next().unwrap()
343 };
344 sleep(delay).await;
345
346 attempts += 1;
347 }
348 };
349
350 match timeout_duration {
351 Some(duration) => {
352 tokio::time::timeout(duration, retry_loop)
353 .await
354 .unwrap_or_else(|_| {
355 let e = TransactionDriverError::TimeoutWithLastRetriableError {
357 last_error: latest_retriable_error.map(Box::new),
358 attempts,
359 timeout: duration,
360 };
361 if request.transaction.is_some() {
362 tracing::info!(
363 "Transaction timed out after {} attempts. Last error: {}",
364 attempts,
365 e
366 );
367 }
368 Err(e)
369 })
370 }
371 None => retry_loop.await,
372 }
373 }
374
375 #[instrument(level = "error", skip_all, err(level = "debug"))]
376 async fn drive_transaction_once(
377 &self,
378 amplification_factor: u64,
379 request: SubmitTxRequest,
380 options: &SubmitTransactionOptions,
381 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
382 let auth_agg = self.authority_aggregator.load();
383 let amplification_factor =
384 amplification_factor.min(auth_agg.committee.num_members() as u64);
385 let start_time = Instant::now();
386 let tx_type = request.tx_type();
387 let tx_digest = request.tx_digest();
388 let ping_type = request.ping_type;
389
390 let (name, submit_txn_result) = self
391 .submitter
392 .submit_transaction(
393 &auth_agg,
394 &self.client_monitor,
395 tx_type,
396 amplification_factor,
397 request,
398 options,
399 )
400 .await?;
401 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
402 return Err(TransactionDriverError::ClientInternal {
403 error: format!(
404 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
405 error
406 ),
407 });
408 }
409
410 let result = self
412 .certifier
413 .get_certified_finalized_effects(
414 &auth_agg,
415 &self.client_monitor,
416 tx_digest,
417 tx_type,
418 name,
419 submit_txn_result,
420 options,
421 )
422 .await;
423
424 if result.is_ok() {
425 self.client_monitor
426 .record_interaction_result(OperationFeedback {
427 authority_name: name,
428 display_name: auth_agg.get_display_name(&name),
429 operation: if tx_type == TxType::SingleWriter {
430 OperationType::FastPath
431 } else {
432 OperationType::Consensus
433 },
434 ping_type,
435 result: Ok(start_time.elapsed()),
436 });
437 }
438 result
439 }
440
441 fn enable_reconfig(
442 self: &Arc<Self>,
443 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
444 ) {
445 let driver = self.clone();
446 self.state.lock().tasks.spawn(monitored_future!(async move {
447 let mut reconfig_observer = reconfig_observer.clone_boxed();
448 reconfig_observer.run(driver).await;
449 }));
450 }
451}
452
453impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
454where
455 A: AuthorityAPI + Send + Sync + 'static + Clone,
456{
457 fn epoch(&self) -> EpochId {
458 self.authority_aggregator.load().committee.epoch
459 }
460
461 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
462 self.authority_aggregator.load_full()
463 }
464
465 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
466 tracing::info!(
467 "Transaction Driver updating AuthorityAggregator with committee {}",
468 new_authorities.committee
469 );
470
471 self.authority_aggregator.store(new_authorities);
472 }
473}
474
475pub fn choose_transaction_driver_percentage(
477 chain_id: Option<sui_types::digests::ChainIdentifier>,
478) -> u8 {
479 if let Ok(v) = std::env::var("TRANSACTION_DRIVER")
480 && let Ok(tx_driver_percentage) = v.parse::<u8>()
481 && (0..=100).contains(&tx_driver_percentage)
482 {
483 return tx_driver_percentage;
484 }
485
486 if let Some(chain_identifier) = chain_id
487 && chain_identifier.chain() == sui_protocol_config::Chain::Unknown
488 {
489 return 50;
491 }
492
493 100
495}
496
497struct State {
499 tasks: JoinSet<()>,
500}
501
502impl State {
503 fn new() -> Self {
504 Self {
505 tasks: JoinSet::new(),
506 }
507 }
508}