1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use moka::sync::Cache;
11use mysten_common::ZipDebugEqIteratorExt;
12use mysten_common::{assert_reachable, debug_fatal};
13use mysten_metrics::spawn_monitored_task;
14use prometheus::{
15 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
16 register_gauge_with_registry, register_histogram_vec_with_registry,
17 register_histogram_with_registry, register_int_counter_vec_with_registry,
18 register_int_counter_with_registry, register_int_gauge_with_registry,
19};
20use std::{
21 collections::HashSet,
22 io,
23 net::{IpAddr, SocketAddr},
24 sync::Arc,
25 time::{Duration, Instant, SystemTime},
26};
27use sui_network::{
28 api::{Validator, ValidatorServer},
29 tonic,
30 validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::message_envelope::Message;
34use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
35use sui_types::messages_grpc::{
36 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
37 TransactionInfoRequest, TransactionInfoResponse,
38};
39use sui_types::multiaddr::Multiaddr;
40use sui_types::object::Object;
41use sui_types::sui_system_state::SuiSystemState;
42use sui_types::traffic_control::{ClientIdSource, Weight};
43use sui_types::{
44 base_types::ObjectID,
45 digests::{TransactionDigest, TransactionEffectsDigest},
46 error::{SuiErrorKind, UserInputError},
47};
48use sui_types::{
49 effects::TransactionEffects,
50 messages_grpc::{
51 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
52 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
53 },
54};
55use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
56use sui_types::{error::*, transaction::*};
57use sui_types::{
58 fp_ensure,
59 messages_checkpoint::{
60 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
61 },
62};
63use tokio::time::timeout;
64use tonic::metadata::{Ascii, MetadataValue};
65use tracing::{debug, error, info, instrument};
66
67use crate::admission_queue::{AdmissionQueueContext, AdmissionQueueManager};
68use crate::gasless_rate_limiter::GaslessRateLimiter;
69use crate::{
70 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
71 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, ConsensusOverloadChecker},
72 consensus_handler::SequencedConsensusTransactionKey,
73 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
74};
75use crate::{
76 authority::{
77 authority_per_epoch_store::AuthorityPerEpochStore,
78 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
79 },
80 checkpoints::CheckpointStore,
81 mysticeti_adapter::LazyMysticetiClient,
82};
83use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
84
// Unit-test modules for this file. Each one is compiled only under
// `cfg(test)` and is loaded from the `unit_tests/` path given by its
// `#[path]` attribute.
#[cfg(test)]
#[path = "unit_tests/server_tests.rs"]
mod server_tests;

#[cfg(test)]
#[path = "unit_tests/wait_for_effects_tests.rs"]
mod wait_for_effects_tests;

#[cfg(test)]
#[path = "unit_tests/submit_transaction_tests.rs"]
mod submit_transaction_tests;
96
/// Handle to a spawned validator server, as returned by
/// `AuthorityServer::spawn_with_bind_address_for_test`.
pub struct AuthorityServerHandle {
    /// The bound server; used to wait for / trigger shutdown and to look up
    /// the local address.
    server_handle: sui_network::validator::server::Server,
}
100
impl AuthorityServerHandle {
    /// Awaits `wait_for_shutdown()` on the underlying server handle.
    ///
    /// This method itself never produces an error: it always returns
    /// `Ok(())` once the await completes.
    pub async fn join(self) -> Result<(), io::Error> {
        self.server_handle.handle().wait_for_shutdown().await;
        Ok(())
    }

    /// Awaits `shutdown()` on the underlying server handle.
    ///
    /// This method itself never produces an error: it always returns
    /// `Ok(())` once the await completes.
    pub async fn kill(self) -> Result<(), io::Error> {
        self.server_handle.handle().shutdown().await;
        Ok(())
    }

    /// Returns the local address reported by the underlying server.
    pub fn address(&self) -> &Multiaddr {
        self.server_handle.local_addr()
    }
}
116
/// Bundles everything needed to spawn a validator gRPC server.
///
/// The constructors and spawn helpers visible in this part of the file are
/// all `*_for_test` variants.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address.
    address: Multiaddr,
    /// Authority state handed to the spawned `ValidatorService`.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the spawned `ValidatorService`.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the spawned `ValidatorService`.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
123
124impl AuthorityServer {
125 pub fn new_for_test_with_consensus_adapter(
126 state: Arc<AuthorityState>,
127 consensus_adapter: Arc<ConsensusAdapter>,
128 ) -> Self {
129 let address = new_local_tcp_address_for_testing();
130 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
131
132 Self {
133 address,
134 state,
135 consensus_adapter,
136 metrics,
137 }
138 }
139
140 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
141 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
142 let consensus_adapter = Arc::new(ConsensusAdapter::new(
143 Arc::new(LazyMysticetiClient::new()),
144 CheckpointStore::new_for_tests(),
145 state.name,
146 100_000,
147 100_000,
148 ConsensusAdapterMetrics::new_test(),
149 slot_freed_notify,
150 ));
151 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
152 }
153
154 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
155 let address = self.address.clone();
156 self.spawn_with_bind_address_for_test(address).await
157 }
158
159 pub async fn spawn_with_bind_address_for_test(
160 self,
161 address: Multiaddr,
162 ) -> Result<AuthorityServerHandle, io::Error> {
163 let tls_config = sui_tls::create_rustls_server_config(
164 self.state.config.network_key_pair().copy().private(),
165 SUI_TLS_SERVER_NAME.to_string(),
166 );
167 let config = mysten_network::config::Config::new();
168 let server = sui_network::validator::server::ServerBuilder::from_config(
169 &config,
170 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
171 )
172 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
173 self.state,
174 self.consensus_adapter,
175 self.metrics,
176 )))
177 .bind(&address, Some(tls_config))
178 .await
179 .unwrap();
180 let local_addr = server.local_addr().to_owned();
181 info!("Listening to traffic on {local_addr}");
182 let handle = AuthorityServerHandle {
183 server_handle: server,
184 };
185 Ok(handle)
186 }
187}
188
/// Prometheus metrics recorded by the validator service. Each field is
/// registered in `ValidatorServiceMetrics::new`.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus,
    /// labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by
    /// `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by
    /// `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by
    /// `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transactions rejected due to system overload, labelled by
    /// `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by
    /// `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of submitted transactions suppressed because consensus had
    /// already processed them this epoch, labelled by `req_type`.
    submission_suppressed_already_processed: IntCounterVec,
    /// Number of submitted transactions suppressed because the same
    /// transaction was submitted within the recent-submission window,
    /// labelled by `req_type`.
    submission_suppressed_recently_submitted: IntCounterVec,
    /// Approximate number of digests held in the recent-submission
    /// duplicate-suppression cache.
    recently_submitted_cache_size: IntGauge,
    /// Time between a transaction being recorded and a duplicate
    /// resubmission of it being suppressed.
    recently_submitted_resubmission_interval: Histogram,
    /// Number of times the connection IP was not extractable from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was (unexpectedly) not
    /// included in a request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config was detected to disagree
    /// with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
    /// Number of gasless transactions rejected by the rate limiter.
    pub gasless_rate_limited_count: IntCounter,
    /// Number of valid gasless transaction submissions, labelled by
    /// `outcome`.
    pub gasless_submission_outcomes: IntCounterVec,
    /// Number of transactions that bypassed the admission queue (system not
    /// overloaded).
    admission_queue_direct_bypasses: IntCounter,
}
217
218impl ValidatorServiceMetrics {
219 pub fn new(registry: &Registry) -> Self {
220 Self {
221 signature_errors: register_int_counter_with_registry!(
222 "total_signature_errors",
223 "Number of transaction signature errors",
224 registry,
225 )
226 .unwrap(),
227 tx_verification_latency: register_histogram_with_registry!(
228 "validator_service_tx_verification_latency",
229 "Latency of verifying a transaction",
230 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231 registry,
232 )
233 .unwrap(),
234 handle_transaction_latency: register_histogram_with_registry!(
235 "validator_service_handle_transaction_latency",
236 "Latency of handling a transaction",
237 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap(),
241 handle_transaction_consensus_latency: register_histogram_with_registry!(
242 "validator_service_handle_transaction_consensus_latency",
243 "Latency of handling a user transaction sent through consensus",
244 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
245 registry,
246 )
247 .unwrap(),
248 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
249 "validator_service_submit_transaction_consensus_latency",
250 "Latency of submitting a user transaction sent through consensus",
251 &["req_type"],
252 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
253 registry,
254 )
255 .unwrap(),
256 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
257 "validator_service_submit_transaction_latency",
258 "Latency of submit transaction handler",
259 &["req_type"],
260 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
261 registry,
262 )
263 .unwrap(),
264 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
265 "validator_service_handle_wait_for_effects_ping_latency",
266 "Latency of handling a ping request for wait_for_effects",
267 &["req_type"],
268 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
269 registry,
270 )
271 .unwrap(),
272 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
273 "validator_service_submit_transaction_bytes",
274 "The size of transactions in the submit transaction request",
275 &["req_type"],
276 mysten_metrics::BYTES_BUCKETS.to_vec(),
277 registry,
278 )
279 .unwrap(),
280 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
281 "validator_service_submit_transaction_batch_size",
282 "The number of transactions in the submit transaction request",
283 &["req_type"],
284 mysten_metrics::COUNT_BUCKETS.to_vec(),
285 registry,
286 )
287 .unwrap(),
288 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
289 "validator_service_num_rejected_tx_during_overload",
290 "Number of rejected transaction due to system overload",
291 &["error_type"],
292 registry,
293 )
294 .unwrap(),
295 submission_rejected_transactions: register_int_counter_vec_with_registry!(
296 "validator_service_submission_rejected_transactions",
297 "Number of transactions rejected during submission",
298 &["reason"],
299 registry,
300 )
301 .unwrap(),
302 submission_suppressed_already_processed: register_int_counter_vec_with_registry!(
303 "validator_service_submission_suppressed_already_processed",
304 "Number of submitted transactions suppressed because consensus had already \
305 processed them this epoch (re-submission of already-processed transactions)",
306 &["req_type"],
307 registry,
308 )
309 .unwrap(),
310 submission_suppressed_recently_submitted: register_int_counter_vec_with_registry!(
311 "validator_service_submission_suppressed_recently_submitted",
312 "Number of submitted transactions suppressed because the same transaction was \
313 submitted within the recent-submission window",
314 &["req_type"],
315 registry,
316 )
317 .unwrap(),
318 recently_submitted_cache_size: register_int_gauge_with_registry!(
319 "validator_service_recently_submitted_cache_size",
320 "Approximate number of transaction digests held in the recent-submission duplicate-suppression cache",
321 registry,
322 )
323 .unwrap(),
324 recently_submitted_resubmission_interval: register_histogram_with_registry!(
325 "validator_service_recently_submitted_resubmission_interval_seconds",
326 "Time between a transaction being recorded and a duplicate resubmission of it being suppressed",
327 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
328 registry,
329 )
330 .unwrap(),
331 connection_ip_not_found: register_int_counter_with_registry!(
332 "validator_service_connection_ip_not_found",
333 "Number of times connection IP was not extractable from request",
334 registry,
335 )
336 .unwrap(),
337 forwarded_header_parse_error: register_int_counter_with_registry!(
338 "validator_service_forwarded_header_parse_error",
339 "Number of times x-forwarded-for header could not be parsed",
340 registry,
341 )
342 .unwrap(),
343 forwarded_header_invalid: register_int_counter_with_registry!(
344 "validator_service_forwarded_header_invalid",
345 "Number of times x-forwarded-for header was invalid",
346 registry,
347 )
348 .unwrap(),
349 forwarded_header_not_included: register_int_counter_with_registry!(
350 "validator_service_forwarded_header_not_included",
351 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
352 registry,
353 )
354 .unwrap(),
355 client_id_source_config_mismatch: register_int_counter_with_registry!(
356 "validator_service_client_id_source_config_mismatch",
357 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
358 registry,
359 )
360 .unwrap(),
361 x_forwarded_for_num_hops: register_gauge_with_registry!(
362 "validator_service_x_forwarded_for_num_hops",
363 "Number of hops in x-forwarded-for header",
364 registry,
365 )
366 .unwrap(),
367 gasless_rate_limited_count: register_int_counter_with_registry!(
368 "validator_service_gasless_rate_limited_count",
369 "Number of gasless transactions rejected by rate limiter",
370 registry,
371 )
372 .unwrap(),
373 gasless_submission_outcomes: register_int_counter_vec_with_registry!(
374 "validator_service_gasless_submission_outcomes",
375 "Number of valid gasless transaction submissions by outcome",
376 &["outcome"],
377 registry,
378 )
379 .unwrap(),
380 admission_queue_direct_bypasses: register_int_counter_with_registry!(
381 "validator_service_admission_queue_direct_bypasses",
382 "Number of transactions that bypassed the queue (system not overloaded)",
383 registry,
384 )
385 .unwrap(),
386 }
387 }
388
389 pub fn new_for_tests() -> Self {
390 let registry = Registry::new();
391 Self::new(®istry)
392 }
393}
394
/// How a submit request is routed with respect to the admission queue.
/// The value is obtained from `classify_submit_mode` in the submit handler.
enum AdmissionQueueSubmitMode {
    /// Submissions in this mode increment the
    /// `admission_queue_direct_bypasses` metric ("bypassed the queue (system
    /// not overloaded)").
    Bypass,
    /// NOTE(review): presumably the request is routed through the admission
    /// queue — the handling is outside the visible part of the file; confirm.
    Queue,
    /// In this mode the submit handler itself calls
    /// `check_consensus_overload` for each transaction.
    /// NOTE(review): presumably means no admission queue is in use — confirm.
    Disabled,
}
406
/// The validator's request-handling service. Cloneable; the submit handler
/// clones it to obtain owned copies of the fields it needs.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state used for validation, voting and cache lookups.
    state: Arc<AuthorityState>,
    /// Adapter used to check consensus overload and submit to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Cloned from `state.traffic_controller` in `new`; `None` in
    /// `new_for_tests`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How to derive the client IP from a request; when `None`, the submit
    /// handler falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
    /// Consulted via `try_acquire` before accepting a gasless transaction.
    gasless_limiter: GaslessRateLimiter,
    /// Optional admission queue context supplied by the caller of `new`.
    admission_queue: Option<AdmissionQueueContext>,
    /// Maps a transaction digest to the `Instant` it was recorded by the
    /// submit handler; used to reject resubmissions inside the window.
    recently_submitted: Cache<TransactionDigest, Instant>,
    /// Dedup window read from `state.config.recent_submission_dedup_window()`;
    /// also used as the cache's time-to-live.
    recent_submission_window: Duration,
}
422
/// Sizing factor for the recently-submitted cache: its maximum capacity is
/// the dedup window in whole seconds (at least 1) multiplied by this value.
/// NOTE(review): the name suggests an assumed peak transactions-per-second
/// rate — confirm where the figure comes from.
const RECENT_SUBMISSION_PEAK_TPS: u64 = 50_000;
425
426impl ValidatorService {
427 pub fn new(
428 state: Arc<AuthorityState>,
429 consensus_adapter: Arc<ConsensusAdapter>,
430 validator_metrics: Arc<ValidatorServiceMetrics>,
431 client_id_source: Option<ClientIdSource>,
432 admission_queue: Option<AdmissionQueueContext>,
433 ) -> Self {
434 let traffic_controller = state.traffic_controller.clone();
435 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
436 let recent_submission_window = state.config.recent_submission_dedup_window();
437 Self {
438 state,
439 consensus_adapter,
440 metrics: validator_metrics,
441 traffic_controller,
442 client_id_source,
443 gasless_limiter,
444 admission_queue,
445 recently_submitted: Self::new_recently_submitted_cache(recent_submission_window),
446 recent_submission_window,
447 }
448 }
449
450 fn new_recently_submitted_cache(window: Duration) -> Cache<TransactionDigest, Instant> {
451 let max_capacity = window.as_secs().max(1) * RECENT_SUBMISSION_PEAK_TPS;
454 Cache::builder()
455 .time_to_live(window)
456 .max_capacity(max_capacity)
457 .build()
458 }
459
460 pub fn new_for_tests(
461 state: Arc<AuthorityState>,
462 consensus_adapter: Arc<ConsensusAdapter>,
463 metrics: Arc<ValidatorServiceMetrics>,
464 ) -> Self {
465 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
466 let epoch_store = state.epoch_store_for_testing().clone();
467 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
468 let manager = Arc::new(AdmissionQueueManager::new_for_tests(
469 consensus_adapter.clone(),
470 slot_freed_notify,
471 ));
472 let admission_queue = Some(AdmissionQueueContext::spawn(manager, epoch_store));
473 let recent_submission_window = state.config.recent_submission_dedup_window();
474 Self {
475 state,
476 consensus_adapter,
477 metrics,
478 traffic_controller: None,
479 client_id_source: None,
480 gasless_limiter,
481 admission_queue,
482 recently_submitted: Self::new_recently_submitted_cache(recent_submission_window),
483 recent_submission_window,
484 }
485 }
486
/// Returns a reference to the shared authority state held by this service.
pub fn validator_state(&self) -> &Arc<AuthorityState> {
    &self.state
}
490
491 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
493 let epoch_store = self.state.load_epoch_store_one_call_per_task();
494
495 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
497
498 let transaction = epoch_store
500 .verify_transaction_require_no_aliases(transaction)?
501 .into_tx();
502
503 self.state
505 .handle_vote_transaction(&epoch_store, transaction)?;
506
507 Ok(())
508 }
509
510 pub fn handle_transaction_for_testing_with_overload_check(
513 &self,
514 transaction: Transaction,
515 ) -> SuiResult<()> {
516 let epoch_store = self.state.load_epoch_store_one_call_per_task();
517
518 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
520
521 self.state.check_system_overload(
523 transaction.data(),
524 self.state.check_system_overload_at_signing(),
525 )?;
526
527 let transaction = epoch_store
529 .verify_transaction_require_no_aliases(transaction)?
530 .into_tx();
531
532 self.state
534 .handle_vote_transaction(&epoch_store, transaction)?;
535
536 Ok(())
537 }
538
539 async fn collect_immutable_object_ids(
542 &self,
543 tx: &VerifiedTransaction,
544 state: &AuthorityState,
545 ) -> SuiResult<Vec<ObjectID>> {
546 let input_objects = tx.data().transaction_data().input_objects()?;
547
548 let object_ids: Vec<ObjectID> = input_objects
550 .iter()
551 .filter_map(|obj| match obj {
552 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
553 _ => None,
554 })
555 .collect();
556 if object_ids.is_empty() {
557 return Ok(vec![]);
558 }
559
560 let objects = state.get_object_cache_reader().get_objects(&object_ids);
562
563 objects
565 .into_iter()
566 .zip_debug_eq(object_ids.iter())
567 .filter_map(|(obj, id)| {
568 let Some(o) = obj else {
569 return Some(Err::<ObjectID, SuiError>(
570 SuiErrorKind::UserInputError {
571 error: UserInputError::ObjectNotFound {
572 object_id: *id,
573 version: None,
574 },
575 }
576 .into(),
577 ));
578 };
579 if o.is_immutable() {
580 Some(Ok(*id))
581 } else {
582 None
583 }
584 })
585 .collect::<SuiResult<Vec<ObjectID>>>()
586 }
587
588 #[instrument(
589 name = "ValidatorService::handle_submit_transaction",
590 level = "error",
591 skip_all,
592 err(level = "debug")
593 )]
594 async fn handle_submit_transaction(
595 &self,
596 request: tonic::Request<RawSubmitTxRequest>,
597 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
598 let Self {
599 state,
600 consensus_adapter: _,
601 metrics,
602 traffic_controller: _,
603 client_id_source,
604 gasless_limiter: _,
605 admission_queue: _,
606 recently_submitted: _,
607 recent_submission_window: _,
608 } = self.clone();
609
610 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
611 self.get_client_ip_addr(&request, client_id_source)
612 } else {
613 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
614 };
615
616 let inner = request.into_inner();
617 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
618
619 let next_epoch = start_epoch + 1;
620 let mut max_retries = 1;
621
622 loop {
623 let res = self
624 .handle_submit_transaction_inner(&state, &metrics, &inner, submitter_client_addr)
625 .await;
626 match res {
627 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
628 Err(err) => {
629 if max_retries > 0
630 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
631 {
632 max_retries -= 1;
633
634 debug!(
635 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
636 );
637
638 if let Ok(Ok(new_epoch)) =
639 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
640 {
641 assert_reachable!("retry submission at epoch end");
642 if new_epoch >= next_epoch {
643 continue;
644 }
645 debug_fatal!(
647 "wait_for_epoch returned early: expected >= {}, got {}",
648 next_epoch,
649 new_epoch
650 );
651 }
652 }
653 return Err(err.into());
654 }
655 }
656 }
657 }
658
659 async fn handle_submit_transaction_inner(
660 &self,
661 state: &AuthorityState,
662 metrics: &ValidatorServiceMetrics,
663 request: &RawSubmitTxRequest,
664 submitter_client_addr: Option<IpAddr>,
665 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
666 let epoch_store = state.load_epoch_store_one_call_per_task();
667 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
668 SuiErrorKind::GrpcMessageDeserializeError {
669 type_info: "RawSubmitTxRequest.submit_type".to_string(),
670 error: e.to_string(),
671 }
672 })?;
673
674 let is_ping_request = submit_type == SubmitTxType::Ping;
675 if is_ping_request {
676 fp_ensure!(
677 request.transactions.is_empty(),
678 SuiErrorKind::InvalidRequest(format!(
679 "Ping request cannot contain {} transactions",
680 request.transactions.len()
681 ))
682 .into()
683 );
684 } else {
685 fp_ensure!(
687 !request.transactions.is_empty(),
688 SuiErrorKind::InvalidRequest(
689 "At least one transaction needs to be submitted".to_string(),
690 )
691 .into()
692 );
693 }
694
695 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
700
701 let max_num_transactions = if is_soft_bundle_request {
702 epoch_store.protocol_config().max_soft_bundle_size()
705 } else {
706 epoch_store
708 .protocol_config()
709 .max_num_transactions_in_block()
710 };
711 fp_ensure!(
712 request.transactions.len() <= max_num_transactions as usize,
713 SuiErrorKind::InvalidRequest(format!(
714 "Too many transactions in request: {} vs {}",
715 request.transactions.len(),
716 max_num_transactions
717 ))
718 .into()
719 );
720
721 let mut tx_digests = Vec::with_capacity(request.transactions.len());
723 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
725 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
727 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
729 let mut total_size_bytes = 0;
731 let mut has_gasless = false;
733 let mut duplicate_at_admission = false;
737 let mut expected_soft_bundle_gas_price = None;
739 let mut soft_bundle_digests = HashSet::new();
742
743 let req_type = if is_ping_request {
744 "ping"
745 } else if request.transactions.len() == 1 {
746 "single_transaction"
747 } else if is_soft_bundle_request {
748 "soft_bundle"
749 } else {
750 "batch"
751 };
752
753 let _handle_tx_metrics_guard = metrics
754 .handle_submit_transaction_latency
755 .with_label_values(&[req_type])
756 .start_timer();
757
758 let submit_mode = self.classify_submit_mode(is_ping_request);
759
760 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
761 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
762 Ok(txn) => txn,
763 Err(e) => {
764 return Err(SuiErrorKind::TransactionDeserializationError {
766 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
767 }
768 .into());
769 }
770 };
771
772 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
774 let tx_digest = *transaction.digest();
775
776 if is_soft_bundle_request {
778 fp_ensure!(
779 soft_bundle_digests.insert(tx_digest),
780 SuiErrorKind::UserInputError {
781 error: UserInputError::RepeatedTransactionInSoftBundle {
782 digest: tx_digest
783 }
784 }
785 .into()
786 );
787 }
788
789 if is_soft_bundle_request {
791 let gas_price = transaction.data().transaction_data().gas_price();
792 if let Some(expected) = expected_soft_bundle_gas_price {
793 fp_ensure!(
794 gas_price == expected,
795 SuiErrorKind::UserInputError {
796 error: UserInputError::GasPriceMismatchError {
797 digest: tx_digest,
798 expected,
799 actual: gas_price,
800 }
801 }
802 .into()
803 );
804 } else {
805 expected_soft_bundle_gas_price = Some(gas_price);
806 }
807 }
808
809 let is_gasless = transaction
810 .data()
811 .transaction_data()
812 .is_gasless_transaction();
813
814 if is_gasless {
815 has_gasless = true;
816 metrics
817 .gasless_submission_outcomes
818 .with_label_values(&["attempted"])
819 .inc();
820 }
821
822 let overload_check_res = state.check_system_overload(
823 transaction.data(),
824 state.check_system_overload_at_signing(),
825 );
826 if let Err(error) = overload_check_res {
827 metrics
828 .num_rejected_tx_during_overload
829 .with_label_values(&[error.as_ref()])
830 .inc();
831 if is_gasless {
832 metrics
833 .gasless_submission_outcomes
834 .with_label_values(&["rejected_overload"])
835 .inc();
836 }
837 results[idx] = Some(SubmitTxResult::Rejected { error });
838 continue;
839 }
840
841 if matches!(submit_mode, AdmissionQueueSubmitMode::Disabled)
844 && let Err(error) = self.consensus_adapter.check_consensus_overload()
845 {
846 state.update_overload_metrics("consensus");
847 metrics
848 .num_rejected_tx_during_overload
849 .with_label_values(&[error.as_ref()])
850 .inc();
851 if is_gasless {
852 metrics
853 .gasless_submission_outcomes
854 .with_label_values(&["rejected_overload"])
855 .inc();
856 }
857 results[idx] = Some(SubmitTxResult::Rejected { error });
858 continue;
859 }
860
861 if is_gasless
862 && !self
863 .gasless_limiter
864 .try_acquire(epoch_store.protocol_config())
865 {
866 metrics.gasless_rate_limited_count.inc();
867 metrics
868 .gasless_submission_outcomes
869 .with_label_values(&["rejected_rate_limited"])
870 .inc();
871 results[idx] = Some(SubmitTxResult::Rejected {
872 error: SuiErrorKind::ValidatorOverloadedRetryAfter {
873 retry_after_secs: 1,
874 }
875 .into(),
876 });
877 continue;
878 }
879
880 let verified_transaction = {
882 let _metrics_guard = metrics.tx_verification_latency.start_timer();
883 if epoch_store.protocol_config().address_aliases() {
884 match epoch_store.verify_transaction_with_current_aliases(transaction) {
885 Ok(tx) => tx,
886 Err(e) => {
887 metrics.signature_errors.inc();
888 return Err(e);
889 }
890 }
891 } else {
892 match epoch_store.verify_transaction_require_no_aliases(transaction) {
893 Ok(tx) => tx,
894 Err(e) => {
895 metrics.signature_errors.inc();
896 return Err(e);
897 }
898 }
899 }
900 };
901
902 debug!(
903 ?tx_digest,
904 "handle_submit_transaction: verified transaction"
905 );
906
907 if let Some(effects) = state
910 .get_transaction_cache_reader()
911 .get_executed_effects(&tx_digest)
912 {
913 let effects_digest = effects.digest();
914 if let Ok(executed_data) = self.complete_executed_data(effects).await {
915 let executed_result = SubmitTxResult::Executed {
916 effects_digest,
917 details: Some(executed_data),
918 };
919 results[idx] = Some(executed_result);
920 debug!(?tx_digest, "handle_submit_transaction: already executed");
921 continue;
922 }
923 }
924
925 if self
926 .state
927 .get_transaction_cache_reader()
928 .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
929 {
930 results[idx] = Some(SubmitTxResult::Rejected {
931 error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
932 });
933 debug!(
934 ?tx_digest,
935 "handle_submit_transaction: transaction already executed in previous epoch"
936 );
937 continue;
938 }
939
940 let consensus_key = SequencedConsensusTransactionKey::External(
946 ConsensusTransactionKey::Certificate(tx_digest),
947 );
948 if epoch_store.is_consensus_message_processed(&consensus_key)? {
949 metrics
950 .submission_suppressed_already_processed
951 .with_label_values(&[req_type])
952 .inc();
953 results[idx] = Some(SubmitTxResult::Rejected {
954 error: SuiErrorKind::TransactionProcessing {
955 digest: tx_digest,
956 status: "sequenced by consensus".to_string(),
957 }
958 .into(),
959 });
960 debug!(
961 ?tx_digest,
962 "handle_submit_transaction: consensus message already processed"
963 );
964 continue;
965 }
966
967 if let Some(recorded_at) = self.recently_submitted.get(&tx_digest)
970 && recorded_at.elapsed() < self.recent_submission_window
971 {
972 metrics
973 .submission_suppressed_recently_submitted
974 .with_label_values(&[req_type])
975 .inc();
976 metrics
977 .recently_submitted_resubmission_interval
978 .observe(recorded_at.elapsed().as_secs_f64());
979 results[idx] = Some(SubmitTxResult::Rejected {
980 error: SuiErrorKind::TransactionProcessing {
981 digest: tx_digest,
982 status: "recently submitted".to_string(),
983 }
984 .into(),
985 });
986 debug!(?tx_digest, "handle_submit_transaction: recently submitted");
987 continue;
988 }
989 self.recently_submitted.insert(tx_digest, Instant::now());
990 metrics
991 .recently_submitted_cache_size
992 .set(self.recently_submitted.entry_count() as i64);
993
994 debug!(
995 ?tx_digest,
996 "handle_submit_transaction: waiting for fastpath dependency objects"
997 );
998 if !state
999 .wait_for_fastpath_dependency_objects(
1000 verified_transaction.tx(),
1001 epoch_store.epoch(),
1002 )
1003 .await?
1004 {
1005 debug!(
1006 ?tx_digest,
1007 "fastpath input objects are still unavailable after waiting"
1008 );
1009 }
1010
1011 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
1012 Ok(_) => { }
1013 Err(e) => {
1014 if let Some(effects) = state
1017 .get_transaction_cache_reader()
1018 .get_executed_effects(&tx_digest)
1019 {
1020 let effects_digest = effects.digest();
1021 if let Ok(executed_data) = self.complete_executed_data(effects).await {
1022 let executed_result = SubmitTxResult::Executed {
1023 effects_digest,
1024 details: Some(executed_data),
1025 };
1026 results[idx] = Some(executed_result);
1027 continue;
1028 }
1029 }
1030
1031 debug!(?tx_digest, "Transaction rejected during submission: {e}");
1033 metrics
1034 .submission_rejected_transactions
1035 .with_label_values(&[e.to_variant_name()])
1036 .inc();
1037 results[idx] = Some(SubmitTxResult::Rejected { error: e });
1038 continue;
1039 }
1040 }
1041
1042 let mut claims = vec![];
1044
1045 let immutable_object_ids = self
1046 .collect_immutable_object_ids(verified_transaction.tx(), state)
1047 .await?;
1048 if !immutable_object_ids.is_empty() {
1049 claims.push(TransactionClaim::ImmutableInputObjects(
1050 immutable_object_ids,
1051 ));
1052 }
1053
1054 let (tx, aliases) = verified_transaction.into_inner();
1055 if epoch_store.protocol_config().address_aliases() {
1056 if epoch_store
1057 .protocol_config()
1058 .fix_checkpoint_signature_mapping()
1059 {
1060 claims.push(TransactionClaim::AddressAliasesV2(aliases));
1061 } else {
1062 let v1_aliases: Vec<_> = tx
1063 .data()
1064 .intent_message()
1065 .value
1066 .required_signers()
1067 .into_iter()
1068 .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
1069 .collect();
1070 #[allow(deprecated)]
1071 claims.push(TransactionClaim::AddressAliases(
1072 nonempty::NonEmpty::from_vec(v1_aliases)
1073 .expect("must have at least one required_signer"),
1074 ));
1075 }
1076 }
1077
1078 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
1079
1080 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
1081 &state.name,
1082 tx_with_claims,
1083 ));
1084 if is_gasless {
1085 metrics
1086 .gasless_submission_outcomes
1087 .with_label_values(&["submitted"])
1088 .inc();
1089 }
1090
1091 transaction_indexes.push(idx);
1092 tx_digests.push(tx_digest);
1093 total_size_bytes += tx_size;
1094 }
1095
1096 if consensus_transactions.is_empty() && !is_ping_request {
1097 let spam_weight = Self::request_spam_weight(
1098 &results,
1099 has_gasless,
1100 duplicate_at_admission,
1101 is_ping_request,
1102 );
1103 let response = Self::try_from_submit_tx_response(results)?;
1104 return Ok((response, spam_weight));
1105 }
1106
1107 let max_transaction_bytes = if is_soft_bundle_request {
1111 epoch_store
1112 .protocol_config()
1113 .consensus_max_transactions_in_block_bytes()
1114 / 2
1115 } else {
1116 epoch_store
1117 .protocol_config()
1118 .consensus_max_transactions_in_block_bytes()
1119 };
1120 fp_ensure!(
1121 total_size_bytes <= max_transaction_bytes as usize,
1122 SuiErrorKind::UserInputError {
1123 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1124 size: total_size_bytes,
1125 limit: max_transaction_bytes,
1126 },
1127 }
1128 .into()
1129 );
1130
1131 metrics
1132 .handle_submit_transaction_bytes
1133 .with_label_values(&[req_type])
1134 .observe(total_size_bytes as f64);
1135 metrics
1136 .handle_submit_transaction_batch_size
1137 .with_label_values(&[req_type])
1138 .observe(consensus_transactions.len() as f64);
1139
1140 let _latency_metric_guard = metrics
1141 .handle_submit_transaction_consensus_latency
1142 .with_label_values(&[req_type])
1143 .start_timer();
1144
1145 if is_soft_bundle_request {
1146 assert!(
1149 !consensus_transactions.is_empty(),
1150 "A valid soft bundle must have at least one transaction"
1151 );
1152 }
1153
1154 let tx_groups: Vec<Vec<ConsensusTransaction>> = if is_soft_bundle_request || is_ping_request
1157 {
1158 vec![consensus_transactions]
1159 } else {
1160 consensus_transactions
1161 .into_iter()
1162 .map(|t| vec![t])
1163 .collect()
1164 };
1165
1166 let group_tx_meta = if is_soft_bundle_request {
1171 vec![
1172 transaction_indexes
1173 .into_iter()
1174 .zip_eq(tx_digests)
1175 .collect::<Vec<_>>(),
1176 ]
1177 } else {
1178 transaction_indexes
1179 .into_iter()
1180 .zip_eq(tx_digests)
1181 .map(|pair| vec![pair])
1182 .collect::<Vec<_>>()
1183 };
1184
1185 let group_results = match submit_mode {
1190 AdmissionQueueSubmitMode::Bypass | AdmissionQueueSubmitMode::Disabled => {
1191 if matches!(submit_mode, AdmissionQueueSubmitMode::Bypass) {
1192 self.metrics.admission_queue_direct_bypasses.inc();
1193 }
1194 let futures = tx_groups.into_iter().map(|txns| {
1195 debug!(
1196 "handle_submit_transaction: submitting consensus transactions ({}): {}",
1197 req_type,
1198 txns.iter().map(|t| t.local_display()).join(", ")
1199 );
1200 self.consensus_adapter.submit_and_get_positions(
1201 txns,
1202 &epoch_store,
1203 submitter_client_addr,
1204 )
1205 });
1206 future::join_all(futures).await
1207 }
1208 AdmissionQueueSubmitMode::Queue => {
1209 let aq = self
1210 .admission_queue
1211 .as_ref()
1212 .expect("Queue mode implies admission_queue is Some")
1213 .load();
1214 let mut receivers = Vec::with_capacity(tx_groups.len());
1215 for txns in tx_groups {
1216 let gas_price = Self::extract_gas_price(&txns);
1217 let (rx, newly_inserted) = aq
1218 .try_insert(gas_price, txns, submitter_client_addr)
1219 .await?;
1220 if !newly_inserted {
1221 duplicate_at_admission = true;
1224 }
1225 receivers.push(rx);
1226 }
1227 future::join_all(receivers.into_iter().map(|rx| async move {
1228 match rx.await {
1229 Ok(result) => result.map_err(SuiError::from),
1230 Err(_) => Err(SuiError::from(
1231 SuiErrorKind::TooManyTransactionsPendingConsensus,
1232 )),
1233 }
1234 }))
1235 .await
1236 }
1237 };
1238
1239 if is_ping_request {
1240 let consensus_positions = group_results
1242 .into_iter()
1243 .next()
1244 .expect("Ping request must have exactly one submission group")?;
1245 assert_eq!(consensus_positions.len(), 1);
1246 results.push(Some(SubmitTxResult::Submitted {
1247 consensus_position: consensus_positions[0],
1248 }));
1249 } else {
1250 for (group_result, txns_meta) in group_results.into_iter().zip_debug_eq(group_tx_meta) {
1251 match group_result {
1252 Ok(consensus_positions) => {
1253 for ((idx, tx_digest), consensus_position) in
1254 txns_meta.into_iter().zip_debug_eq(consensus_positions)
1255 {
1256 debug!(
1257 ?tx_digest,
1258 "handle_submit_transaction: submitted consensus transaction at {}",
1259 consensus_position,
1260 );
1261 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1262 }
1263 }
1264 Err(err) => {
1267 let SuiErrorKind::TransactionProcessing { status, .. } =
1268 err.as_inner().clone()
1269 else {
1270 return Err(err);
1271 };
1272 for (idx, tx_digest) in txns_meta {
1273 debug!(
1274 ?tx_digest,
1275 "handle_submit_transaction: transaction already processing: {err}"
1276 );
1277 metrics
1281 .submission_suppressed_already_processed
1282 .with_label_values(&[req_type])
1283 .inc();
1284 results[idx] = Some(SubmitTxResult::Rejected {
1285 error: SuiErrorKind::TransactionProcessing {
1286 digest: tx_digest,
1287 status: status.clone(),
1288 }
1289 .into(),
1290 });
1291 }
1292 }
1293 }
1294 }
1295 }
1296
1297 let spam_weight = Self::request_spam_weight(
1298 &results,
1299 has_gasless,
1300 duplicate_at_admission,
1301 is_ping_request,
1302 );
1303 let response = Self::try_from_submit_tx_response(results)?;
1304 Ok((response, spam_weight))
1305 }
1306
1307 fn request_spam_weight(
1310 results: &[Option<SubmitTxResult>],
1311 has_gasless: bool,
1312 duplicate_at_admission: bool,
1313 is_ping: bool,
1314 ) -> Weight {
1315 if is_ping || has_gasless || duplicate_at_admission {
1316 return Weight::one();
1317 }
1318 for result in results {
1319 let Some(result) = result else {
1320 debug_fatal!("transaction outcome unset when computing spam weight");
1324 return Weight::one();
1325 };
1326 if Self::submission_spam_weight(result) == Weight::one() {
1327 return Weight::one();
1328 }
1329 }
1330 Weight::zero()
1331 }
1332
1333 fn submission_spam_weight(result: &SubmitTxResult) -> Weight {
1334 match result {
1335 SubmitTxResult::Submitted { .. } => Weight::zero(),
1336 SubmitTxResult::Executed { .. } | SubmitTxResult::Rejected { .. } => Weight::one(),
1338 }
1339 }
1340
1341 fn try_from_submit_tx_response(
1342 results: Vec<Option<SubmitTxResult>>,
1343 ) -> Result<RawSubmitTxResponse, SuiError> {
1344 let mut raw_results = Vec::new();
1345 for (i, result) in results.into_iter().enumerate() {
1346 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1347 error: format!("Missing transaction result at {}", i),
1348 })?;
1349 let raw_result = result.try_into()?;
1350 raw_results.push(raw_result);
1351 }
1352 Ok(RawSubmitTxResponse {
1353 results: raw_results,
1354 })
1355 }
1356
1357 fn extract_gas_price(transactions: &[ConsensusTransaction]) -> u64 {
1360 use sui_types::messages_consensus::ConsensusTransactionKind;
1361 transactions
1362 .iter()
1363 .filter_map(|tx| match &tx.kind {
1364 ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.gas_price()),
1365 ConsensusTransactionKind::UserTransaction(t) => {
1366 Some(t.data().transaction_data().gas_price())
1367 }
1368 ConsensusTransactionKind::UserTransactionV2(t) => {
1369 Some(t.tx().data().transaction_data().gas_price())
1370 }
1371 _ => None,
1372 })
1373 .min()
1374 .unwrap_or(0)
1375 }
1376
1377 fn classify_submit_mode(&self, is_ping_request: bool) -> AdmissionQueueSubmitMode {
1378 let Some(aq) = &self.admission_queue else {
1379 return AdmissionQueueSubmitMode::Disabled;
1380 };
1381
1382 if is_ping_request {
1383 return AdmissionQueueSubmitMode::Bypass;
1384 }
1385
1386 let inflight = usize::try_from(self.consensus_adapter.num_inflight_transactions()).unwrap();
1387 if inflight < aq.bypass_threshold() {
1388 return AdmissionQueueSubmitMode::Bypass;
1389 }
1390
1391 if aq.load().failover_tripped() {
1394 return AdmissionQueueSubmitMode::Disabled;
1395 }
1396
1397 AdmissionQueueSubmitMode::Queue
1398 }
1399
1400 async fn collect_effects_data(
1401 &self,
1402 effects: &TransactionEffects,
1403 include_events: bool,
1404 include_input_objects: bool,
1405 include_output_objects: bool,
1406 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1407 let events = if include_events && effects.events_digest().is_some() {
1408 Some(
1409 self.state
1410 .get_transaction_events(effects.transaction_digest())?,
1411 )
1412 } else {
1413 None
1414 };
1415
1416 let input_objects = if include_input_objects {
1417 self.state.get_transaction_input_objects(effects)?
1418 } else {
1419 vec![]
1420 };
1421
1422 let output_objects = if include_output_objects {
1423 self.state.get_transaction_output_objects(effects)?
1424 } else {
1425 vec![]
1426 };
1427
1428 Ok((events, input_objects, output_objects))
1429 }
1430}
1431
/// Outcome of a service handler before traffic accounting is applied.
///
/// On success it carries the gRPC response together with the spam [`Weight`]
/// that `handle_traffic_resp` tallies for the request; on failure it carries
/// the `tonic::Status` that is returned to the client.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1433
1434impl ValidatorService {
    /// Handler for `submit_transaction` in the `*_impl` shape expected by
    /// `handle_with_decoration!`; delegates directly to
    /// `handle_submit_transaction` and returns its response and spam weight.
    async fn handle_submit_transaction_impl(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
        self.handle_submit_transaction(request).await
    }
1441
1442 async fn wait_for_effects_impl(
1443 &self,
1444 request: tonic::Request<RawWaitForEffectsRequest>,
1445 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1446 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1447 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1448 let response = timeout(
1449 Duration::from_secs(20),
1451 epoch_store
1452 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1453 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1454 )
1455 .await
1456 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1457 .try_into()?;
1458 Ok((tonic::Response::new(response), Weight::zero()))
1459 }
1460
    /// Waits until the outcome of a previously submitted transaction is known.
    ///
    /// Requests with a `ping_type` are delegated to `ping_response` under a
    /// 10 second timeout. All other requests must carry a `transaction_digest`,
    /// and two futures are raced:
    /// - executed effects for that digest becoming readable, which produces
    ///   `WaitForEffectsResponse::Executed` (with details if `include_details`);
    /// - the consensus status at `consensus_position`, which can produce
    ///   `Rejected` or `Expired`.
    ///
    /// # Errors
    /// - `TimeoutError` if the ping path exceeds 10 seconds;
    /// - `InvalidRequest` if a non-ping request has no transaction digest;
    /// - any error from `check_position_too_ahead`, from reading the executed
    ///   effects, or from assembling the executed details.
    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "debug", skip_all, fields(consensus_position = ?request.consensus_position))]
    async fn wait_for_effects_response(
        &self,
        request: WaitForEffectsRequest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<WaitForEffectsResponse> {
        // Ping requests take a separate path with its own, shorter timeout.
        if request.ping_type.is_some() {
            return timeout(
                Duration::from_secs(10),
                self.ping_response(request, epoch_store),
            )
            .await
            .map_err(|_| SuiErrorKind::TimeoutError)?;
        }

        let Some(tx_digest) = request.transaction_digest else {
            return Err(SuiErrorKind::InvalidRequest(
                "Transaction digest is required for wait for effects requests".to_string(),
            )
            .into());
        };
        let tx_digests = [tx_digest];

        // Resolves only with a Rejected or Expired response (or an error); in every
        // other case it never completes, leaving the answer to the effects branch.
        let consensus_status_future = async {
            let consensus_position = match request.consensus_position {
                Some(pos) => pos,
                // No position to watch: park forever so only the effects branch can finish.
                None => return futures::future::pending().await,
            };
            let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
            consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
            match consensus_tx_status_cache
                .notify_read_transaction_status(consensus_position)
                .await
            {
                NotifyReadConsensusTxStatusResult::Status(
                    ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
                ) => Ok(WaitForEffectsResponse::Rejected {
                    error: epoch_store.get_rejection_vote_reason(consensus_position),
                }),
                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
                    // Finalized is not answered from this branch; park forever so the
                    // response comes from the effects branch of the select below.
                    futures::future::pending().await
                }
                NotifyReadConsensusTxStatusResult::Expired(round) => {
                    Ok(WaitForEffectsResponse::Expired {
                        epoch: epoch_store.epoch(),
                        round: Some(round),
                    })
                }
            }
        };

        // Whichever branch completes first determines the response.
        tokio::select! {
            effects_result = self.state
                .get_transaction_cache_reader()
                .notify_read_executed_effects_may_fail(
                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
                    &tx_digests,
                ) => {
                // A single digest was requested; presumably the reader returns one
                // entry per digest, so `pop` yields that entry — TODO confirm.
                let effects = effects_result?.pop().unwrap();
                let effects_digest = effects.digest();
                let details = if request.include_details {
                    Some(self.complete_executed_data(effects).await?)
                } else {
                    None
                };
                Ok(WaitForEffectsResponse::Executed {
                    effects_digest,
                    details,
                })
            }
            status_response = consensus_status_future => {
                status_response
            }
        }
    }
1540
1541 #[instrument(level = "error", skip_all, err(level = "debug"))]
1542 async fn ping_response(
1543 &self,
1544 request: WaitForEffectsRequest,
1545 epoch_store: &Arc<AuthorityPerEpochStore>,
1546 ) -> SuiResult<WaitForEffectsResponse> {
1547 let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1548
1549 let Some(consensus_position) = request.consensus_position else {
1550 return Err(SuiErrorKind::InvalidRequest(
1551 "Consensus position is required for Ping requests".to_string(),
1552 )
1553 .into());
1554 };
1555
1556 let Some(ping) = request.ping_type else {
1558 return Err(SuiErrorKind::InvalidRequest(
1559 "Ping type is required for ping requests".to_string(),
1560 )
1561 .into());
1562 };
1563
1564 let _metrics_guard = self
1565 .metrics
1566 .handle_wait_for_effects_ping_latency
1567 .with_label_values(&[ping.as_str()])
1568 .start_timer();
1569
1570 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1571
1572 let details = if request.include_details {
1573 Some(Box::new(ExecutedData::default()))
1574 } else {
1575 None
1576 };
1577
1578 let status = consensus_tx_status_cache
1579 .notify_read_transaction_status(consensus_position)
1580 .await;
1581 match status {
1582 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1583 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1584 Ok(WaitForEffectsResponse::Rejected {
1585 error: epoch_store.get_rejection_vote_reason(consensus_position),
1586 })
1587 }
1588 ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1589 effects_digest: TransactionEffectsDigest::ZERO,
1590 details,
1591 }),
1592 },
1593 NotifyReadConsensusTxStatusResult::Expired(round) => {
1594 Ok(WaitForEffectsResponse::Expired {
1595 epoch: epoch_store.epoch(),
1596 round: Some(round),
1597 })
1598 }
1599 }
1600 }
1601
1602 async fn complete_executed_data(
1603 &self,
1604 effects: TransactionEffects,
1605 ) -> SuiResult<Box<ExecutedData>> {
1606 let (events, input_objects, output_objects) = self
1607 .collect_effects_data(
1608 &effects, true, true,
1609 true,
1610 )
1611 .await?;
1612 Ok(Box::new(ExecutedData {
1613 effects,
1614 events,
1615 input_objects,
1616 output_objects,
1617 }))
1618 }
1619
1620 async fn object_info_impl(
1621 &self,
1622 request: tonic::Request<ObjectInfoRequest>,
1623 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1624 let request = request.into_inner();
1625 let response = self.state.handle_object_info_request(request).await?;
1626 Ok((tonic::Response::new(response), Weight::one()))
1627 }
1628
1629 async fn transaction_info_impl(
1630 &self,
1631 request: tonic::Request<TransactionInfoRequest>,
1632 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1633 let request = request.into_inner();
1634 let response = self.state.handle_transaction_info_request(request).await?;
1635 Ok((tonic::Response::new(response), Weight::one()))
1636 }
1637
1638 async fn checkpoint_impl(
1639 &self,
1640 request: tonic::Request<CheckpointRequest>,
1641 ) -> WrappedServiceResponse<CheckpointResponse> {
1642 let request = request.into_inner();
1643 let response = self.state.handle_checkpoint_request(&request)?;
1644 Ok((tonic::Response::new(response), Weight::one()))
1645 }
1646
1647 async fn checkpoint_v2_impl(
1648 &self,
1649 request: tonic::Request<CheckpointRequestV2>,
1650 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1651 let request = request.into_inner();
1652 let response = self.state.handle_checkpoint_request_v2(&request)?;
1653 Ok((tonic::Response::new(response), Weight::one()))
1654 }
1655
1656 async fn get_system_state_object_impl(
1657 &self,
1658 _request: tonic::Request<SystemStateRequest>,
1659 ) -> WrappedServiceResponse<SuiSystemState> {
1660 let response = self
1661 .state
1662 .get_object_cache_reader()
1663 .get_sui_system_state_object_unsafe()?;
1664 Ok((tonic::Response::new(response), Weight::one()))
1665 }
1666
1667 async fn validator_health_impl(
1668 &self,
1669 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1670 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1671 let state = &self.state;
1672
1673 let epoch_store = state.load_epoch_store_one_call_per_task();
1675
1676 let num_inflight_execution_transactions =
1678 state.execution_scheduler().num_pending_certificates() as u64;
1679
1680 let num_inflight_consensus_transactions =
1682 self.consensus_adapter.num_inflight_transactions();
1683
1684 let last_committed_leader_round = epoch_store
1686 .consensus_tx_status_cache
1687 .get_last_committed_leader_round()
1688 .unwrap_or(0);
1689
1690 let last_locally_built_checkpoint = epoch_store
1692 .last_built_checkpoint_summary()
1693 .ok()
1694 .flatten()
1695 .map(|(_, summary)| summary.sequence_number)
1696 .unwrap_or(0);
1697
1698 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1699 num_inflight_consensus_transactions,
1700 num_inflight_execution_transactions,
1701 last_locally_built_checkpoint,
1702 last_committed_leader_round,
1703 };
1704
1705 let raw_response = typed_response
1706 .try_into()
1707 .map_err(|e: sui_types::error::SuiError| {
1708 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1709 })?;
1710
1711 Ok((tonic::Response::new(raw_response), Weight::one()))
1712 }
1713
1714 fn get_client_ip_addr<T>(
1715 &self,
1716 request: &tonic::Request<T>,
1717 source: &ClientIdSource,
1718 ) -> Option<IpAddr> {
1719 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1720
1721 if let Some(header) = forwarded_header {
1722 let num_hops = header
1723 .to_str()
1724 .map(|h| h.split(',').count().saturating_sub(1))
1725 .unwrap_or(0);
1726
1727 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1728 }
1729
1730 match source {
1731 ClientIdSource::SocketAddr => {
1732 let socket_addr: Option<SocketAddr> = request.remote_addr();
1733
1734 if let Some(socket_addr) = socket_addr {
1740 Some(socket_addr.ip())
1741 } else {
1742 if cfg!(msim) {
1743 } else if cfg!(test) {
1745 panic!("Failed to get remote address from request");
1746 } else {
1747 self.metrics.connection_ip_not_found.inc();
1748 error!("Failed to get remote address from request");
1749 }
1750 None
1751 }
1752 }
1753 ClientIdSource::XForwardedFor(num_hops) => {
1754 let do_header_parse = |op: &MetadataValue<Ascii>| {
1755 match op.to_str() {
1756 Ok(header_val) => {
1757 let header_contents =
1758 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1759 if *num_hops == 0 {
1760 error!(
1761 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1762 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1763 to this node. Skipping traffic controller request handling.",
1764 header_contents,
1765 );
1766 return None;
1767 }
1768 let contents_len = header_contents.len();
1769 if contents_len < *num_hops {
1770 error!(
1771 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1772 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1773 `client-id-source` in the node config.",
1774 header_contents, contents_len, num_hops, contents_len,
1775 );
1776 self.metrics.client_id_source_config_mismatch.inc();
1777 return None;
1778 }
1779 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1780 else {
1781 error!(
1782 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1783 Expected at least {} values. Skipping traffic controller request handling.",
1784 header_contents, contents_len, num_hops, contents_len,
1785 );
1786 return None;
1787 };
1788 parse_ip(client_ip).or_else(|| {
1789 self.metrics.forwarded_header_parse_error.inc();
1790 None
1791 })
1792 }
1793 Err(e) => {
1794 self.metrics.forwarded_header_invalid.inc();
1798 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1799 None
1800 }
1801 }
1802 };
1803 if let Some(op) = request.metadata().get("x-forwarded-for") {
1804 do_header_parse(op)
1805 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1806 do_header_parse(op)
1807 } else {
1808 self.metrics.forwarded_header_not_included.inc();
1809 error!(
1810 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1811 );
1812 None
1813 }
1814 }
1815 }
1816 }
1817
1818 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1819 if let Some(traffic_controller) = &self.traffic_controller {
1820 if !traffic_controller.check(&client, &None).await {
1821 Err(tonic::Status::from_error(
1823 SuiErrorKind::TooManyRequests.into(),
1824 ))
1825 } else {
1826 Ok(())
1827 }
1828 } else {
1829 Ok(())
1830 }
1831 }
1832
1833 fn handle_traffic_resp<T>(
1834 &self,
1835 client: Option<IpAddr>,
1836 wrapped_response: WrappedServiceResponse<T>,
1837 method_name: &str,
1838 ) -> Result<tonic::Response<T>, tonic::Status> {
1839 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1840 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1841 Err(status) => (
1842 Some(SuiError::from(status.clone())),
1843 Weight::zero(),
1844 Err(status.clone()),
1845 ),
1846 };
1847
1848 if let Some(traffic_controller) = self.traffic_controller.clone() {
1849 traffic_controller.tally(TrafficTally {
1850 direct: client,
1851 through_fullnode: None,
1852 error_info: error.map(|e| {
1853 let error_type = String::from(e.clone().as_ref());
1854 let error_weight = normalize(e);
1855 (error_weight, error_type)
1856 }),
1857 spam_weight,
1858 timestamp: SystemTime::now(),
1859 method: Some(method_name.to_string()),
1860 })
1861 }
1862 unwrapped_response
1863 }
1864}
1865
1866fn normalize(err: SuiError) -> Weight {
1868 match err.as_inner() {
1869 SuiErrorKind::UserInputError {
1870 error: UserInputError::IncorrectUserSignature { .. },
1871 } => Weight::one(),
1872 SuiErrorKind::InvalidSignature { .. }
1873 | SuiErrorKind::SignerSignatureAbsent { .. }
1874 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1875 | SuiErrorKind::IncorrectSigner { .. }
1876 | SuiErrorKind::UnknownSigner { .. }
1877 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1878 _ => Weight::zero(),
1879 }
1880}
1881
/// Runs a `*_impl` handler with traffic control wrapped around it.
///
/// When `client_id_source` is unset, the handler is invoked directly and its
/// spam weight is discarded. Otherwise the client IP is resolved, the request
/// is checked with `handle_traffic_req` (a rejection is returned immediately),
/// the handler runs, and its outcome is passed through `handle_traffic_resp`.
///
/// The expansion uses `return` and `?`, so it must appear in a function or
/// async block whose return type is `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident, $method_name:expr) => {{
        let client = match $self.client_id_source.as_ref() {
            Some(client_id_source) => $self.get_client_ip_addr(&$request, client_id_source),
            None => {
                return $self
                    .$func_name($request)
                    .await
                    .map(|(response, _)| response);
            }
        };

        $self.handle_traffic_req(client).await?;

        let outcome = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, outcome, $method_name)
    }};
}
1902
/// gRPC `Validator` service implementation.
///
/// Every endpoint routes its matching `*_impl` handler through
/// `handle_with_decoration!`, passing the method name used for traffic tallying.
#[async_trait]
impl Validator for ValidatorService {
    /// Submits transactions. Unlike the other endpoints, the handler runs on a
    /// separately spawned monitored task that owns a clone of the service —
    /// presumably so processing continues even if this future is dropped
    /// (TODO confirm).
    ///
    /// # Panics
    /// Panics if awaiting the spawned task returns an error (the join result is
    /// unwrapped).
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        spawn_monitored_task!(async move {
            handle_with_decoration!(
                validator_service,
                handle_submit_transaction_impl,
                request,
                "submit_transaction"
            )
        })
        .await
        .unwrap()
    }

    /// Waits for the outcome of a submitted transaction; see `wait_for_effects_impl`.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request, "wait_for_effects")
    }

    /// Serves an object info request; see `object_info_impl`.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request, "object_info")
    }

    /// Serves a transaction info request; see `transaction_info_impl`.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request, "transaction_info")
    }

    /// Serves a checkpoint request; see `checkpoint_impl`.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request, "checkpoint")
    }

    /// Serves a v2 checkpoint request; see `checkpoint_v2_impl`.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request, "checkpoint_v2")
    }

    /// Returns the Sui system state object; see `get_system_state_object_impl`.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(
            self,
            get_system_state_object_impl,
            request,
            "get_system_state_object"
        )
    }

    /// Returns the validator health snapshot; see `validator_health_impl`.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request, "validator_health")
    }
}