1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::ZipDebugEqIteratorExt;
11use mysten_common::{assert_reachable, debug_fatal};
12use mysten_metrics::spawn_monitored_task;
13use prometheus::{
14 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15 register_gauge_with_registry, register_histogram_vec_with_registry,
16 register_histogram_with_registry, register_int_counter_vec_with_registry,
17 register_int_counter_with_registry,
18};
19use std::{
20 io,
21 net::{IpAddr, SocketAddr},
22 sync::Arc,
23 time::{Duration, SystemTime},
24};
25use sui_network::{
26 api::{Validator, ValidatorServer},
27 tonic,
28 validator::server::SUI_TLS_SERVER_NAME,
29};
30use sui_types::effects::TransactionEffectsAPI;
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::messages_grpc::{
34 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
35 TransactionInfoRequest, TransactionInfoResponse,
36};
37use sui_types::multiaddr::Multiaddr;
38use sui_types::object::Object;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::traffic_control::{ClientIdSource, Weight};
41use sui_types::{
42 base_types::ObjectID,
43 digests::TransactionEffectsDigest,
44 error::{SuiErrorKind, UserInputError},
45};
46use sui_types::{
47 effects::TransactionEffects,
48 messages_grpc::{
49 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
50 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
51 },
52};
53use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
54use sui_types::{error::*, transaction::*};
55use sui_types::{
56 fp_ensure,
57 messages_checkpoint::{
58 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
59 },
60};
61use tokio::time::timeout;
62use tonic::metadata::{Ascii, MetadataValue};
63use tracing::{debug, error, info, instrument};
64
65use crate::admission_queue::{AdmissionQueueContext, AdmissionQueueManager};
66use crate::gasless_rate_limiter::GaslessRateLimiter;
67use crate::{
68 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
69 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, ConsensusOverloadChecker},
70 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
71};
72use crate::{
73 authority::{
74 authority_per_epoch_store::AuthorityPerEpochStore,
75 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
76 },
77 checkpoints::CheckpointStore,
78 mysticeti_adapter::LazyMysticetiClient,
79};
80use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
81
82#[cfg(test)]
83#[path = "unit_tests/server_tests.rs"]
84mod server_tests;
85
86#[cfg(test)]
87#[path = "unit_tests/wait_for_effects_tests.rs"]
88mod wait_for_effects_tests;
89
90#[cfg(test)]
91#[path = "unit_tests/submit_transaction_tests.rs"]
92mod submit_transaction_tests;
93
94pub struct AuthorityServerHandle {
95 server_handle: sui_network::validator::server::Server,
96}
97
98impl AuthorityServerHandle {
99 pub async fn join(self) -> Result<(), io::Error> {
100 self.server_handle.handle().wait_for_shutdown().await;
101 Ok(())
102 }
103
104 pub async fn kill(self) -> Result<(), io::Error> {
105 self.server_handle.handle().shutdown().await;
106 Ok(())
107 }
108
109 pub fn address(&self) -> &Multiaddr {
110 self.server_handle.local_addr()
111 }
112}
113
114pub struct AuthorityServer {
115 address: Multiaddr,
116 pub state: Arc<AuthorityState>,
117 consensus_adapter: Arc<ConsensusAdapter>,
118 pub metrics: Arc<ValidatorServiceMetrics>,
119}
120
121impl AuthorityServer {
122 pub fn new_for_test_with_consensus_adapter(
123 state: Arc<AuthorityState>,
124 consensus_adapter: Arc<ConsensusAdapter>,
125 ) -> Self {
126 let address = new_local_tcp_address_for_testing();
127 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
128
129 Self {
130 address,
131 state,
132 consensus_adapter,
133 metrics,
134 }
135 }
136
137 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
138 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
139 let consensus_adapter = Arc::new(ConsensusAdapter::new(
140 Arc::new(LazyMysticetiClient::new()),
141 CheckpointStore::new_for_tests(),
142 state.name,
143 100_000,
144 100_000,
145 ConsensusAdapterMetrics::new_test(),
146 slot_freed_notify,
147 ));
148 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
149 }
150
151 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
152 let address = self.address.clone();
153 self.spawn_with_bind_address_for_test(address).await
154 }
155
156 pub async fn spawn_with_bind_address_for_test(
157 self,
158 address: Multiaddr,
159 ) -> Result<AuthorityServerHandle, io::Error> {
160 let tls_config = sui_tls::create_rustls_server_config(
161 self.state.config.network_key_pair().copy().private(),
162 SUI_TLS_SERVER_NAME.to_string(),
163 );
164 let config = mysten_network::config::Config::new();
165 let server = sui_network::validator::server::ServerBuilder::from_config(
166 &config,
167 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
168 )
169 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
170 self.state,
171 self.consensus_adapter,
172 self.metrics,
173 )))
174 .bind(&address, Some(tls_config))
175 .await
176 .unwrap();
177 let local_addr = server.local_addr().to_owned();
178 info!("Listening to traffic on {local_addr}");
179 let handle = AuthorityServerHandle {
180 server_handle: server,
181 };
182 Ok(handle)
183 }
184}
185
186pub struct ValidatorServiceMetrics {
187 pub signature_errors: IntCounter,
188 pub tx_verification_latency: Histogram,
189 pub handle_transaction_latency: Histogram,
190 pub handle_transaction_consensus_latency: Histogram,
191 pub handle_submit_transaction_consensus_latency: HistogramVec,
192 pub handle_wait_for_effects_ping_latency: HistogramVec,
193
194 handle_submit_transaction_latency: HistogramVec,
195 handle_submit_transaction_bytes: HistogramVec,
196 handle_submit_transaction_batch_size: HistogramVec,
197
198 num_rejected_tx_during_overload: IntCounterVec,
199 submission_rejected_transactions: IntCounterVec,
200 connection_ip_not_found: IntCounter,
201 forwarded_header_parse_error: IntCounter,
202 forwarded_header_invalid: IntCounter,
203 forwarded_header_not_included: IntCounter,
204 client_id_source_config_mismatch: IntCounter,
205 x_forwarded_for_num_hops: Gauge,
206 pub gasless_rate_limited_count: IntCounter,
207 pub gasless_submission_outcomes: IntCounterVec,
208 admission_queue_direct_bypasses: IntCounter,
209}
210
211impl ValidatorServiceMetrics {
212 pub fn new(registry: &Registry) -> Self {
213 Self {
214 signature_errors: register_int_counter_with_registry!(
215 "total_signature_errors",
216 "Number of transaction signature errors",
217 registry,
218 )
219 .unwrap(),
220 tx_verification_latency: register_histogram_with_registry!(
221 "validator_service_tx_verification_latency",
222 "Latency of verifying a transaction",
223 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
224 registry,
225 )
226 .unwrap(),
227 handle_transaction_latency: register_histogram_with_registry!(
228 "validator_service_handle_transaction_latency",
229 "Latency of handling a transaction",
230 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231 registry,
232 )
233 .unwrap(),
234 handle_transaction_consensus_latency: register_histogram_with_registry!(
235 "validator_service_handle_transaction_consensus_latency",
236 "Latency of handling a user transaction sent through consensus",
237 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap(),
241 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
242 "validator_service_submit_transaction_consensus_latency",
243 "Latency of submitting a user transaction sent through consensus",
244 &["req_type"],
245 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
246 registry,
247 )
248 .unwrap(),
249 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
250 "validator_service_submit_transaction_latency",
251 "Latency of submit transaction handler",
252 &["req_type"],
253 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
254 registry,
255 )
256 .unwrap(),
257 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
258 "validator_service_handle_wait_for_effects_ping_latency",
259 "Latency of handling a ping request for wait_for_effects",
260 &["req_type"],
261 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
262 registry,
263 )
264 .unwrap(),
265 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
266 "validator_service_submit_transaction_bytes",
267 "The size of transactions in the submit transaction request",
268 &["req_type"],
269 mysten_metrics::BYTES_BUCKETS.to_vec(),
270 registry,
271 )
272 .unwrap(),
273 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
274 "validator_service_submit_transaction_batch_size",
275 "The number of transactions in the submit transaction request",
276 &["req_type"],
277 mysten_metrics::COUNT_BUCKETS.to_vec(),
278 registry,
279 )
280 .unwrap(),
281 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
282 "validator_service_num_rejected_tx_during_overload",
283 "Number of rejected transaction due to system overload",
284 &["error_type"],
285 registry,
286 )
287 .unwrap(),
288 submission_rejected_transactions: register_int_counter_vec_with_registry!(
289 "validator_service_submission_rejected_transactions",
290 "Number of transactions rejected during submission",
291 &["reason"],
292 registry,
293 )
294 .unwrap(),
295 connection_ip_not_found: register_int_counter_with_registry!(
296 "validator_service_connection_ip_not_found",
297 "Number of times connection IP was not extractable from request",
298 registry,
299 )
300 .unwrap(),
301 forwarded_header_parse_error: register_int_counter_with_registry!(
302 "validator_service_forwarded_header_parse_error",
303 "Number of times x-forwarded-for header could not be parsed",
304 registry,
305 )
306 .unwrap(),
307 forwarded_header_invalid: register_int_counter_with_registry!(
308 "validator_service_forwarded_header_invalid",
309 "Number of times x-forwarded-for header was invalid",
310 registry,
311 )
312 .unwrap(),
313 forwarded_header_not_included: register_int_counter_with_registry!(
314 "validator_service_forwarded_header_not_included",
315 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
316 registry,
317 )
318 .unwrap(),
319 client_id_source_config_mismatch: register_int_counter_with_registry!(
320 "validator_service_client_id_source_config_mismatch",
321 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
322 registry,
323 )
324 .unwrap(),
325 x_forwarded_for_num_hops: register_gauge_with_registry!(
326 "validator_service_x_forwarded_for_num_hops",
327 "Number of hops in x-forwarded-for header",
328 registry,
329 )
330 .unwrap(),
331 gasless_rate_limited_count: register_int_counter_with_registry!(
332 "validator_service_gasless_rate_limited_count",
333 "Number of gasless transactions rejected by rate limiter",
334 registry,
335 )
336 .unwrap(),
337 gasless_submission_outcomes: register_int_counter_vec_with_registry!(
338 "validator_service_gasless_submission_outcomes",
339 "Number of valid gasless transaction submissions by outcome",
340 &["outcome"],
341 registry,
342 )
343 .unwrap(),
344 admission_queue_direct_bypasses: register_int_counter_with_registry!(
345 "validator_service_admission_queue_direct_bypasses",
346 "Number of transactions that bypassed the queue (system not overloaded)",
347 registry,
348 )
349 .unwrap(),
350 }
351 }
352
353 pub fn new_for_tests() -> Self {
354 let registry = Registry::new();
355 Self::new(®istry)
356 }
357}
358
359enum AdmissionQueueSubmitMode {
361 Bypass,
363 Queue,
365 Disabled,
369}
370
371#[derive(Clone)]
372pub struct ValidatorService {
373 state: Arc<AuthorityState>,
374 consensus_adapter: Arc<ConsensusAdapter>,
375 metrics: Arc<ValidatorServiceMetrics>,
376 traffic_controller: Option<Arc<TrafficController>>,
377 client_id_source: Option<ClientIdSource>,
378 gasless_limiter: GaslessRateLimiter,
379 admission_queue: Option<AdmissionQueueContext>,
380}
381
382impl ValidatorService {
383 pub fn new(
384 state: Arc<AuthorityState>,
385 consensus_adapter: Arc<ConsensusAdapter>,
386 validator_metrics: Arc<ValidatorServiceMetrics>,
387 client_id_source: Option<ClientIdSource>,
388 admission_queue: Option<AdmissionQueueContext>,
389 ) -> Self {
390 let traffic_controller = state.traffic_controller.clone();
391 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
392 Self {
393 state,
394 consensus_adapter,
395 metrics: validator_metrics,
396 traffic_controller,
397 client_id_source,
398 gasless_limiter,
399 admission_queue,
400 }
401 }
402
403 pub fn new_for_tests(
404 state: Arc<AuthorityState>,
405 consensus_adapter: Arc<ConsensusAdapter>,
406 metrics: Arc<ValidatorServiceMetrics>,
407 ) -> Self {
408 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
409 let epoch_store = state.epoch_store_for_testing().clone();
410 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
411 let manager = Arc::new(AdmissionQueueManager::new_for_tests(
412 consensus_adapter.clone(),
413 slot_freed_notify,
414 ));
415 let admission_queue = Some(AdmissionQueueContext::spawn(manager, epoch_store));
416 Self {
417 state,
418 consensus_adapter,
419 metrics,
420 traffic_controller: None,
421 client_id_source: None,
422 gasless_limiter,
423 admission_queue,
424 }
425 }
426
427 pub fn validator_state(&self) -> &Arc<AuthorityState> {
428 &self.state
429 }
430
431 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
433 let epoch_store = self.state.load_epoch_store_one_call_per_task();
434
435 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
437
438 let transaction = epoch_store
440 .verify_transaction_require_no_aliases(transaction)?
441 .into_tx();
442
443 self.state
445 .handle_vote_transaction(&epoch_store, transaction)?;
446
447 Ok(())
448 }
449
450 pub fn handle_transaction_for_testing_with_overload_check(
453 &self,
454 transaction: Transaction,
455 ) -> SuiResult<()> {
456 let epoch_store = self.state.load_epoch_store_one_call_per_task();
457
458 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
460
461 self.state.check_system_overload(
463 transaction.data(),
464 self.state.check_system_overload_at_signing(),
465 )?;
466
467 let transaction = epoch_store
469 .verify_transaction_require_no_aliases(transaction)?
470 .into_tx();
471
472 self.state
474 .handle_vote_transaction(&epoch_store, transaction)?;
475
476 Ok(())
477 }
478
479 async fn collect_immutable_object_ids(
482 &self,
483 tx: &VerifiedTransaction,
484 state: &AuthorityState,
485 ) -> SuiResult<Vec<ObjectID>> {
486 let input_objects = tx.data().transaction_data().input_objects()?;
487
488 let object_ids: Vec<ObjectID> = input_objects
490 .iter()
491 .filter_map(|obj| match obj {
492 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
493 _ => None,
494 })
495 .collect();
496 if object_ids.is_empty() {
497 return Ok(vec![]);
498 }
499
500 let objects = state.get_object_cache_reader().get_objects(&object_ids);
502
503 objects
505 .into_iter()
506 .zip_debug_eq(object_ids.iter())
507 .filter_map(|(obj, id)| {
508 let Some(o) = obj else {
509 return Some(Err::<ObjectID, SuiError>(
510 SuiErrorKind::UserInputError {
511 error: UserInputError::ObjectNotFound {
512 object_id: *id,
513 version: None,
514 },
515 }
516 .into(),
517 ));
518 };
519 if o.is_immutable() {
520 Some(Ok(*id))
521 } else {
522 None
523 }
524 })
525 .collect::<SuiResult<Vec<ObjectID>>>()
526 }
527
528 #[instrument(
529 name = "ValidatorService::handle_submit_transaction",
530 level = "error",
531 skip_all,
532 err(level = "debug")
533 )]
534 async fn handle_submit_transaction(
535 &self,
536 request: tonic::Request<RawSubmitTxRequest>,
537 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
538 let Self {
539 state,
540 consensus_adapter: _,
541 metrics,
542 traffic_controller: _,
543 client_id_source,
544 gasless_limiter: _,
545 admission_queue: _,
546 } = self.clone();
547
548 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
549 self.get_client_ip_addr(&request, client_id_source)
550 } else {
551 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
552 };
553
554 let inner = request.into_inner();
555 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
556
557 let next_epoch = start_epoch + 1;
558 let mut max_retries = 1;
559
560 loop {
561 let res = self
562 .handle_submit_transaction_inner(&state, &metrics, &inner, submitter_client_addr)
563 .await;
564 match res {
565 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
566 Err(err) => {
567 if max_retries > 0
568 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
569 {
570 max_retries -= 1;
571
572 debug!(
573 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
574 );
575
576 if let Ok(Ok(new_epoch)) =
577 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
578 {
579 assert_reachable!("retry submission at epoch end");
580 if new_epoch >= next_epoch {
581 continue;
582 }
583 debug_fatal!(
585 "wait_for_epoch returned early: expected >= {}, got {}",
586 next_epoch,
587 new_epoch
588 );
589 }
590 }
591 return Err(err.into());
592 }
593 }
594 }
595 }
596
597 async fn handle_submit_transaction_inner(
598 &self,
599 state: &AuthorityState,
600 metrics: &ValidatorServiceMetrics,
601 request: &RawSubmitTxRequest,
602 submitter_client_addr: Option<IpAddr>,
603 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
604 let epoch_store = state.load_epoch_store_one_call_per_task();
605 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
606 SuiErrorKind::GrpcMessageDeserializeError {
607 type_info: "RawSubmitTxRequest.submit_type".to_string(),
608 error: e.to_string(),
609 }
610 })?;
611
612 let is_ping_request = submit_type == SubmitTxType::Ping;
613 if is_ping_request {
614 fp_ensure!(
615 request.transactions.is_empty(),
616 SuiErrorKind::InvalidRequest(format!(
617 "Ping request cannot contain {} transactions",
618 request.transactions.len()
619 ))
620 .into()
621 );
622 } else {
623 fp_ensure!(
625 !request.transactions.is_empty(),
626 SuiErrorKind::InvalidRequest(
627 "At least one transaction needs to be submitted".to_string(),
628 )
629 .into()
630 );
631 }
632
633 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
638
639 let max_num_transactions = if is_soft_bundle_request {
640 epoch_store.protocol_config().max_soft_bundle_size()
643 } else {
644 epoch_store
646 .protocol_config()
647 .max_num_transactions_in_block()
648 };
649 fp_ensure!(
650 request.transactions.len() <= max_num_transactions as usize,
651 SuiErrorKind::InvalidRequest(format!(
652 "Too many transactions in request: {} vs {}",
653 request.transactions.len(),
654 max_num_transactions
655 ))
656 .into()
657 );
658
659 let mut tx_digests = Vec::with_capacity(request.transactions.len());
661 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
663 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
665 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
667 let mut total_size_bytes = 0;
669 let mut spam_weight = Weight::zero();
671 let mut expected_soft_bundle_gas_price = None;
673
674 let req_type = if is_ping_request {
675 "ping"
676 } else if request.transactions.len() == 1 {
677 "single_transaction"
678 } else if is_soft_bundle_request {
679 "soft_bundle"
680 } else {
681 "batch"
682 };
683
684 let _handle_tx_metrics_guard = metrics
685 .handle_submit_transaction_latency
686 .with_label_values(&[req_type])
687 .start_timer();
688
689 let submit_mode = self.classify_submit_mode(is_ping_request);
690
691 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
692 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
693 Ok(txn) => txn,
694 Err(e) => {
695 return Err(SuiErrorKind::TransactionDeserializationError {
697 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
698 }
699 .into());
700 }
701 };
702
703 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
705 let tx_digest = *transaction.digest();
706
707 if is_soft_bundle_request {
709 let gas_price = transaction.data().transaction_data().gas_price();
710 if let Some(expected) = expected_soft_bundle_gas_price {
711 fp_ensure!(
712 gas_price == expected,
713 SuiErrorKind::UserInputError {
714 error: UserInputError::GasPriceMismatchError {
715 digest: tx_digest,
716 expected,
717 actual: gas_price,
718 }
719 }
720 .into()
721 );
722 } else {
723 expected_soft_bundle_gas_price = Some(gas_price);
724 }
725 }
726
727 let is_gasless = transaction
728 .data()
729 .transaction_data()
730 .is_gasless_transaction();
731
732 if is_gasless {
733 spam_weight = Weight::one();
735 metrics
736 .gasless_submission_outcomes
737 .with_label_values(&["attempted"])
738 .inc();
739 }
740
741 let overload_check_res = state.check_system_overload(
742 transaction.data(),
743 state.check_system_overload_at_signing(),
744 );
745 if let Err(error) = overload_check_res {
746 metrics
747 .num_rejected_tx_during_overload
748 .with_label_values(&[error.as_ref()])
749 .inc();
750 if is_gasless {
751 metrics
752 .gasless_submission_outcomes
753 .with_label_values(&["rejected_overload"])
754 .inc();
755 }
756 results[idx] = Some(SubmitTxResult::Rejected { error });
757 continue;
758 }
759
760 if matches!(submit_mode, AdmissionQueueSubmitMode::Disabled)
763 && let Err(error) = self.consensus_adapter.check_consensus_overload()
764 {
765 state.update_overload_metrics("consensus");
766 metrics
767 .num_rejected_tx_during_overload
768 .with_label_values(&[error.as_ref()])
769 .inc();
770 if is_gasless {
771 metrics
772 .gasless_submission_outcomes
773 .with_label_values(&["rejected_overload"])
774 .inc();
775 }
776 results[idx] = Some(SubmitTxResult::Rejected { error });
777 continue;
778 }
779
780 if is_gasless
781 && !self
782 .gasless_limiter
783 .try_acquire(epoch_store.protocol_config())
784 {
785 metrics.gasless_rate_limited_count.inc();
786 metrics
787 .gasless_submission_outcomes
788 .with_label_values(&["rejected_rate_limited"])
789 .inc();
790 results[idx] = Some(SubmitTxResult::Rejected {
791 error: SuiErrorKind::ValidatorOverloadedRetryAfter {
792 retry_after_secs: 1,
793 }
794 .into(),
795 });
796 continue;
797 }
798
799 let verified_transaction = {
801 let _metrics_guard = metrics.tx_verification_latency.start_timer();
802 if epoch_store.protocol_config().address_aliases() {
803 match epoch_store.verify_transaction_with_current_aliases(transaction) {
804 Ok(tx) => tx,
805 Err(e) => {
806 metrics.signature_errors.inc();
807 return Err(e);
808 }
809 }
810 } else {
811 match epoch_store.verify_transaction_require_no_aliases(transaction) {
812 Ok(tx) => tx,
813 Err(e) => {
814 metrics.signature_errors.inc();
815 return Err(e);
816 }
817 }
818 }
819 };
820
821 debug!(
822 ?tx_digest,
823 "handle_submit_transaction: verified transaction"
824 );
825
826 if let Some(effects) = state
829 .get_transaction_cache_reader()
830 .get_executed_effects(&tx_digest)
831 {
832 let effects_digest = effects.digest();
833 if let Ok(executed_data) = self.complete_executed_data(effects).await {
834 let executed_result = SubmitTxResult::Executed {
835 effects_digest,
836 details: Some(executed_data),
837 };
838 results[idx] = Some(executed_result);
839 debug!(?tx_digest, "handle_submit_transaction: already executed");
840 continue;
841 }
842 }
843
844 if self
845 .state
846 .get_transaction_cache_reader()
847 .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
848 {
849 results[idx] = Some(SubmitTxResult::Rejected {
850 error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
851 });
852 debug!(
853 ?tx_digest,
854 "handle_submit_transaction: transaction already executed in previous epoch"
855 );
856 continue;
857 }
858
859 debug!(
860 ?tx_digest,
861 "handle_submit_transaction: waiting for fastpath dependency objects"
862 );
863 if !state
864 .wait_for_fastpath_dependency_objects(
865 verified_transaction.tx(),
866 epoch_store.epoch(),
867 )
868 .await?
869 {
870 debug!(
871 ?tx_digest,
872 "fastpath input objects are still unavailable after waiting"
873 );
874 }
875
876 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
877 Ok(_) => { }
878 Err(e) => {
879 if let Some(effects) = state
882 .get_transaction_cache_reader()
883 .get_executed_effects(&tx_digest)
884 {
885 let effects_digest = effects.digest();
886 if let Ok(executed_data) = self.complete_executed_data(effects).await {
887 let executed_result = SubmitTxResult::Executed {
888 effects_digest,
889 details: Some(executed_data),
890 };
891 results[idx] = Some(executed_result);
892 continue;
893 }
894 }
895
896 debug!(?tx_digest, "Transaction rejected during submission: {e}");
898 metrics
899 .submission_rejected_transactions
900 .with_label_values(&[e.to_variant_name()])
901 .inc();
902 results[idx] = Some(SubmitTxResult::Rejected { error: e });
903 continue;
904 }
905 }
906
907 let mut claims = vec![];
909
910 let immutable_object_ids = self
911 .collect_immutable_object_ids(verified_transaction.tx(), state)
912 .await?;
913 if !immutable_object_ids.is_empty() {
914 claims.push(TransactionClaim::ImmutableInputObjects(
915 immutable_object_ids,
916 ));
917 }
918
919 let (tx, aliases) = verified_transaction.into_inner();
920 if epoch_store.protocol_config().address_aliases() {
921 if epoch_store
922 .protocol_config()
923 .fix_checkpoint_signature_mapping()
924 {
925 claims.push(TransactionClaim::AddressAliasesV2(aliases));
926 } else {
927 let v1_aliases: Vec<_> = tx
928 .data()
929 .intent_message()
930 .value
931 .required_signers()
932 .into_iter()
933 .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
934 .collect();
935 #[allow(deprecated)]
936 claims.push(TransactionClaim::AddressAliases(
937 nonempty::NonEmpty::from_vec(v1_aliases)
938 .expect("must have at least one required_signer"),
939 ));
940 }
941 }
942
943 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
944
945 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
946 &state.name,
947 tx_with_claims,
948 ));
949 if is_gasless {
950 metrics
951 .gasless_submission_outcomes
952 .with_label_values(&["submitted"])
953 .inc();
954 }
955
956 transaction_indexes.push(idx);
957 tx_digests.push(tx_digest);
958 total_size_bytes += tx_size;
959 }
960
961 if consensus_transactions.is_empty() && !is_ping_request {
962 return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
963 }
964
965 let max_transaction_bytes = if is_soft_bundle_request {
969 epoch_store
970 .protocol_config()
971 .consensus_max_transactions_in_block_bytes()
972 / 2
973 } else {
974 epoch_store
975 .protocol_config()
976 .consensus_max_transactions_in_block_bytes()
977 };
978 fp_ensure!(
979 total_size_bytes <= max_transaction_bytes as usize,
980 SuiErrorKind::UserInputError {
981 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
982 size: total_size_bytes,
983 limit: max_transaction_bytes,
984 },
985 }
986 .into()
987 );
988
989 metrics
990 .handle_submit_transaction_bytes
991 .with_label_values(&[req_type])
992 .observe(total_size_bytes as f64);
993 metrics
994 .handle_submit_transaction_batch_size
995 .with_label_values(&[req_type])
996 .observe(consensus_transactions.len() as f64);
997
998 let _latency_metric_guard = metrics
999 .handle_submit_transaction_consensus_latency
1000 .with_label_values(&[req_type])
1001 .start_timer();
1002
1003 if is_soft_bundle_request {
1004 assert!(
1007 !consensus_transactions.is_empty(),
1008 "A valid soft bundle must have at least one transaction"
1009 );
1010 }
1011
1012 let tx_groups: Vec<Vec<ConsensusTransaction>> = if is_soft_bundle_request || is_ping_request
1015 {
1016 vec![consensus_transactions]
1017 } else {
1018 consensus_transactions
1019 .into_iter()
1020 .map(|t| vec![t])
1021 .collect()
1022 };
1023
1024 let consensus_positions: Vec<ConsensusPosition> = match submit_mode {
1025 AdmissionQueueSubmitMode::Bypass | AdmissionQueueSubmitMode::Disabled => {
1026 if matches!(submit_mode, AdmissionQueueSubmitMode::Bypass) {
1027 self.metrics.admission_queue_direct_bypasses.inc();
1028 }
1029 let futures = tx_groups.into_iter().map(|txns| {
1030 debug!(
1031 "handle_submit_transaction: submitting consensus transactions ({}): {}",
1032 req_type,
1033 txns.iter().map(|t| t.local_display()).join(", ")
1034 );
1035 self.consensus_adapter.submit_and_get_positions(
1036 txns,
1037 &epoch_store,
1038 submitter_client_addr,
1039 )
1040 });
1041 future::try_join_all(futures)
1042 .await?
1043 .into_iter()
1044 .flatten()
1045 .collect()
1046 }
1047 AdmissionQueueSubmitMode::Queue => {
1048 let aq = self
1049 .admission_queue
1050 .as_ref()
1051 .expect("Queue mode implies admission_queue is Some")
1052 .load();
1053 let mut receivers = Vec::with_capacity(tx_groups.len());
1054 for txns in tx_groups {
1055 let gas_price = Self::extract_gas_price(&txns);
1056 let (rx, newly_inserted) = aq
1057 .try_insert(gas_price, txns, submitter_client_addr)
1058 .await?;
1059 if !newly_inserted {
1060 spam_weight = Weight::one();
1062 }
1063 receivers.push(rx);
1064 }
1065 let results = future::try_join_all(receivers.into_iter().map(|rx| async move {
1066 rx.await.map_err(|_| {
1067 SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus)
1068 })?
1069 }))
1070 .await?;
1071 results.into_iter().flatten().collect()
1072 }
1073 };
1074
1075 if is_ping_request {
1076 assert_eq!(consensus_positions.len(), 1);
1078 results.push(Some(SubmitTxResult::Submitted {
1079 consensus_position: consensus_positions[0],
1080 }));
1081 } else {
1082 for ((idx, tx_digest), consensus_position) in transaction_indexes
1084 .into_iter()
1085 .zip_debug_eq(tx_digests)
1086 .zip_debug_eq(consensus_positions)
1087 {
1088 debug!(
1089 ?tx_digest,
1090 "handle_submit_transaction: submitted consensus transaction at {}",
1091 consensus_position,
1092 );
1093 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1094 }
1095 }
1096
1097 Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1098 }
1099
1100 fn try_from_submit_tx_response(
1101 results: Vec<Option<SubmitTxResult>>,
1102 ) -> Result<RawSubmitTxResponse, SuiError> {
1103 let mut raw_results = Vec::new();
1104 for (i, result) in results.into_iter().enumerate() {
1105 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1106 error: format!("Missing transaction result at {}", i),
1107 })?;
1108 let raw_result = result.try_into()?;
1109 raw_results.push(raw_result);
1110 }
1111 Ok(RawSubmitTxResponse {
1112 results: raw_results,
1113 })
1114 }
1115
1116 fn extract_gas_price(transactions: &[ConsensusTransaction]) -> u64 {
1119 use sui_types::messages_consensus::ConsensusTransactionKind;
1120 transactions
1121 .iter()
1122 .filter_map(|tx| match &tx.kind {
1123 ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.gas_price()),
1124 ConsensusTransactionKind::UserTransaction(t) => {
1125 Some(t.data().transaction_data().gas_price())
1126 }
1127 ConsensusTransactionKind::UserTransactionV2(t) => {
1128 Some(t.tx().data().transaction_data().gas_price())
1129 }
1130 _ => None,
1131 })
1132 .min()
1133 .unwrap_or(0)
1134 }
1135
1136 fn classify_submit_mode(&self, is_ping_request: bool) -> AdmissionQueueSubmitMode {
1137 let Some(aq) = &self.admission_queue else {
1138 return AdmissionQueueSubmitMode::Disabled;
1139 };
1140
1141 if is_ping_request {
1142 return AdmissionQueueSubmitMode::Bypass;
1143 }
1144
1145 let inflight = usize::try_from(self.consensus_adapter.num_inflight_transactions()).unwrap();
1146 if inflight < aq.bypass_threshold() {
1147 return AdmissionQueueSubmitMode::Bypass;
1148 }
1149
1150 if aq.load().failover_tripped() {
1153 return AdmissionQueueSubmitMode::Disabled;
1154 }
1155
1156 AdmissionQueueSubmitMode::Queue
1157 }
1158
1159 async fn collect_effects_data(
1160 &self,
1161 effects: &TransactionEffects,
1162 include_events: bool,
1163 include_input_objects: bool,
1164 include_output_objects: bool,
1165 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1166 let events = if include_events && effects.events_digest().is_some() {
1167 Some(
1168 self.state
1169 .get_transaction_events(effects.transaction_digest())?,
1170 )
1171 } else {
1172 None
1173 };
1174
1175 let input_objects = if include_input_objects {
1176 self.state.get_transaction_input_objects(effects)?
1177 } else {
1178 vec![]
1179 };
1180
1181 let output_objects = if include_output_objects {
1182 self.state.get_transaction_output_objects(effects)?
1183 } else {
1184 vec![]
1185 };
1186
1187 Ok((events, input_objects, output_objects))
1188 }
1189}
1190
1191type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1192
1193impl ValidatorService {
1194 async fn handle_submit_transaction_impl(
1195 &self,
1196 request: tonic::Request<RawSubmitTxRequest>,
1197 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1198 self.handle_submit_transaction(request).await
1199 }
1200
1201 async fn wait_for_effects_impl(
1202 &self,
1203 request: tonic::Request<RawWaitForEffectsRequest>,
1204 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1205 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1206 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1207 let response = timeout(
1208 Duration::from_secs(20),
1210 epoch_store
1211 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1212 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1213 )
1214 .await
1215 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1216 .try_into()?;
1217 Ok((tonic::Response::new(response), Weight::zero()))
1218 }
1219
1220 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "debug", skip_all, fields(consensus_position = ?request.consensus_position))]
1221 async fn wait_for_effects_response(
1222 &self,
1223 request: WaitForEffectsRequest,
1224 epoch_store: &Arc<AuthorityPerEpochStore>,
1225 ) -> SuiResult<WaitForEffectsResponse> {
1226 if request.ping_type.is_some() {
1227 return timeout(
1228 Duration::from_secs(10),
1229 self.ping_response(request, epoch_store),
1230 )
1231 .await
1232 .map_err(|_| SuiErrorKind::TimeoutError)?;
1233 }
1234
1235 let Some(tx_digest) = request.transaction_digest else {
1236 return Err(SuiErrorKind::InvalidRequest(
1237 "Transaction digest is required for wait for effects requests".to_string(),
1238 )
1239 .into());
1240 };
1241 let tx_digests = [tx_digest];
1242
1243 let consensus_status_future = async {
1247 let consensus_position = match request.consensus_position {
1248 Some(pos) => pos,
1249 None => return futures::future::pending().await,
1250 };
1251 let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1252 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1253 match consensus_tx_status_cache
1254 .notify_read_transaction_status(consensus_position)
1255 .await
1256 {
1257 NotifyReadConsensusTxStatusResult::Status(
1258 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1259 ) => Ok(WaitForEffectsResponse::Rejected {
1260 error: epoch_store.get_rejection_vote_reason(consensus_position),
1261 }),
1262 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1263 futures::future::pending().await
1265 }
1266 NotifyReadConsensusTxStatusResult::Expired(round) => {
1267 Ok(WaitForEffectsResponse::Expired {
1268 epoch: epoch_store.epoch(),
1269 round: Some(round),
1270 })
1271 }
1272 }
1273 };
1274
1275 tokio::select! {
1276 effects_result = self.state
1277 .get_transaction_cache_reader()
1278 .notify_read_executed_effects_may_fail(
1279 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1280 &tx_digests,
1281 ) => {
1282 let effects = effects_result?.pop().unwrap();
1283 let effects_digest = effects.digest();
1284 let details = if request.include_details {
1285 Some(self.complete_executed_data(effects).await?)
1286 } else {
1287 None
1288 };
1289 Ok(WaitForEffectsResponse::Executed {
1290 effects_digest,
1291 details,
1292 })
1293 }
1294 status_response = consensus_status_future => {
1295 status_response
1296 }
1297 }
1298 }
1299
1300 #[instrument(level = "error", skip_all, err(level = "debug"))]
1301 async fn ping_response(
1302 &self,
1303 request: WaitForEffectsRequest,
1304 epoch_store: &Arc<AuthorityPerEpochStore>,
1305 ) -> SuiResult<WaitForEffectsResponse> {
1306 let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1307
1308 let Some(consensus_position) = request.consensus_position else {
1309 return Err(SuiErrorKind::InvalidRequest(
1310 "Consensus position is required for Ping requests".to_string(),
1311 )
1312 .into());
1313 };
1314
1315 let Some(ping) = request.ping_type else {
1317 return Err(SuiErrorKind::InvalidRequest(
1318 "Ping type is required for ping requests".to_string(),
1319 )
1320 .into());
1321 };
1322
1323 let _metrics_guard = self
1324 .metrics
1325 .handle_wait_for_effects_ping_latency
1326 .with_label_values(&[ping.as_str()])
1327 .start_timer();
1328
1329 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1330
1331 let details = if request.include_details {
1332 Some(Box::new(ExecutedData::default()))
1333 } else {
1334 None
1335 };
1336
1337 let status = consensus_tx_status_cache
1338 .notify_read_transaction_status(consensus_position)
1339 .await;
1340 match status {
1341 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1342 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1343 Ok(WaitForEffectsResponse::Rejected {
1344 error: epoch_store.get_rejection_vote_reason(consensus_position),
1345 })
1346 }
1347 ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1348 effects_digest: TransactionEffectsDigest::ZERO,
1349 details,
1350 }),
1351 },
1352 NotifyReadConsensusTxStatusResult::Expired(round) => {
1353 Ok(WaitForEffectsResponse::Expired {
1354 epoch: epoch_store.epoch(),
1355 round: Some(round),
1356 })
1357 }
1358 }
1359 }
1360
1361 async fn complete_executed_data(
1362 &self,
1363 effects: TransactionEffects,
1364 ) -> SuiResult<Box<ExecutedData>> {
1365 let (events, input_objects, output_objects) = self
1366 .collect_effects_data(
1367 &effects, true, true,
1368 true,
1369 )
1370 .await?;
1371 Ok(Box::new(ExecutedData {
1372 effects,
1373 events,
1374 input_objects,
1375 output_objects,
1376 }))
1377 }
1378
1379 async fn object_info_impl(
1380 &self,
1381 request: tonic::Request<ObjectInfoRequest>,
1382 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1383 let request = request.into_inner();
1384 let response = self.state.handle_object_info_request(request).await?;
1385 Ok((tonic::Response::new(response), Weight::one()))
1386 }
1387
1388 async fn transaction_info_impl(
1389 &self,
1390 request: tonic::Request<TransactionInfoRequest>,
1391 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1392 let request = request.into_inner();
1393 let response = self.state.handle_transaction_info_request(request).await?;
1394 Ok((tonic::Response::new(response), Weight::one()))
1395 }
1396
1397 async fn checkpoint_impl(
1398 &self,
1399 request: tonic::Request<CheckpointRequest>,
1400 ) -> WrappedServiceResponse<CheckpointResponse> {
1401 let request = request.into_inner();
1402 let response = self.state.handle_checkpoint_request(&request)?;
1403 Ok((tonic::Response::new(response), Weight::one()))
1404 }
1405
1406 async fn checkpoint_v2_impl(
1407 &self,
1408 request: tonic::Request<CheckpointRequestV2>,
1409 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1410 let request = request.into_inner();
1411 let response = self.state.handle_checkpoint_request_v2(&request)?;
1412 Ok((tonic::Response::new(response), Weight::one()))
1413 }
1414
1415 async fn get_system_state_object_impl(
1416 &self,
1417 _request: tonic::Request<SystemStateRequest>,
1418 ) -> WrappedServiceResponse<SuiSystemState> {
1419 let response = self
1420 .state
1421 .get_object_cache_reader()
1422 .get_sui_system_state_object_unsafe()?;
1423 Ok((tonic::Response::new(response), Weight::one()))
1424 }
1425
1426 async fn validator_health_impl(
1427 &self,
1428 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1429 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1430 let state = &self.state;
1431
1432 let epoch_store = state.load_epoch_store_one_call_per_task();
1434
1435 let num_inflight_execution_transactions =
1437 state.execution_scheduler().num_pending_certificates() as u64;
1438
1439 let num_inflight_consensus_transactions =
1441 self.consensus_adapter.num_inflight_transactions();
1442
1443 let last_committed_leader_round = epoch_store
1445 .consensus_tx_status_cache
1446 .get_last_committed_leader_round()
1447 .unwrap_or(0);
1448
1449 let last_locally_built_checkpoint = epoch_store
1451 .last_built_checkpoint_summary()
1452 .ok()
1453 .flatten()
1454 .map(|(_, summary)| summary.sequence_number)
1455 .unwrap_or(0);
1456
1457 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1458 num_inflight_consensus_transactions,
1459 num_inflight_execution_transactions,
1460 last_locally_built_checkpoint,
1461 last_committed_leader_round,
1462 };
1463
1464 let raw_response = typed_response
1465 .try_into()
1466 .map_err(|e: sui_types::error::SuiError| {
1467 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1468 })?;
1469
1470 Ok((tonic::Response::new(raw_response), Weight::one()))
1471 }
1472
1473 fn get_client_ip_addr<T>(
1474 &self,
1475 request: &tonic::Request<T>,
1476 source: &ClientIdSource,
1477 ) -> Option<IpAddr> {
1478 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1479
1480 if let Some(header) = forwarded_header {
1481 let num_hops = header
1482 .to_str()
1483 .map(|h| h.split(',').count().saturating_sub(1))
1484 .unwrap_or(0);
1485
1486 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1487 }
1488
1489 match source {
1490 ClientIdSource::SocketAddr => {
1491 let socket_addr: Option<SocketAddr> = request.remote_addr();
1492
1493 if let Some(socket_addr) = socket_addr {
1499 Some(socket_addr.ip())
1500 } else {
1501 if cfg!(msim) {
1502 } else if cfg!(test) {
1504 panic!("Failed to get remote address from request");
1505 } else {
1506 self.metrics.connection_ip_not_found.inc();
1507 error!("Failed to get remote address from request");
1508 }
1509 None
1510 }
1511 }
1512 ClientIdSource::XForwardedFor(num_hops) => {
1513 let do_header_parse = |op: &MetadataValue<Ascii>| {
1514 match op.to_str() {
1515 Ok(header_val) => {
1516 let header_contents =
1517 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1518 if *num_hops == 0 {
1519 error!(
1520 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1521 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1522 to this node. Skipping traffic controller request handling.",
1523 header_contents,
1524 );
1525 return None;
1526 }
1527 let contents_len = header_contents.len();
1528 if contents_len < *num_hops {
1529 error!(
1530 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1531 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1532 `client-id-source` in the node config.",
1533 header_contents, contents_len, num_hops, contents_len,
1534 );
1535 self.metrics.client_id_source_config_mismatch.inc();
1536 return None;
1537 }
1538 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1539 else {
1540 error!(
1541 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1542 Expected at least {} values. Skipping traffic controller request handling.",
1543 header_contents, contents_len, num_hops, contents_len,
1544 );
1545 return None;
1546 };
1547 parse_ip(client_ip).or_else(|| {
1548 self.metrics.forwarded_header_parse_error.inc();
1549 None
1550 })
1551 }
1552 Err(e) => {
1553 self.metrics.forwarded_header_invalid.inc();
1557 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1558 None
1559 }
1560 }
1561 };
1562 if let Some(op) = request.metadata().get("x-forwarded-for") {
1563 do_header_parse(op)
1564 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1565 do_header_parse(op)
1566 } else {
1567 self.metrics.forwarded_header_not_included.inc();
1568 error!(
1569 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1570 );
1571 None
1572 }
1573 }
1574 }
1575 }
1576
1577 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1578 if let Some(traffic_controller) = &self.traffic_controller {
1579 if !traffic_controller.check(&client, &None).await {
1580 Err(tonic::Status::from_error(
1582 SuiErrorKind::TooManyRequests.into(),
1583 ))
1584 } else {
1585 Ok(())
1586 }
1587 } else {
1588 Ok(())
1589 }
1590 }
1591
1592 fn handle_traffic_resp<T>(
1593 &self,
1594 client: Option<IpAddr>,
1595 wrapped_response: WrappedServiceResponse<T>,
1596 method_name: &str,
1597 ) -> Result<tonic::Response<T>, tonic::Status> {
1598 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1599 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1600 Err(status) => (
1601 Some(SuiError::from(status.clone())),
1602 Weight::zero(),
1603 Err(status.clone()),
1604 ),
1605 };
1606
1607 if let Some(traffic_controller) = self.traffic_controller.clone() {
1608 traffic_controller.tally(TrafficTally {
1609 direct: client,
1610 through_fullnode: None,
1611 error_info: error.map(|e| {
1612 let error_type = String::from(e.clone().as_ref());
1613 let error_weight = normalize(e);
1614 (error_weight, error_type)
1615 }),
1616 spam_weight,
1617 timestamp: SystemTime::now(),
1618 method: Some(method_name.to_string()),
1619 })
1620 }
1621 unwrapped_response
1622 }
1623}
1624
1625fn normalize(err: SuiError) -> Weight {
1627 match err.as_inner() {
1628 SuiErrorKind::UserInputError {
1629 error: UserInputError::IncorrectUserSignature { .. },
1630 } => Weight::one(),
1631 SuiErrorKind::InvalidSignature { .. }
1632 | SuiErrorKind::SignerSignatureAbsent { .. }
1633 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1634 | SuiErrorKind::IncorrectSigner { .. }
1635 | SuiErrorKind::UnknownSigner { .. }
1636 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1637 _ => Weight::zero(),
1638 }
1639}
1640
1641#[macro_export]
1645macro_rules! handle_with_decoration {
1646 ($self:ident, $func_name:ident, $request:ident, $method_name:expr) => {{
1647 if $self.client_id_source.is_none() {
1648 return $self.$func_name($request).await.map(|(result, _)| result);
1649 }
1650
1651 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1652
1653 $self.handle_traffic_req(client.clone()).await?;
1655
1656 let wrapped_response = $self.$func_name($request).await;
1658 $self.handle_traffic_resp(client, wrapped_response, $method_name)
1659 }};
1660}
1661
1662#[async_trait]
1663impl Validator for ValidatorService {
1664 async fn submit_transaction(
1665 &self,
1666 request: tonic::Request<RawSubmitTxRequest>,
1667 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1668 let validator_service = self.clone();
1669
1670 spawn_monitored_task!(async move {
1673 handle_with_decoration!(
1676 validator_service,
1677 handle_submit_transaction_impl,
1678 request,
1679 "submit_transaction"
1680 )
1681 })
1682 .await
1683 .unwrap()
1684 }
1685
1686 async fn wait_for_effects(
1687 &self,
1688 request: tonic::Request<RawWaitForEffectsRequest>,
1689 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1690 handle_with_decoration!(self, wait_for_effects_impl, request, "wait_for_effects")
1691 }
1692
1693 async fn object_info(
1694 &self,
1695 request: tonic::Request<ObjectInfoRequest>,
1696 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1697 handle_with_decoration!(self, object_info_impl, request, "object_info")
1698 }
1699
1700 async fn transaction_info(
1701 &self,
1702 request: tonic::Request<TransactionInfoRequest>,
1703 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1704 handle_with_decoration!(self, transaction_info_impl, request, "transaction_info")
1705 }
1706
1707 async fn checkpoint(
1708 &self,
1709 request: tonic::Request<CheckpointRequest>,
1710 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1711 handle_with_decoration!(self, checkpoint_impl, request, "checkpoint")
1712 }
1713
1714 async fn checkpoint_v2(
1715 &self,
1716 request: tonic::Request<CheckpointRequestV2>,
1717 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1718 handle_with_decoration!(self, checkpoint_v2_impl, request, "checkpoint_v2")
1719 }
1720
1721 async fn get_system_state_object(
1722 &self,
1723 request: tonic::Request<SystemStateRequest>,
1724 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1725 handle_with_decoration!(
1726 self,
1727 get_system_state_object_impl,
1728 request,
1729 "get_system_state_object"
1730 )
1731 }
1732
1733 async fn validator_health(
1734 &self,
1735 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1736 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1737 {
1738 handle_with_decoration!(self, validator_health_impl, request, "validator_health")
1739 }
1740}