1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14 register_gauge_with_registry, register_histogram_vec_with_registry,
15 register_histogram_with_registry, register_int_counter_vec_with_registry,
16 register_int_counter_with_registry,
17};
18use std::{
19 io,
20 net::{IpAddr, SocketAddr},
21 sync::Arc,
22 time::{Duration, SystemTime},
23};
24use sui_network::{
25 api::{Validator, ValidatorServer},
26 tonic,
27 validator::server::SUI_TLS_SERVER_NAME,
28};
29use sui_types::effects::TransactionEffectsAPI;
30use sui_types::message_envelope::Message;
31use sui_types::messages_consensus::ConsensusPosition;
32use sui_types::messages_consensus::ConsensusTransaction;
33use sui_types::messages_grpc::{
34 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
35 TransactionInfoRequest, TransactionInfoResponse,
36};
37use sui_types::multiaddr::Multiaddr;
38use sui_types::object::Object;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::traffic_control::{ClientIdSource, Weight};
41use sui_types::{
42 base_types::ObjectID,
43 digests::TransactionEffectsDigest,
44 error::{SuiErrorKind, UserInputError},
45};
46use sui_types::{
47 effects::TransactionEffects,
48 messages_grpc::{
49 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
50 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
51 },
52};
53use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
54use sui_types::{error::*, transaction::*};
55use sui_types::{
56 fp_ensure,
57 messages_checkpoint::{
58 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
59 },
60};
61use tokio::sync::oneshot;
62use tokio::time::timeout;
63use tonic::metadata::{Ascii, MetadataValue};
64use tracing::{debug, error, info, instrument};
65
66use crate::consensus_adapter::ConnectionMonitorStatusForTests;
67use crate::{
68 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
69 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
70 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
71};
72use crate::{
73 authority::{
74 authority_per_epoch_store::AuthorityPerEpochStore,
75 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
76 },
77 checkpoints::CheckpointStore,
78 mysticeti_adapter::LazyMysticetiClient,
79};
80use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
81
// Unit tests for the validator service; compiled only for test builds and
// loaded from `unit_tests/server_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/server_tests.rs"]
mod server_tests;

// Tests loaded from `unit_tests/wait_for_effects_tests.rs` (test builds only).
#[cfg(test)]
#[path = "unit_tests/wait_for_effects_tests.rs"]
mod wait_for_effects_tests;

// Tests loaded from `unit_tests/submit_transaction_tests.rs` (test builds only).
#[cfg(test)]
#[path = "unit_tests/submit_transaction_tests.rs"]
mod submit_transaction_tests;
93
/// Handle to a bound validator network server, returned by the `spawn_*_for_test`
/// helpers on [`AuthorityServer`]. It can be used to wait for the server to shut
/// down, to shut it down, or to look up the address it is listening on.
pub struct AuthorityServerHandle {
    /// The underlying network server this handle controls.
    server_handle: sui_network::validator::server::Server,
}
97
98impl AuthorityServerHandle {
99 pub async fn join(self) -> Result<(), io::Error> {
100 self.server_handle.handle().wait_for_shutdown().await;
101 Ok(())
102 }
103
104 pub async fn kill(self) -> Result<(), io::Error> {
105 self.server_handle.handle().shutdown().await;
106 Ok(())
107 }
108
109 pub fn address(&self) -> &Multiaddr {
110 self.server_handle.local_addr()
111 }
112}
113
/// Bundles everything needed to start a validator service for tests: the address
/// to bind, the authority state, the consensus adapter and the service metrics.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` when binding the server.
    address: Multiaddr,
    /// Authority state handed to the spawned `ValidatorService`.
    pub state: Arc<AuthorityState>,
    /// Adapter handed to the spawned `ValidatorService` for consensus submission.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the spawned `ValidatorService`.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
120
121impl AuthorityServer {
122 pub fn new_for_test_with_consensus_adapter(
123 state: Arc<AuthorityState>,
124 consensus_adapter: Arc<ConsensusAdapter>,
125 ) -> Self {
126 let address = new_local_tcp_address_for_testing();
127 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
128
129 Self {
130 address,
131 state,
132 consensus_adapter,
133 metrics,
134 }
135 }
136
137 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
138 let consensus_adapter = Arc::new(ConsensusAdapter::new(
139 Arc::new(LazyMysticetiClient::new()),
140 CheckpointStore::new_for_tests(),
141 state.name,
142 Arc::new(ConnectionMonitorStatusForTests {}),
143 100_000,
144 100_000,
145 None,
146 None,
147 ConsensusAdapterMetrics::new_test(),
148 state.epoch_store_for_testing().protocol_config().clone(),
149 ));
150 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
151 }
152
153 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
154 let address = self.address.clone();
155 self.spawn_with_bind_address_for_test(address).await
156 }
157
158 pub async fn spawn_with_bind_address_for_test(
159 self,
160 address: Multiaddr,
161 ) -> Result<AuthorityServerHandle, io::Error> {
162 let tls_config = sui_tls::create_rustls_server_config(
163 self.state.config.network_key_pair().copy().private(),
164 SUI_TLS_SERVER_NAME.to_string(),
165 );
166 let config = mysten_network::config::Config::new();
167 let server = sui_network::validator::server::ServerBuilder::from_config(
168 &config,
169 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
170 )
171 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
172 self.state,
173 self.consensus_adapter,
174 self.metrics,
175 )))
176 .bind(&address, Some(tls_config))
177 .await
178 .unwrap();
179 let local_addr = server.local_addr().to_owned();
180 info!("Listening to traffic on {local_addr}");
181 let handle = AuthorityServerHandle {
182 server_handle: server,
183 };
184 Ok(handle)
185 }
186}
187
/// Prometheus metrics recorded by the validator service.
///
/// All metrics are created and registered in [`ValidatorServiceMetrics::new`].
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back the
    /// local acknowledgement (execution and finalization are not included).
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the `submit_certificate` RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction through consensus, labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transaction certificates rejected during epoch transition.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was unexpectedly missing.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
}
218
219impl ValidatorServiceMetrics {
220 pub fn new(registry: &Registry) -> Self {
221 Self {
222 signature_errors: register_int_counter_with_registry!(
223 "total_signature_errors",
224 "Number of transaction signature errors",
225 registry,
226 )
227 .unwrap(),
228 tx_verification_latency: register_histogram_with_registry!(
229 "validator_service_tx_verification_latency",
230 "Latency of verifying a transaction",
231 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
232 registry,
233 )
234 .unwrap(),
235 cert_verification_latency: register_histogram_with_registry!(
236 "validator_service_cert_verification_latency",
237 "Latency of verifying a certificate",
238 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
239 registry,
240 )
241 .unwrap(),
242 consensus_latency: register_histogram_with_registry!(
243 "validator_service_consensus_latency",
244 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
245 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
246 registry,
247 )
248 .unwrap(),
249 handle_transaction_latency: register_histogram_with_registry!(
250 "validator_service_handle_transaction_latency",
251 "Latency of handling a transaction",
252 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
253 registry,
254 )
255 .unwrap(),
256 handle_certificate_consensus_latency: register_histogram_with_registry!(
257 "validator_service_handle_certificate_consensus_latency",
258 "Latency of handling a consensus transaction certificate",
259 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
260 registry,
261 )
262 .unwrap(),
263 submit_certificate_consensus_latency: register_histogram_with_registry!(
264 "validator_service_submit_certificate_consensus_latency",
265 "Latency of submit_certificate RPC handler",
266 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
267 registry,
268 )
269 .unwrap(),
270 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
271 "validator_service_handle_certificate_non_consensus_latency",
272 "Latency of handling a non-consensus transaction certificate",
273 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
274 registry,
275 )
276 .unwrap(),
277 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
278 "validator_service_handle_soft_bundle_certificates_consensus_latency",
279 "Latency of handling a consensus soft bundle",
280 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
281 registry,
282 )
283 .unwrap(),
284 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
285 "handle_soft_bundle_certificates_count",
286 "The number of certificates included in a soft bundle",
287 mysten_metrics::COUNT_BUCKETS.to_vec(),
288 registry,
289 )
290 .unwrap(),
291 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
292 "handle_soft_bundle_certificates_size_bytes",
293 "The size of soft bundle in bytes",
294 mysten_metrics::BYTES_BUCKETS.to_vec(),
295 registry,
296 )
297 .unwrap(),
298 handle_transaction_consensus_latency: register_histogram_with_registry!(
299 "validator_service_handle_transaction_consensus_latency",
300 "Latency of handling a user transaction sent through consensus",
301 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
302 registry,
303 )
304 .unwrap(),
305 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
306 "validator_service_submit_transaction_consensus_latency",
307 "Latency of submitting a user transaction sent through consensus",
308 &["req_type"],
309 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
310 registry,
311 )
312 .unwrap(),
313 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
314 "validator_service_submit_transaction_latency",
315 "Latency of submit transaction handler",
316 &["req_type"],
317 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
318 registry,
319 )
320 .unwrap(),
321 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
322 "validator_service_handle_wait_for_effects_ping_latency",
323 "Latency of handling a ping request for wait_for_effects",
324 &["req_type"],
325 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
326 registry,
327 )
328 .unwrap(),
329 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
330 "validator_service_submit_transaction_bytes",
331 "The size of transactions in the submit transaction request",
332 &["req_type"],
333 mysten_metrics::BYTES_BUCKETS.to_vec(),
334 registry,
335 )
336 .unwrap(),
337 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
338 "validator_service_submit_transaction_batch_size",
339 "The number of transactions in the submit transaction request",
340 &["req_type"],
341 mysten_metrics::COUNT_BUCKETS.to_vec(),
342 registry,
343 )
344 .unwrap(),
345 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
346 "validator_service_num_rejected_cert_in_epoch_boundary",
347 "Number of rejected transaction certificate during epoch transitioning",
348 registry,
349 )
350 .unwrap(),
351 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
352 "validator_service_num_rejected_tx_during_overload",
353 "Number of rejected transaction due to system overload",
354 &["error_type"],
355 registry,
356 )
357 .unwrap(),
358 submission_rejected_transactions: register_int_counter_vec_with_registry!(
359 "validator_service_submission_rejected_transactions",
360 "Number of transactions rejected during submission",
361 &["reason"],
362 registry,
363 )
364 .unwrap(),
365 connection_ip_not_found: register_int_counter_with_registry!(
366 "validator_service_connection_ip_not_found",
367 "Number of times connection IP was not extractable from request",
368 registry,
369 )
370 .unwrap(),
371 forwarded_header_parse_error: register_int_counter_with_registry!(
372 "validator_service_forwarded_header_parse_error",
373 "Number of times x-forwarded-for header could not be parsed",
374 registry,
375 )
376 .unwrap(),
377 forwarded_header_invalid: register_int_counter_with_registry!(
378 "validator_service_forwarded_header_invalid",
379 "Number of times x-forwarded-for header was invalid",
380 registry,
381 )
382 .unwrap(),
383 forwarded_header_not_included: register_int_counter_with_registry!(
384 "validator_service_forwarded_header_not_included",
385 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
386 registry,
387 )
388 .unwrap(),
389 client_id_source_config_mismatch: register_int_counter_with_registry!(
390 "validator_service_client_id_source_config_mismatch",
391 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
392 registry,
393 )
394 .unwrap(),
395 x_forwarded_for_num_hops: register_gauge_with_registry!(
396 "validator_service_x_forwarded_for_num_hops",
397 "Number of hops in x-forwarded-for header",
398 registry,
399 )
400 .unwrap(),
401 }
402 }
403
404 pub fn new_for_tests() -> Self {
405 let registry = Registry::new();
406 Self::new(®istry)
407 }
408}
409
/// The validator's request handler: holds the shared authority state, the
/// consensus adapter used for submissions, and service metrics.
///
/// Cloning is cheap in the sense that the main components are held behind `Arc`.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state used to validate, vote on and look up transactions.
    state: Arc<AuthorityState>,
    /// Adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the request handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from the authority state,
    /// `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client IP is derived from a request; when `None`, the submit
    /// handler falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
}
418
419impl ValidatorService {
420 pub fn new(
421 state: Arc<AuthorityState>,
422 consensus_adapter: Arc<ConsensusAdapter>,
423 validator_metrics: Arc<ValidatorServiceMetrics>,
424 client_id_source: Option<ClientIdSource>,
425 ) -> Self {
426 let traffic_controller = state.traffic_controller.clone();
427 Self {
428 state,
429 consensus_adapter,
430 metrics: validator_metrics,
431 traffic_controller,
432 client_id_source,
433 }
434 }
435
436 pub fn new_for_tests(
437 state: Arc<AuthorityState>,
438 consensus_adapter: Arc<ConsensusAdapter>,
439 metrics: Arc<ValidatorServiceMetrics>,
440 ) -> Self {
441 Self {
442 state,
443 consensus_adapter,
444 metrics,
445 traffic_controller: None,
446 client_id_source: None,
447 }
448 }
449
450 pub fn validator_state(&self) -> &Arc<AuthorityState> {
451 &self.state
452 }
453
454 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
456 let epoch_store = self.state.load_epoch_store_one_call_per_task();
457
458 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
460
461 let transaction = epoch_store
463 .verify_transaction_require_no_aliases(transaction)?
464 .into_tx();
465
466 self.state
468 .handle_vote_transaction(&epoch_store, transaction)?;
469
470 Ok(())
471 }
472
473 pub fn handle_transaction_for_testing_with_overload_check(
476 &self,
477 transaction: Transaction,
478 ) -> SuiResult<()> {
479 let epoch_store = self.state.load_epoch_store_one_call_per_task();
480
481 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
483
484 self.state.check_system_overload(
486 self.consensus_adapter.as_ref(),
487 transaction.data(),
488 self.state.check_system_overload_at_signing(),
489 )?;
490
491 let transaction = epoch_store
493 .verify_transaction_require_no_aliases(transaction)?
494 .into_tx();
495
496 self.state
498 .handle_vote_transaction(&epoch_store, transaction)?;
499
500 Ok(())
501 }
502
503 async fn collect_immutable_object_ids(
506 &self,
507 tx: &VerifiedTransaction,
508 state: &AuthorityState,
509 ) -> SuiResult<Vec<ObjectID>> {
510 let input_objects = tx.data().transaction_data().input_objects()?;
511
512 let object_ids: Vec<ObjectID> = input_objects
514 .iter()
515 .filter_map(|obj| match obj {
516 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
517 _ => None,
518 })
519 .collect();
520 if object_ids.is_empty() {
521 return Ok(vec![]);
522 }
523
524 let objects = state.get_object_cache_reader().get_objects(&object_ids);
526
527 objects
529 .into_iter()
530 .zip(object_ids.iter())
531 .filter_map(|(obj, id)| {
532 let Some(o) = obj else {
533 return Some(Err::<ObjectID, SuiError>(
534 SuiErrorKind::UserInputError {
535 error: UserInputError::ObjectNotFound {
536 object_id: *id,
537 version: None,
538 },
539 }
540 .into(),
541 ));
542 };
543 if o.is_immutable() {
544 Some(Ok(*id))
545 } else {
546 None
547 }
548 })
549 .collect::<SuiResult<Vec<ObjectID>>>()
550 }
551
552 #[instrument(
553 name = "ValidatorService::handle_submit_transaction",
554 level = "error",
555 skip_all,
556 err(level = "debug")
557 )]
558 async fn handle_submit_transaction(
559 &self,
560 request: tonic::Request<RawSubmitTxRequest>,
561 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
562 let Self {
563 state,
564 consensus_adapter,
565 metrics,
566 traffic_controller: _,
567 client_id_source,
568 } = self.clone();
569
570 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
571 self.get_client_ip_addr(&request, client_id_source)
572 } else {
573 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
574 };
575
576 let inner = request.into_inner();
577 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
578
579 let next_epoch = start_epoch + 1;
580 let mut max_retries = 1;
581
582 loop {
583 let res = self
584 .handle_submit_transaction_inner(
585 &state,
586 &consensus_adapter,
587 &metrics,
588 &inner,
589 submitter_client_addr,
590 )
591 .await;
592 match res {
593 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
594 Err(err) => {
595 if max_retries > 0
596 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
597 {
598 max_retries -= 1;
599
600 debug!(
601 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
602 );
603
604 if let Ok(Ok(new_epoch)) =
605 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
606 {
607 assert_reachable!("retry submission at epoch end");
608 if new_epoch == next_epoch {
609 continue;
610 }
611
612 debug_fatal!(
613 "expected epoch {} after reconfiguration. got {}",
614 next_epoch,
615 new_epoch
616 );
617 }
618 }
619 return Err(err.into());
620 }
621 }
622 }
623 }
624
625 async fn handle_submit_transaction_inner(
626 &self,
627 state: &AuthorityState,
628 consensus_adapter: &ConsensusAdapter,
629 metrics: &ValidatorServiceMetrics,
630 request: &RawSubmitTxRequest,
631 submitter_client_addr: Option<IpAddr>,
632 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
633 let epoch_store = state.load_epoch_store_one_call_per_task();
634 if !epoch_store.protocol_config().mysticeti_fastpath() {
635 return Err(SuiErrorKind::UnsupportedFeatureError {
636 error: "Mysticeti fastpath".to_string(),
637 }
638 .into());
639 }
640
641 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
642 SuiErrorKind::GrpcMessageDeserializeError {
643 type_info: "RawSubmitTxRequest.submit_type".to_string(),
644 error: e.to_string(),
645 }
646 })?;
647
648 let is_ping_request = submit_type == SubmitTxType::Ping;
649 if is_ping_request {
650 fp_ensure!(
651 request.transactions.is_empty(),
652 SuiErrorKind::InvalidRequest(format!(
653 "Ping request cannot contain {} transactions",
654 request.transactions.len()
655 ))
656 .into()
657 );
658 } else {
659 fp_ensure!(
661 !request.transactions.is_empty(),
662 SuiErrorKind::InvalidRequest(
663 "At least one transaction needs to be submitted".to_string(),
664 )
665 .into()
666 );
667 }
668
669 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
674
675 let max_num_transactions = if is_soft_bundle_request {
676 epoch_store.protocol_config().max_soft_bundle_size()
679 } else {
680 epoch_store
682 .protocol_config()
683 .max_num_transactions_in_block()
684 };
685 fp_ensure!(
686 request.transactions.len() <= max_num_transactions as usize,
687 SuiErrorKind::InvalidRequest(format!(
688 "Too many transactions in request: {} vs {}",
689 request.transactions.len(),
690 max_num_transactions
691 ))
692 .into()
693 );
694
695 let mut tx_digests = Vec::with_capacity(request.transactions.len());
697 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
699 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
701 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
703 let mut total_size_bytes = 0;
705 let mut spam_weight = Weight::zero();
707
708 let req_type = if is_ping_request {
709 "ping"
710 } else if request.transactions.len() == 1 {
711 "single_transaction"
712 } else if is_soft_bundle_request {
713 "soft_bundle"
714 } else {
715 "batch"
716 };
717
718 let _handle_tx_metrics_guard = metrics
719 .handle_submit_transaction_latency
720 .with_label_values(&[req_type])
721 .start_timer();
722
723 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
724 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
725 Ok(txn) => txn,
726 Err(e) => {
727 return Err(SuiErrorKind::TransactionDeserializationError {
729 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
730 }
731 .into());
732 }
733 };
734
735 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
737
738 if transaction
739 .data()
740 .transaction_data()
741 .is_gasless_transaction()
742 {
743 spam_weight = Weight::one();
745 }
746
747 let overload_check_res = state.check_system_overload(
748 consensus_adapter,
749 transaction.data(),
750 state.check_system_overload_at_signing(),
751 );
752 if let Err(error) = overload_check_res {
753 metrics
754 .num_rejected_tx_during_overload
755 .with_label_values(&[error.as_ref()])
756 .inc();
757 results[idx] = Some(SubmitTxResult::Rejected { error });
758 continue;
759 }
760
761 let verified_transaction = {
763 let _metrics_guard = metrics.tx_verification_latency.start_timer();
764 if epoch_store.protocol_config().address_aliases() {
765 match epoch_store.verify_transaction_with_current_aliases(transaction) {
766 Ok(tx) => tx,
767 Err(e) => {
768 metrics.signature_errors.inc();
769 return Err(e);
770 }
771 }
772 } else {
773 match epoch_store.verify_transaction_require_no_aliases(transaction) {
774 Ok(tx) => tx,
775 Err(e) => {
776 metrics.signature_errors.inc();
777 return Err(e);
778 }
779 }
780 }
781 };
782
783 let tx_digest = verified_transaction.tx().digest();
784 tx_digests.push(*tx_digest);
785
786 debug!(
787 ?tx_digest,
788 "handle_submit_transaction: verified transaction"
789 );
790
791 if let Some(effects) = state
794 .get_transaction_cache_reader()
795 .get_executed_effects(tx_digest)
796 {
797 let effects_digest = effects.digest();
798 if let Ok(executed_data) = self.complete_executed_data(effects).await {
799 let executed_result = SubmitTxResult::Executed {
800 effects_digest,
801 details: Some(executed_data),
802 };
803 results[idx] = Some(executed_result);
804 debug!(?tx_digest, "handle_submit_transaction: already executed");
805 continue;
806 }
807 }
808
809 if self
810 .state
811 .get_transaction_cache_reader()
812 .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
813 {
814 results[idx] = Some(SubmitTxResult::Rejected {
815 error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
816 });
817 debug!(
818 ?tx_digest,
819 "handle_submit_transaction: transaction already executed in previous epoch"
820 );
821 continue;
822 }
823
824 debug!(
825 ?tx_digest,
826 "handle_submit_transaction: waiting for fastpath dependency objects"
827 );
828 if !state
829 .wait_for_fastpath_dependency_objects(
830 verified_transaction.tx(),
831 epoch_store.epoch(),
832 )
833 .await?
834 {
835 debug!(
836 ?tx_digest,
837 "fastpath input objects are still unavailable after waiting"
838 );
839 }
840
841 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
842 Ok(_) => { }
843 Err(e) => {
844 if let Some(effects) = state
847 .get_transaction_cache_reader()
848 .get_executed_effects(tx_digest)
849 {
850 let effects_digest = effects.digest();
851 if let Ok(executed_data) = self.complete_executed_data(effects).await {
852 let executed_result = SubmitTxResult::Executed {
853 effects_digest,
854 details: Some(executed_data),
855 };
856 results[idx] = Some(executed_result);
857 continue;
858 }
859 }
860
861 debug!(?tx_digest, "Transaction rejected during submission: {e}");
863 metrics
864 .submission_rejected_transactions
865 .with_label_values(&[e.to_variant_name()])
866 .inc();
867 results[idx] = Some(SubmitTxResult::Rejected { error: e });
868 continue;
869 }
870 }
871
872 let mut claims = vec![];
874
875 let immutable_object_ids = self
876 .collect_immutable_object_ids(verified_transaction.tx(), state)
877 .await?;
878 if !immutable_object_ids.is_empty() {
879 claims.push(TransactionClaim::ImmutableInputObjects(
880 immutable_object_ids,
881 ));
882 }
883
884 let (tx, aliases) = verified_transaction.into_inner();
885 if epoch_store.protocol_config().address_aliases() {
886 if epoch_store
887 .protocol_config()
888 .fix_checkpoint_signature_mapping()
889 {
890 claims.push(TransactionClaim::AddressAliasesV2(aliases));
891 } else {
892 let v1_aliases: Vec<_> = tx
893 .data()
894 .intent_message()
895 .value
896 .required_signers()
897 .into_iter()
898 .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
899 .collect();
900 #[allow(deprecated)]
901 claims.push(TransactionClaim::AddressAliases(
902 nonempty::NonEmpty::from_vec(v1_aliases)
903 .expect("must have at least one required_signer"),
904 ));
905 }
906 }
907
908 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
909
910 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
911 &state.name,
912 tx_with_claims,
913 ));
914
915 transaction_indexes.push(idx);
916 total_size_bytes += tx_size;
917 }
918
919 if consensus_transactions.is_empty() && !is_ping_request {
920 return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
921 }
922
923 let max_transaction_bytes = if is_soft_bundle_request {
927 epoch_store
928 .protocol_config()
929 .consensus_max_transactions_in_block_bytes()
930 / 2
931 } else {
932 epoch_store
933 .protocol_config()
934 .consensus_max_transactions_in_block_bytes()
935 };
936 fp_ensure!(
937 total_size_bytes <= max_transaction_bytes as usize,
938 SuiErrorKind::UserInputError {
939 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
940 size: total_size_bytes,
941 limit: max_transaction_bytes,
942 },
943 }
944 .into()
945 );
946
947 metrics
948 .handle_submit_transaction_bytes
949 .with_label_values(&[req_type])
950 .observe(total_size_bytes as f64);
951 metrics
952 .handle_submit_transaction_batch_size
953 .with_label_values(&[req_type])
954 .observe(consensus_transactions.len() as f64);
955
956 let _latency_metric_guard = metrics
957 .handle_submit_transaction_consensus_latency
958 .with_label_values(&[req_type])
959 .start_timer();
960
961 let consensus_positions = if is_soft_bundle_request || is_ping_request {
962 assert!(
965 is_ping_request || !consensus_transactions.is_empty(),
966 "A valid soft bundle must have at least one transaction"
967 );
968 debug!(
969 "handle_submit_transaction: submitting consensus transactions ({}): {}",
970 req_type,
971 consensus_transactions
972 .iter()
973 .map(|t| t.local_display())
974 .join(", ")
975 );
976 self.handle_submit_to_consensus_for_position(
977 consensus_transactions,
978 &epoch_store,
979 submitter_client_addr,
980 )
981 .await?
982 } else {
983 let futures = consensus_transactions.into_iter().map(|t| {
984 debug!(
985 "handle_submit_transaction: submitting consensus transaction ({}): {}",
986 req_type,
987 t.local_display(),
988 );
989 self.handle_submit_to_consensus_for_position(
990 vec![t],
991 &epoch_store,
992 submitter_client_addr,
993 )
994 });
995 future::try_join_all(futures)
996 .await?
997 .into_iter()
998 .flatten()
999 .collect()
1000 };
1001
1002 if is_ping_request {
1003 assert_eq!(consensus_positions.len(), 1);
1005 results.push(Some(SubmitTxResult::Submitted {
1006 consensus_position: consensus_positions[0],
1007 }));
1008 } else {
1009 for ((idx, tx_digest), consensus_position) in transaction_indexes
1011 .into_iter()
1012 .zip(tx_digests)
1013 .zip(consensus_positions)
1014 {
1015 debug!(
1016 ?tx_digest,
1017 "handle_submit_transaction: submitted consensus transaction at {}",
1018 consensus_position,
1019 );
1020 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1021 }
1022 }
1023
1024 Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1025 }
1026
1027 fn try_from_submit_tx_response(
1028 results: Vec<Option<SubmitTxResult>>,
1029 ) -> Result<RawSubmitTxResponse, SuiError> {
1030 let mut raw_results = Vec::new();
1031 for (i, result) in results.into_iter().enumerate() {
1032 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1033 error: format!("Missing transaction result at {}", i),
1034 })?;
1035 let raw_result = result.try_into()?;
1036 raw_results.push(raw_result);
1037 }
1038 Ok(RawSubmitTxResponse {
1039 results: raw_results,
1040 })
1041 }
1042
1043 #[instrument(
1044 name = "ValidatorService::handle_submit_to_consensus_for_position",
1045 level = "debug",
1046 skip_all,
1047 err(level = "debug")
1048 )]
1049 async fn handle_submit_to_consensus_for_position(
1050 &self,
1051 consensus_transactions: Vec<ConsensusTransaction>,
1053 epoch_store: &Arc<AuthorityPerEpochStore>,
1054 submitter_client_addr: Option<IpAddr>,
1055 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1056 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1057
1058 {
1059 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1061 if !reconfiguration_lock.should_accept_user_certs() {
1062 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1063 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1064 }
1065
1066 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1070
1071 self.consensus_adapter.submit_batch(
1072 &consensus_transactions,
1073 Some(&reconfiguration_lock),
1074 epoch_store,
1075 Some(tx_consensus_positions),
1076 submitter_client_addr,
1077 )?;
1078 }
1079
1080 Ok(rx_consensus_positions.await.map_err(|e| {
1081 SuiErrorKind::FailedToSubmitToConsensus(format!(
1082 "Failed to get consensus position: {e}"
1083 ))
1084 })?)
1085 }
1086
1087 async fn collect_effects_data(
1088 &self,
1089 effects: &TransactionEffects,
1090 include_events: bool,
1091 include_input_objects: bool,
1092 include_output_objects: bool,
1093 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1094 let events = if include_events && effects.events_digest().is_some() {
1095 Some(
1096 self.state
1097 .get_transaction_events(effects.transaction_digest())?,
1098 )
1099 } else {
1100 None
1101 };
1102
1103 let input_objects = if include_input_objects {
1104 self.state.get_transaction_input_objects(effects)?
1105 } else {
1106 vec![]
1107 };
1108
1109 let output_objects = if include_output_objects {
1110 self.state.get_transaction_output_objects(effects)?
1111 } else {
1112 vec![]
1113 };
1114
1115 Ok((events, input_objects, output_objects))
1116 }
1117}
1118
/// Result produced by the `*_impl` handlers before traffic-control accounting.
///
/// On success it carries the tonic response together with a [`Weight`], which
/// `handle_traffic_resp` forwards to the traffic controller as the request's spam
/// weight; on failure it carries the tonic status.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1120
1121impl ValidatorService {
/// Forwards a raw submit-transaction request to `handle_submit_transaction`.
///
/// This exists so the submit path has a `*_impl` method of the shape that
/// `handle_with_decoration!` invokes.
async fn handle_submit_transaction_impl(
    &self,
    request: tonic::Request<RawSubmitTxRequest>,
) -> WrappedServiceResponse<RawSubmitTxResponse> {
    self.handle_submit_transaction(request).await
}
1128
1129 async fn wait_for_effects_impl(
1130 &self,
1131 request: tonic::Request<RawWaitForEffectsRequest>,
1132 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1133 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1134 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1135 let response = timeout(
1136 Duration::from_secs(20),
1138 epoch_store
1139 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1140 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1141 )
1142 .await
1143 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1144 .try_into()?;
1145 Ok((tonic::Response::new(response), Weight::zero()))
1146 }
1147
/// Produces the typed response for a wait-for-effects request.
///
/// * Requests with a `ping_type` are delegated to `ping_response`, limited to
///   10 seconds (`TimeoutError` on expiry).
/// * All other requests must carry a transaction digest (`InvalidRequest` otherwise).
///   The function then races two sources and returns whichever resolves first:
///   the executed effects for that digest, or a terminal consensus status
///   (rejected / dropped / expired) for the optional consensus position.
#[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
async fn wait_for_effects_response(
    &self,
    request: WaitForEffectsRequest,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<WaitForEffectsResponse> {
    // Ping requests take a separate path with their own, shorter timeout.
    if request.ping_type.is_some() {
        return timeout(
            Duration::from_secs(10),
            self.ping_response(request, epoch_store),
        )
        .await
        .map_err(|_| SuiErrorKind::TimeoutError)?;
    }

    let Some(tx_digest) = request.transaction_digest else {
        return Err(SuiErrorKind::InvalidRequest(
            "Transaction digest is required for wait for effects requests".to_string(),
        )
        .into());
    };
    // The effects notification below takes a slice of digests; only one is requested.
    let tx_digests = [tx_digest];

    // Resolves only with a terminal, non-executed outcome (rejected / dropped /
    // expired) or an error. In every other case it stays pending so that the
    // effects branch of the `select!` below decides the response.
    let consensus_status_future = async {
        let consensus_position = match request.consensus_position {
            Some(pos) => pos,
            // No position supplied: never resolve, leaving only the effects branch.
            None => return futures::future::pending().await,
        };
        let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().ok_or(
            SuiErrorKind::UnsupportedFeatureError {
                error: "Consensus tx status cache".to_string(),
            },
        )?;
        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
        match consensus_tx_status_cache
            .notify_read_transaction_status(consensus_position)
            .await
        {
            NotifyReadConsensusTxStatusResult::Status(
                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
            ) => Ok(WaitForEffectsResponse::Rejected {
                error: epoch_store.get_rejection_vote_reason(consensus_position),
            }),
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
                // Finalized yields no response from this future; it stays pending and
                // the effects branch produces the answer.
                futures::future::pending().await
            }
            NotifyReadConsensusTxStatusResult::Expired(round) => {
                Ok(WaitForEffectsResponse::Expired {
                    epoch: epoch_store.epoch(),
                    round: Some(round),
                })
            }
        }
    };

    tokio::select! {
        effects_result = self.state
            .get_transaction_cache_reader()
            .notify_read_executed_effects_may_fail(
                "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
                &tx_digests,
            ) => {
            // One digest was requested; `unwrap` panics if the result vector is empty.
            let effects = effects_result?.pop().unwrap();
            let effects_digest = effects.digest();
            // Full details (events and objects) are only loaded on request.
            let details = if request.include_details {
                Some(self.complete_executed_data(effects).await?)
            } else {
                None
            };
            Ok(WaitForEffectsResponse::Executed {
                effects_digest,
                details,
            })
        }
        status_response = consensus_status_future => {
            status_response
        }
    }
}
1231
1232 #[instrument(level = "error", skip_all, err(level = "debug"))]
1233 async fn ping_response(
1234 &self,
1235 request: WaitForEffectsRequest,
1236 epoch_store: &Arc<AuthorityPerEpochStore>,
1237 ) -> SuiResult<WaitForEffectsResponse> {
1238 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1239 return Err(SuiErrorKind::UnsupportedFeatureError {
1240 error: "Mysticeti fastpath".to_string(),
1241 }
1242 .into());
1243 };
1244
1245 let Some(consensus_position) = request.consensus_position else {
1246 return Err(SuiErrorKind::InvalidRequest(
1247 "Consensus position is required for Ping requests".to_string(),
1248 )
1249 .into());
1250 };
1251
1252 let Some(ping) = request.ping_type else {
1254 return Err(SuiErrorKind::InvalidRequest(
1255 "Ping type is required for ping requests".to_string(),
1256 )
1257 .into());
1258 };
1259
1260 let _metrics_guard = self
1261 .metrics
1262 .handle_wait_for_effects_ping_latency
1263 .with_label_values(&[ping.as_str()])
1264 .start_timer();
1265
1266 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1267
1268 let details = if request.include_details {
1269 Some(Box::new(ExecutedData::default()))
1270 } else {
1271 None
1272 };
1273
1274 let status = consensus_tx_status_cache
1275 .notify_read_transaction_status(consensus_position)
1276 .await;
1277 match status {
1278 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1279 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1280 Ok(WaitForEffectsResponse::Rejected {
1281 error: epoch_store.get_rejection_vote_reason(consensus_position),
1282 })
1283 }
1284 ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1285 effects_digest: TransactionEffectsDigest::ZERO,
1286 details,
1287 }),
1288 },
1289 NotifyReadConsensusTxStatusResult::Expired(round) => {
1290 Ok(WaitForEffectsResponse::Expired {
1291 epoch: epoch_store.epoch(),
1292 round: Some(round),
1293 })
1294 }
1295 }
1296 }
1297
1298 async fn complete_executed_data(
1299 &self,
1300 effects: TransactionEffects,
1301 ) -> SuiResult<Box<ExecutedData>> {
1302 let (events, input_objects, output_objects) = self
1303 .collect_effects_data(
1304 &effects, true, true,
1305 true,
1306 )
1307 .await?;
1308 Ok(Box::new(ExecutedData {
1309 effects,
1310 events,
1311 input_objects,
1312 output_objects,
1313 }))
1314 }
1315
1316 async fn object_info_impl(
1317 &self,
1318 request: tonic::Request<ObjectInfoRequest>,
1319 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1320 let request = request.into_inner();
1321 let response = self.state.handle_object_info_request(request).await?;
1322 Ok((tonic::Response::new(response), Weight::one()))
1323 }
1324
1325 async fn transaction_info_impl(
1326 &self,
1327 request: tonic::Request<TransactionInfoRequest>,
1328 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1329 let request = request.into_inner();
1330 let response = self.state.handle_transaction_info_request(request).await?;
1331 Ok((tonic::Response::new(response), Weight::one()))
1332 }
1333
1334 async fn checkpoint_impl(
1335 &self,
1336 request: tonic::Request<CheckpointRequest>,
1337 ) -> WrappedServiceResponse<CheckpointResponse> {
1338 let request = request.into_inner();
1339 let response = self.state.handle_checkpoint_request(&request)?;
1340 Ok((tonic::Response::new(response), Weight::one()))
1341 }
1342
1343 async fn checkpoint_v2_impl(
1344 &self,
1345 request: tonic::Request<CheckpointRequestV2>,
1346 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1347 let request = request.into_inner();
1348 let response = self.state.handle_checkpoint_request_v2(&request)?;
1349 Ok((tonic::Response::new(response), Weight::one()))
1350 }
1351
1352 async fn get_system_state_object_impl(
1353 &self,
1354 _request: tonic::Request<SystemStateRequest>,
1355 ) -> WrappedServiceResponse<SuiSystemState> {
1356 let response = self
1357 .state
1358 .get_object_cache_reader()
1359 .get_sui_system_state_object_unsafe()?;
1360 Ok((tonic::Response::new(response), Weight::one()))
1361 }
1362
1363 async fn validator_health_impl(
1364 &self,
1365 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1366 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1367 let state = &self.state;
1368
1369 let epoch_store = state.load_epoch_store_one_call_per_task();
1371
1372 let num_inflight_execution_transactions =
1374 state.execution_scheduler().num_pending_certificates() as u64;
1375
1376 let num_inflight_consensus_transactions =
1378 self.consensus_adapter.num_inflight_transactions();
1379
1380 let last_committed_leader_round = epoch_store
1382 .consensus_tx_status_cache
1383 .as_ref()
1384 .and_then(|cache| cache.get_last_committed_leader_round())
1385 .unwrap_or(0);
1386
1387 let last_locally_built_checkpoint = epoch_store
1389 .last_built_checkpoint_summary()
1390 .ok()
1391 .flatten()
1392 .map(|(_, summary)| summary.sequence_number)
1393 .unwrap_or(0);
1394
1395 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1396 num_inflight_consensus_transactions,
1397 num_inflight_execution_transactions,
1398 last_locally_built_checkpoint,
1399 last_committed_leader_round,
1400 };
1401
1402 let raw_response = typed_response
1403 .try_into()
1404 .map_err(|e: sui_types::error::SuiError| {
1405 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1406 })?;
1407
1408 Ok((tonic::Response::new(raw_response), Weight::one()))
1409 }
1410
1411 fn get_client_ip_addr<T>(
1412 &self,
1413 request: &tonic::Request<T>,
1414 source: &ClientIdSource,
1415 ) -> Option<IpAddr> {
1416 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1417
1418 if let Some(header) = forwarded_header {
1419 let num_hops = header
1420 .to_str()
1421 .map(|h| h.split(',').count().saturating_sub(1))
1422 .unwrap_or(0);
1423
1424 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1425 }
1426
1427 match source {
1428 ClientIdSource::SocketAddr => {
1429 let socket_addr: Option<SocketAddr> = request.remote_addr();
1430
1431 if let Some(socket_addr) = socket_addr {
1437 Some(socket_addr.ip())
1438 } else {
1439 if cfg!(msim) {
1440 } else if cfg!(test) {
1442 panic!("Failed to get remote address from request");
1443 } else {
1444 self.metrics.connection_ip_not_found.inc();
1445 error!("Failed to get remote address from request");
1446 }
1447 None
1448 }
1449 }
1450 ClientIdSource::XForwardedFor(num_hops) => {
1451 let do_header_parse = |op: &MetadataValue<Ascii>| {
1452 match op.to_str() {
1453 Ok(header_val) => {
1454 let header_contents =
1455 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1456 if *num_hops == 0 {
1457 error!(
1458 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1459 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1460 to this node. Skipping traffic controller request handling.",
1461 header_contents,
1462 );
1463 return None;
1464 }
1465 let contents_len = header_contents.len();
1466 if contents_len < *num_hops {
1467 error!(
1468 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1469 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1470 `client-id-source` in the node config.",
1471 header_contents, contents_len, num_hops, contents_len,
1472 );
1473 self.metrics.client_id_source_config_mismatch.inc();
1474 return None;
1475 }
1476 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1477 else {
1478 error!(
1479 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1480 Expected at least {} values. Skipping traffic controller request handling.",
1481 header_contents, contents_len, num_hops, contents_len,
1482 );
1483 return None;
1484 };
1485 parse_ip(client_ip).or_else(|| {
1486 self.metrics.forwarded_header_parse_error.inc();
1487 None
1488 })
1489 }
1490 Err(e) => {
1491 self.metrics.forwarded_header_invalid.inc();
1495 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1496 None
1497 }
1498 }
1499 };
1500 if let Some(op) = request.metadata().get("x-forwarded-for") {
1501 do_header_parse(op)
1502 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1503 do_header_parse(op)
1504 } else {
1505 self.metrics.forwarded_header_not_included.inc();
1506 error!(
1507 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1508 );
1509 None
1510 }
1511 }
1512 }
1513 }
1514
1515 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1516 if let Some(traffic_controller) = &self.traffic_controller {
1517 if !traffic_controller.check(&client, &None).await {
1518 Err(tonic::Status::from_error(
1520 SuiErrorKind::TooManyRequests.into(),
1521 ))
1522 } else {
1523 Ok(())
1524 }
1525 } else {
1526 Ok(())
1527 }
1528 }
1529
1530 fn handle_traffic_resp<T>(
1531 &self,
1532 client: Option<IpAddr>,
1533 wrapped_response: WrappedServiceResponse<T>,
1534 ) -> Result<tonic::Response<T>, tonic::Status> {
1535 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1536 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1537 Err(status) => (
1538 Some(SuiError::from(status.clone())),
1539 Weight::zero(),
1540 Err(status.clone()),
1541 ),
1542 };
1543
1544 if let Some(traffic_controller) = self.traffic_controller.clone() {
1545 traffic_controller.tally(TrafficTally {
1546 direct: client,
1547 through_fullnode: None,
1548 error_info: error.map(|e| {
1549 let error_type = String::from(e.clone().as_ref());
1550 let error_weight = normalize(e);
1551 (error_weight, error_type)
1552 }),
1553 spam_weight,
1554 timestamp: SystemTime::now(),
1555 })
1556 }
1557 unwrapped_response
1558 }
1559}
1560
1561fn normalize(err: SuiError) -> Weight {
1563 match err.as_inner() {
1564 SuiErrorKind::UserInputError {
1565 error: UserInputError::IncorrectUserSignature { .. },
1566 } => Weight::one(),
1567 SuiErrorKind::InvalidSignature { .. }
1568 | SuiErrorKind::SignerSignatureAbsent { .. }
1569 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1570 | SuiErrorKind::IncorrectSigner { .. }
1571 | SuiErrorKind::UnknownSigner { .. }
1572 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1573 _ => Weight::zero(),
1574 }
1575}
1576
/// Runs the handler method `$func_name` on `$self` with `$request`, wrapping it in
/// traffic-control accounting when a client id source is configured.
///
/// Must be expanded inside an `async` function or block whose output is a `Result`
/// with the handler's error type, because the expansion uses `return`, `.await`
/// and `?`.
///
/// * Without a client id source the handler result is returned directly, with the
///   weight dropped and no traffic-control calls made.
/// * Otherwise the client IP is resolved with `get_client_ip_addr`, the request is
///   admitted through `handle_traffic_req` (its error short-circuits before the
///   handler runs), and the handler's wrapped result is passed through
///   `handle_traffic_resp`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // No client id source configured: skip traffic control entirely.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // The resolved client is a `Copy` value, so it can be used for both the
        // admission check and the response tally without cloning.
        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1597
/// gRPC-facing `Validator` implementation.
///
/// Every method delegates to the matching `*_impl` method through
/// `handle_with_decoration!`, which applies traffic-control accounting when a client
/// id source is configured.
#[async_trait]
impl Validator for ValidatorService {
    /// Submits a transaction. Unlike the other endpoints, the handler runs inside a
    /// separately spawned monitored task operating on a clone of the service.
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        // The task owns its own clone of the service and the request (`async move`).
        // `unwrap` panics if awaiting the spawned task yields an error.
        spawn_monitored_task!(async move {
            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
        })
        .await
        .unwrap()
    }

    /// Waits for the effects (or a terminal consensus status) of a transaction.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request)
    }

    /// Serves an object-info request.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Serves a transaction-info request.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Serves a checkpoint request.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Serves a V2 checkpoint request.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request)
    }

    /// Returns the system state object.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }

    /// Returns the validator health counters.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request)
    }
}