sui_core/transaction_driver/
mod.rs1mod effects_certifier;
5mod error;
6mod metrics;
7mod request_retrier;
8mod transaction_submitter;
9
10pub use error::TransactionDriverError;
12pub use metrics::*;
13use mysten_common::backoff::ExponentialBackoff;
14
15use std::{
16 net::SocketAddr,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use arc_swap::ArcSwap;
22use effects_certifier::*;
23use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
24use parking_lot::Mutex;
25use rand::Rng;
26use sui_types::{
27 committee::EpochId,
28 error::{ErrorCategory, UserInputError},
29 messages_grpc::{PingType, SubmitTxRequest, SubmitTxResult, TxType},
30 transaction::TransactionDataAPI as _,
31};
32use tokio::{
33 task::JoinSet,
34 time::{interval, sleep},
35};
36use tracing::instrument;
37use transaction_submitter::*;
38
39use crate::{
40 authority_aggregator::AuthorityAggregator,
41 authority_client::AuthorityAPI,
42 quorum_driver::{AuthorityAggregatorUpdatable, reconfig_observer::ReconfigObserver},
43 validator_client_monitor::{
44 OperationFeedback, OperationType, ValidatorClientMetrics, ValidatorClientMonitor,
45 },
46};
47use sui_config::NodeConfig;
/// Per-call options for [`TransactionDriver::drive_transaction`].
#[derive(Debug, Default, Clone)]
pub struct SubmitTransactionOptions {
    /// Address of the client on whose behalf the transaction was forwarded, if any.
    /// NOTE(review): not read in this file; semantics inferred from the name — confirm.
    pub forwarded_client_addr: Option<SocketAddr>,

    /// Validator display names that submission should be restricted to.
    /// Latency pings set this to exactly one validator's display name.
    /// NOTE(review): presumably an empty list means "no restriction" — confirm in
    /// `TransactionSubmitter`.
    pub allowed_validators: Vec<String>,
}
59
60#[derive(Clone, Debug)]
61pub struct QuorumTransactionResponse {
62 pub effects: sui_types::quorum_driver_types::FinalizedEffects,
64
65 pub events: Option<sui_types::effects::TransactionEvents>,
66 pub input_objects: Option<Vec<sui_types::object::Object>>,
68 pub output_objects: Option<Vec<sui_types::object::Object>>,
70 pub auxiliary_data: Option<Vec<u8>>,
71}
72
73pub struct TransactionDriver<A: Clone> {
74 authority_aggregator: Arc<ArcSwap<AuthorityAggregator<A>>>,
75 state: Mutex<State>,
76 metrics: Arc<TransactionDriverMetrics>,
77 submitter: TransactionSubmitter,
78 certifier: EffectsCertifier,
79 client_monitor: Arc<ValidatorClientMonitor<A>>,
80}
81
82impl<A> TransactionDriver<A>
83where
84 A: AuthorityAPI + Send + Sync + 'static + Clone,
85{
86 pub fn new(
87 authority_aggregator: Arc<AuthorityAggregator<A>>,
88 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
89 metrics: Arc<TransactionDriverMetrics>,
90 node_config: Option<&NodeConfig>,
91 client_metrics: Arc<ValidatorClientMetrics>,
92 ) -> Arc<Self> {
93 let shared_swap = Arc::new(ArcSwap::new(authority_aggregator));
94
95 let monitor_config = node_config
97 .and_then(|nc| nc.validator_client_monitor_config.clone())
98 .unwrap_or_default();
99 let client_monitor =
100 ValidatorClientMonitor::new(monitor_config, client_metrics, shared_swap.clone());
101
102 let driver = Arc::new(Self {
103 authority_aggregator: shared_swap,
104 state: Mutex::new(State::new()),
105 metrics: metrics.clone(),
106 submitter: TransactionSubmitter::new(metrics.clone()),
107 certifier: EffectsCertifier::new(metrics),
108 client_monitor,
109 });
110
111 let driver_clone = driver.clone();
112
113 spawn_logged_monitored_task!(Self::run_latency_checks(driver_clone));
114
115 driver.enable_reconfig(reconfig_observer);
116 driver
117 }
118
119 async fn run_latency_checks(self: Arc<Self>) {
121 const INTERVAL_BETWEEN_RUNS: Duration = Duration::from_secs(15);
122 const MAX_JITTER: Duration = Duration::from_secs(10);
123 const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
124
125 let mut interval = interval(INTERVAL_BETWEEN_RUNS);
126 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
127
128 loop {
129 interval.tick().await;
130
131 let mut tasks = JoinSet::new();
132
133 for tx_type in [TxType::SingleWriter, TxType::SharedObject] {
134 Self::ping_for_tx_type(
135 self.clone(),
136 &mut tasks,
137 tx_type,
138 MAX_JITTER,
139 PING_REQUEST_TIMEOUT,
140 );
141 }
142
143 while let Some(result) = tasks.join_next().await {
144 if let Err(e) = result {
145 tracing::info!("Error while driving ping transaction: {}", e);
146 }
147 }
148 }
149 }
150
151 fn ping_for_tx_type(
153 self: Arc<Self>,
154 tasks: &mut JoinSet<()>,
155 tx_type: TxType,
156 max_jitter: Duration,
157 ping_timeout: Duration,
158 ) {
159 let auth_agg = self.authority_aggregator.load().clone();
161 let validators = auth_agg.committee.names().cloned().collect::<Vec<_>>();
162
163 self.metrics
164 .latency_check_runs
165 .with_label_values(&[tx_type.as_str()])
166 .inc();
167
168 for name in validators {
169 let display_name = auth_agg.get_display_name(&name);
170 let delay_ms = rand::thread_rng().gen_range(0..max_jitter.as_millis()) as u64;
171 let self_clone = self.clone();
172
173 let task = async move {
174 if delay_ms > 0 {
176 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
177 }
178 let start_time = Instant::now();
179
180 let ping_type = if tx_type == TxType::SingleWriter {
181 PingType::FastPath
182 } else {
183 PingType::Consensus
184 };
185
186 match self_clone
188 .drive_transaction(
189 SubmitTxRequest::new_ping(ping_type),
190 SubmitTransactionOptions {
191 allowed_validators: vec![display_name.clone()],
192 ..Default::default()
193 },
194 Some(ping_timeout),
195 )
196 .await
197 {
198 Ok(_) => {
199 tracing::debug!(
200 "Ping transaction to validator {} for tx type {} completed end to end in {} seconds",
201 display_name,
202 tx_type.as_str(),
203 start_time.elapsed().as_secs_f64()
204 );
205 }
206 Err(err) => {
207 tracing::info!(
208 "Failed to get certified finalized effects for tx type {}, for ping transaction to validator {}: {}",
209 tx_type.as_str(),
210 display_name,
211 err
212 );
213 }
214 }
215 };
216
217 tasks.spawn(task);
218 }
219 }
220
221 #[instrument(level = "error", skip_all, fields(tx_digest = ?request.transaction.as_ref().map(|t| t.digest()), ping = %request.ping_type.is_some()))]
223 pub async fn drive_transaction(
224 &self,
225 request: SubmitTxRequest,
226 options: SubmitTransactionOptions,
227 timeout_duration: Option<Duration>,
228 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
229 const MAX_DRIVE_TRANSACTION_RETRY_DELAY: Duration = Duration::from_secs(10);
230
231 let amplification_factor = if request.ping_type.is_some() {
233 1
234 } else {
235 let gas_price = request
236 .transaction
237 .as_ref()
238 .unwrap()
239 .transaction_data()
240 .gas_price();
241 let reference_gas_price = self.authority_aggregator.load().reference_gas_price;
242 let amplification_factor = gas_price / reference_gas_price.max(1);
243 if amplification_factor == 0 {
244 return Err(TransactionDriverError::ValidationFailed {
245 error: UserInputError::GasPriceUnderRGP {
246 gas_price,
247 reference_gas_price,
248 }
249 .to_string(),
250 });
251 }
252 amplification_factor
253 };
254
255 let tx_type = request.tx_type();
256 let ping_label = if request.ping_type.is_some() {
257 "true"
258 } else {
259 "false"
260 };
261 let timer = Instant::now();
262
263 self.metrics
264 .total_transactions_submitted
265 .with_label_values(&[tx_type.as_str(), ping_label])
266 .inc();
267
268 let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
269 let mut attempts = 0;
270 let mut latest_retriable_error = None;
271
272 let retry_loop = async {
273 loop {
274 match self
276 .drive_transaction_once(amplification_factor, request.clone(), &options)
277 .await
278 {
279 Ok(resp) => {
280 let settlement_finality_latency = timer.elapsed().as_secs_f64();
281 self.metrics
282 .settlement_finality_latency
283 .with_label_values(&[tx_type.as_str(), ping_label])
284 .observe(settlement_finality_latency);
285 self.metrics
287 .transaction_retries
288 .with_label_values(&["success", tx_type.as_str(), ping_label])
289 .observe(attempts as f64);
290 return Ok(resp);
291 }
292 Err(e) => {
293 if !e.is_submission_retriable() {
294 self.metrics
296 .transaction_retries
297 .with_label_values(&["failure", tx_type.as_str(), ping_label])
298 .observe(attempts as f64);
299 tracing::info!(
300 "Failed to finalize transaction with non-retriable error after {} attempts: {}",
301 attempts,
302 e
303 );
304 return Err(e);
305 }
306 tracing::info!(
307 "Failed to finalize transaction (attempt {}): {}. Retrying ...",
308 attempts,
309 e
310 );
311 latest_retriable_error = Some(e);
313 }
314 }
315
316 let overload = if let Some(e) = &latest_retriable_error {
317 e.categorize() == ErrorCategory::ValidatorOverloaded
318 } else {
319 false
320 };
321 let delay = if overload {
322 const OVERLOAD_ADDITIONAL_DELAY: Duration = Duration::from_secs(10);
324 backoff.next().unwrap() + OVERLOAD_ADDITIONAL_DELAY
325 } else {
326 backoff.next().unwrap()
327 };
328 sleep(delay).await;
329
330 attempts += 1;
331 }
332 };
333
334 match timeout_duration {
335 Some(duration) => {
336 tokio::time::timeout(duration, retry_loop)
337 .await
338 .unwrap_or_else(|_| {
339 let e = TransactionDriverError::TimeoutWithLastRetriableError {
341 last_error: latest_retriable_error.map(Box::new),
342 attempts,
343 timeout: duration,
344 };
345 tracing::info!(
346 "Transaction timed out after {} attempts. Last error: {}",
347 attempts,
348 e
349 );
350 Err(e)
351 })
352 }
353 None => retry_loop.await,
354 }
355 }
356
357 #[instrument(level = "error", skip_all, err(level = "debug"))]
358 async fn drive_transaction_once(
359 &self,
360 amplification_factor: u64,
361 request: SubmitTxRequest,
362 options: &SubmitTransactionOptions,
363 ) -> Result<QuorumTransactionResponse, TransactionDriverError> {
364 let auth_agg = self.authority_aggregator.load();
365 let amplification_factor =
366 amplification_factor.min(auth_agg.committee.num_members() as u64);
367 let start_time = Instant::now();
368 let tx_type = request.tx_type();
369 let tx_digest = request.tx_digest();
370 let ping_type = request.ping_type;
371
372 let (name, submit_txn_result) = self
373 .submitter
374 .submit_transaction(
375 &auth_agg,
376 &self.client_monitor,
377 tx_type,
378 amplification_factor,
379 request,
380 options,
381 )
382 .await?;
383 if let SubmitTxResult::Rejected { error } = &submit_txn_result {
384 return Err(TransactionDriverError::ClientInternal {
385 error: format!(
386 "SubmitTxResult::Rejected should have been returned as an error in submit_transaction(): {}",
387 error
388 ),
389 });
390 }
391
392 let result = self
394 .certifier
395 .get_certified_finalized_effects(
396 &auth_agg,
397 &self.client_monitor,
398 tx_digest,
399 tx_type,
400 name,
401 submit_txn_result,
402 options,
403 )
404 .await;
405
406 if result.is_ok() {
407 self.client_monitor
408 .record_interaction_result(OperationFeedback {
409 authority_name: name,
410 display_name: auth_agg.get_display_name(&name),
411 operation: if tx_type == TxType::SingleWriter {
412 OperationType::FastPath
413 } else {
414 OperationType::Consensus
415 },
416 ping_type,
417 result: Ok(start_time.elapsed()),
418 });
419 }
420 result
421 }
422
423 fn enable_reconfig(
424 self: &Arc<Self>,
425 reconfig_observer: Arc<dyn ReconfigObserver<A> + Sync + Send>,
426 ) {
427 let driver = self.clone();
428 self.state.lock().tasks.spawn(monitored_future!(async move {
429 let mut reconfig_observer = reconfig_observer.clone_boxed();
430 reconfig_observer.run(driver).await;
431 }));
432 }
433}
434
435impl<A> AuthorityAggregatorUpdatable<A> for TransactionDriver<A>
436where
437 A: AuthorityAPI + Send + Sync + 'static + Clone,
438{
439 fn epoch(&self) -> EpochId {
440 self.authority_aggregator.load().committee.epoch
441 }
442
443 fn authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
444 self.authority_aggregator.load_full()
445 }
446
447 fn update_authority_aggregator(&self, new_authorities: Arc<AuthorityAggregator<A>>) {
448 tracing::info!(
449 "Transaction Driver updating AuthorityAggregator with committee {}",
450 new_authorities.committee
451 );
452
453 self.authority_aggregator.store(new_authorities);
454 }
455}
456
457pub fn choose_transaction_driver_percentage(
459 chain_id: Option<sui_types::digests::ChainIdentifier>,
460) -> u8 {
461 if let Ok(v) = std::env::var("TRANSACTION_DRIVER")
462 && let Ok(tx_driver_percentage) = v.parse::<u8>()
463 && tx_driver_percentage > 0
464 && tx_driver_percentage <= 100
465 {
466 return tx_driver_percentage;
467 }
468
469 if let Some(chain_identifier) = chain_id
470 && chain_identifier.chain() == sui_protocol_config::Chain::Unknown
471 {
472 return 50;
474 }
475
476 100
478}
479
480struct State {
482 tasks: JoinSet<()>,
483}
484
485impl State {
486 fn new() -> Self {
487 Self {
488 tasks: JoinSet::new(),
489 }
490 }
491}