1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::ZipDebugEqIteratorExt;
11use mysten_common::{assert_reachable, debug_fatal};
12use mysten_metrics::spawn_monitored_task;
13use prometheus::{
14 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15 register_gauge_with_registry, register_histogram_vec_with_registry,
16 register_histogram_with_registry, register_int_counter_vec_with_registry,
17 register_int_counter_with_registry,
18};
19use std::{
20 io,
21 net::{IpAddr, SocketAddr},
22 sync::Arc,
23 time::{Duration, SystemTime},
24};
25use sui_network::{
26 api::{Validator, ValidatorServer},
27 tonic,
28 validator::server::SUI_TLS_SERVER_NAME,
29};
30use sui_types::effects::TransactionEffectsAPI;
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::ConsensusTransaction;
34use sui_types::messages_grpc::{
35 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
36 TransactionInfoRequest, TransactionInfoResponse,
37};
38use sui_types::multiaddr::Multiaddr;
39use sui_types::object::Object;
40use sui_types::sui_system_state::SuiSystemState;
41use sui_types::traffic_control::{ClientIdSource, Weight};
42use sui_types::{
43 base_types::ObjectID,
44 digests::TransactionEffectsDigest,
45 error::{SuiErrorKind, UserInputError},
46};
47use sui_types::{
48 effects::TransactionEffects,
49 messages_grpc::{
50 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
51 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
52 },
53};
54use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
55use sui_types::{error::*, transaction::*};
56use sui_types::{
57 fp_ensure,
58 messages_checkpoint::{
59 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
60 },
61};
62use tokio::sync::oneshot;
63use tokio::time::timeout;
64use tonic::metadata::{Ascii, MetadataValue};
65use tracing::{debug, error, info, instrument};
66
67use crate::consensus_adapter::ConnectionMonitorStatusForTests;
68use crate::gasless_rate_limiter::GaslessRateLimiter;
69use crate::{
70 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
71 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
72 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
73};
74use crate::{
75 authority::{
76 authority_per_epoch_store::AuthorityPerEpochStore,
77 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
78 },
79 checkpoints::CheckpointStore,
80 mysticeti_adapter::LazyMysticetiClient,
81};
82use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
83
84#[cfg(test)]
85#[path = "unit_tests/server_tests.rs"]
86mod server_tests;
87
88#[cfg(test)]
89#[path = "unit_tests/wait_for_effects_tests.rs"]
90mod wait_for_effects_tests;
91
92#[cfg(test)]
93#[path = "unit_tests/submit_transaction_tests.rs"]
94mod submit_transaction_tests;
95
96pub struct AuthorityServerHandle {
97 server_handle: sui_network::validator::server::Server,
98}
99
100impl AuthorityServerHandle {
101 pub async fn join(self) -> Result<(), io::Error> {
102 self.server_handle.handle().wait_for_shutdown().await;
103 Ok(())
104 }
105
106 pub async fn kill(self) -> Result<(), io::Error> {
107 self.server_handle.handle().shutdown().await;
108 Ok(())
109 }
110
111 pub fn address(&self) -> &Multiaddr {
112 self.server_handle.local_addr()
113 }
114}
115
116pub struct AuthorityServer {
117 address: Multiaddr,
118 pub state: Arc<AuthorityState>,
119 consensus_adapter: Arc<ConsensusAdapter>,
120 pub metrics: Arc<ValidatorServiceMetrics>,
121}
122
123impl AuthorityServer {
124 pub fn new_for_test_with_consensus_adapter(
125 state: Arc<AuthorityState>,
126 consensus_adapter: Arc<ConsensusAdapter>,
127 ) -> Self {
128 let address = new_local_tcp_address_for_testing();
129 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
130
131 Self {
132 address,
133 state,
134 consensus_adapter,
135 metrics,
136 }
137 }
138
139 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
140 let consensus_adapter = Arc::new(ConsensusAdapter::new(
141 Arc::new(LazyMysticetiClient::new()),
142 CheckpointStore::new_for_tests(),
143 state.name,
144 Arc::new(ConnectionMonitorStatusForTests {}),
145 100_000,
146 100_000,
147 None,
148 None,
149 ConsensusAdapterMetrics::new_test(),
150 state.epoch_store_for_testing().protocol_config().clone(),
151 ));
152 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
153 }
154
155 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
156 let address = self.address.clone();
157 self.spawn_with_bind_address_for_test(address).await
158 }
159
160 pub async fn spawn_with_bind_address_for_test(
161 self,
162 address: Multiaddr,
163 ) -> Result<AuthorityServerHandle, io::Error> {
164 let tls_config = sui_tls::create_rustls_server_config(
165 self.state.config.network_key_pair().copy().private(),
166 SUI_TLS_SERVER_NAME.to_string(),
167 );
168 let config = mysten_network::config::Config::new();
169 let server = sui_network::validator::server::ServerBuilder::from_config(
170 &config,
171 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
172 )
173 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
174 self.state,
175 self.consensus_adapter,
176 self.metrics,
177 )))
178 .bind(&address, Some(tls_config))
179 .await
180 .unwrap();
181 let local_addr = server.local_addr().to_owned();
182 info!("Listening to traffic on {local_addr}");
183 let handle = AuthorityServerHandle {
184 server_handle: server,
185 };
186 Ok(handle)
187 }
188}
189
190pub struct ValidatorServiceMetrics {
191 pub signature_errors: IntCounter,
192 pub tx_verification_latency: Histogram,
193 pub cert_verification_latency: Histogram,
194 pub consensus_latency: Histogram,
195 pub handle_transaction_latency: Histogram,
196 pub submit_certificate_consensus_latency: Histogram,
197 pub handle_certificate_consensus_latency: Histogram,
198 pub handle_certificate_non_consensus_latency: Histogram,
199 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
200 pub handle_soft_bundle_certificates_count: Histogram,
201 pub handle_soft_bundle_certificates_size_bytes: Histogram,
202 pub handle_transaction_consensus_latency: Histogram,
203 pub handle_submit_transaction_consensus_latency: HistogramVec,
204 pub handle_wait_for_effects_ping_latency: HistogramVec,
205
206 handle_submit_transaction_latency: HistogramVec,
207 handle_submit_transaction_bytes: HistogramVec,
208 handle_submit_transaction_batch_size: HistogramVec,
209
210 num_rejected_cert_in_epoch_boundary: IntCounter,
211 num_rejected_tx_during_overload: IntCounterVec,
212 submission_rejected_transactions: IntCounterVec,
213 connection_ip_not_found: IntCounter,
214 forwarded_header_parse_error: IntCounter,
215 forwarded_header_invalid: IntCounter,
216 forwarded_header_not_included: IntCounter,
217 client_id_source_config_mismatch: IntCounter,
218 x_forwarded_for_num_hops: Gauge,
219 pub gasless_rate_limited_count: IntCounter,
220}
221
222impl ValidatorServiceMetrics {
223 pub fn new(registry: &Registry) -> Self {
224 Self {
225 signature_errors: register_int_counter_with_registry!(
226 "total_signature_errors",
227 "Number of transaction signature errors",
228 registry,
229 )
230 .unwrap(),
231 tx_verification_latency: register_histogram_with_registry!(
232 "validator_service_tx_verification_latency",
233 "Latency of verifying a transaction",
234 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
235 registry,
236 )
237 .unwrap(),
238 cert_verification_latency: register_histogram_with_registry!(
239 "validator_service_cert_verification_latency",
240 "Latency of verifying a certificate",
241 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
242 registry,
243 )
244 .unwrap(),
245 consensus_latency: register_histogram_with_registry!(
246 "validator_service_consensus_latency",
247 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
248 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249 registry,
250 )
251 .unwrap(),
252 handle_transaction_latency: register_histogram_with_registry!(
253 "validator_service_handle_transaction_latency",
254 "Latency of handling a transaction",
255 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
256 registry,
257 )
258 .unwrap(),
259 handle_certificate_consensus_latency: register_histogram_with_registry!(
260 "validator_service_handle_certificate_consensus_latency",
261 "Latency of handling a consensus transaction certificate",
262 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
263 registry,
264 )
265 .unwrap(),
266 submit_certificate_consensus_latency: register_histogram_with_registry!(
267 "validator_service_submit_certificate_consensus_latency",
268 "Latency of submit_certificate RPC handler",
269 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
270 registry,
271 )
272 .unwrap(),
273 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
274 "validator_service_handle_certificate_non_consensus_latency",
275 "Latency of handling a non-consensus transaction certificate",
276 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
277 registry,
278 )
279 .unwrap(),
280 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
281 "validator_service_handle_soft_bundle_certificates_consensus_latency",
282 "Latency of handling a consensus soft bundle",
283 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
284 registry,
285 )
286 .unwrap(),
287 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
288 "handle_soft_bundle_certificates_count",
289 "The number of certificates included in a soft bundle",
290 mysten_metrics::COUNT_BUCKETS.to_vec(),
291 registry,
292 )
293 .unwrap(),
294 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
295 "handle_soft_bundle_certificates_size_bytes",
296 "The size of soft bundle in bytes",
297 mysten_metrics::BYTES_BUCKETS.to_vec(),
298 registry,
299 )
300 .unwrap(),
301 handle_transaction_consensus_latency: register_histogram_with_registry!(
302 "validator_service_handle_transaction_consensus_latency",
303 "Latency of handling a user transaction sent through consensus",
304 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
305 registry,
306 )
307 .unwrap(),
308 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
309 "validator_service_submit_transaction_consensus_latency",
310 "Latency of submitting a user transaction sent through consensus",
311 &["req_type"],
312 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
313 registry,
314 )
315 .unwrap(),
316 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
317 "validator_service_submit_transaction_latency",
318 "Latency of submit transaction handler",
319 &["req_type"],
320 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
321 registry,
322 )
323 .unwrap(),
324 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
325 "validator_service_handle_wait_for_effects_ping_latency",
326 "Latency of handling a ping request for wait_for_effects",
327 &["req_type"],
328 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
329 registry,
330 )
331 .unwrap(),
332 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
333 "validator_service_submit_transaction_bytes",
334 "The size of transactions in the submit transaction request",
335 &["req_type"],
336 mysten_metrics::BYTES_BUCKETS.to_vec(),
337 registry,
338 )
339 .unwrap(),
340 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
341 "validator_service_submit_transaction_batch_size",
342 "The number of transactions in the submit transaction request",
343 &["req_type"],
344 mysten_metrics::COUNT_BUCKETS.to_vec(),
345 registry,
346 )
347 .unwrap(),
348 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
349 "validator_service_num_rejected_cert_in_epoch_boundary",
350 "Number of rejected transaction certificate during epoch transitioning",
351 registry,
352 )
353 .unwrap(),
354 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
355 "validator_service_num_rejected_tx_during_overload",
356 "Number of rejected transaction due to system overload",
357 &["error_type"],
358 registry,
359 )
360 .unwrap(),
361 submission_rejected_transactions: register_int_counter_vec_with_registry!(
362 "validator_service_submission_rejected_transactions",
363 "Number of transactions rejected during submission",
364 &["reason"],
365 registry,
366 )
367 .unwrap(),
368 connection_ip_not_found: register_int_counter_with_registry!(
369 "validator_service_connection_ip_not_found",
370 "Number of times connection IP was not extractable from request",
371 registry,
372 )
373 .unwrap(),
374 forwarded_header_parse_error: register_int_counter_with_registry!(
375 "validator_service_forwarded_header_parse_error",
376 "Number of times x-forwarded-for header could not be parsed",
377 registry,
378 )
379 .unwrap(),
380 forwarded_header_invalid: register_int_counter_with_registry!(
381 "validator_service_forwarded_header_invalid",
382 "Number of times x-forwarded-for header was invalid",
383 registry,
384 )
385 .unwrap(),
386 forwarded_header_not_included: register_int_counter_with_registry!(
387 "validator_service_forwarded_header_not_included",
388 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
389 registry,
390 )
391 .unwrap(),
392 client_id_source_config_mismatch: register_int_counter_with_registry!(
393 "validator_service_client_id_source_config_mismatch",
394 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
395 registry,
396 )
397 .unwrap(),
398 x_forwarded_for_num_hops: register_gauge_with_registry!(
399 "validator_service_x_forwarded_for_num_hops",
400 "Number of hops in x-forwarded-for header",
401 registry,
402 )
403 .unwrap(),
404 gasless_rate_limited_count: register_int_counter_with_registry!(
405 "validator_service_gasless_rate_limited_count",
406 "Number of gasless transactions rejected by rate limiter",
407 registry,
408 )
409 .unwrap(),
410 }
411 }
412
413 pub fn new_for_tests() -> Self {
414 let registry = Registry::new();
415 Self::new(®istry)
416 }
417}
418
419#[derive(Clone)]
420pub struct ValidatorService {
421 state: Arc<AuthorityState>,
422 consensus_adapter: Arc<ConsensusAdapter>,
423 metrics: Arc<ValidatorServiceMetrics>,
424 traffic_controller: Option<Arc<TrafficController>>,
425 client_id_source: Option<ClientIdSource>,
426 gasless_limiter: GaslessRateLimiter,
427}
428
429impl ValidatorService {
430 pub fn new(
431 state: Arc<AuthorityState>,
432 consensus_adapter: Arc<ConsensusAdapter>,
433 validator_metrics: Arc<ValidatorServiceMetrics>,
434 client_id_source: Option<ClientIdSource>,
435 ) -> Self {
436 let traffic_controller = state.traffic_controller.clone();
437 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
438 Self {
439 state,
440 consensus_adapter,
441 metrics: validator_metrics,
442 traffic_controller,
443 client_id_source,
444 gasless_limiter,
445 }
446 }
447
448 pub fn new_for_tests(
449 state: Arc<AuthorityState>,
450 consensus_adapter: Arc<ConsensusAdapter>,
451 metrics: Arc<ValidatorServiceMetrics>,
452 ) -> Self {
453 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
454 Self {
455 state,
456 consensus_adapter,
457 metrics,
458 traffic_controller: None,
459 client_id_source: None,
460 gasless_limiter,
461 }
462 }
463
464 pub fn validator_state(&self) -> &Arc<AuthorityState> {
465 &self.state
466 }
467
468 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
470 let epoch_store = self.state.load_epoch_store_one_call_per_task();
471
472 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
474
475 let transaction = epoch_store
477 .verify_transaction_require_no_aliases(transaction)?
478 .into_tx();
479
480 self.state
482 .handle_vote_transaction(&epoch_store, transaction)?;
483
484 Ok(())
485 }
486
487 pub fn handle_transaction_for_testing_with_overload_check(
490 &self,
491 transaction: Transaction,
492 ) -> SuiResult<()> {
493 let epoch_store = self.state.load_epoch_store_one_call_per_task();
494
495 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
497
498 self.state.check_system_overload(
500 self.consensus_adapter.as_ref(),
501 transaction.data(),
502 self.state.check_system_overload_at_signing(),
503 )?;
504
505 let transaction = epoch_store
507 .verify_transaction_require_no_aliases(transaction)?
508 .into_tx();
509
510 self.state
512 .handle_vote_transaction(&epoch_store, transaction)?;
513
514 Ok(())
515 }
516
517 async fn collect_immutable_object_ids(
520 &self,
521 tx: &VerifiedTransaction,
522 state: &AuthorityState,
523 ) -> SuiResult<Vec<ObjectID>> {
524 let input_objects = tx.data().transaction_data().input_objects()?;
525
526 let object_ids: Vec<ObjectID> = input_objects
528 .iter()
529 .filter_map(|obj| match obj {
530 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
531 _ => None,
532 })
533 .collect();
534 if object_ids.is_empty() {
535 return Ok(vec![]);
536 }
537
538 let objects = state.get_object_cache_reader().get_objects(&object_ids);
540
541 objects
543 .into_iter()
544 .zip_debug_eq(object_ids.iter())
545 .filter_map(|(obj, id)| {
546 let Some(o) = obj else {
547 return Some(Err::<ObjectID, SuiError>(
548 SuiErrorKind::UserInputError {
549 error: UserInputError::ObjectNotFound {
550 object_id: *id,
551 version: None,
552 },
553 }
554 .into(),
555 ));
556 };
557 if o.is_immutable() {
558 Some(Ok(*id))
559 } else {
560 None
561 }
562 })
563 .collect::<SuiResult<Vec<ObjectID>>>()
564 }
565
566 #[instrument(
567 name = "ValidatorService::handle_submit_transaction",
568 level = "error",
569 skip_all,
570 err(level = "debug")
571 )]
572 async fn handle_submit_transaction(
573 &self,
574 request: tonic::Request<RawSubmitTxRequest>,
575 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
576 let Self {
577 state,
578 consensus_adapter,
579 metrics,
580 traffic_controller: _,
581 client_id_source,
582 gasless_limiter: _,
583 } = self.clone();
584
585 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
586 self.get_client_ip_addr(&request, client_id_source)
587 } else {
588 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
589 };
590
591 let inner = request.into_inner();
592 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
593
594 let next_epoch = start_epoch + 1;
595 let mut max_retries = 1;
596
597 loop {
598 let res = self
599 .handle_submit_transaction_inner(
600 &state,
601 &consensus_adapter,
602 &metrics,
603 &inner,
604 submitter_client_addr,
605 )
606 .await;
607 match res {
608 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
609 Err(err) => {
610 if max_retries > 0
611 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
612 {
613 max_retries -= 1;
614
615 debug!(
616 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
617 );
618
619 if let Ok(Ok(new_epoch)) =
620 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
621 {
622 assert_reachable!("retry submission at epoch end");
623 if new_epoch == next_epoch {
624 continue;
625 }
626
627 debug_fatal!(
628 "expected epoch {} after reconfiguration. got {}",
629 next_epoch,
630 new_epoch
631 );
632 }
633 }
634 return Err(err.into());
635 }
636 }
637 }
638 }
639
640 async fn handle_submit_transaction_inner(
641 &self,
642 state: &AuthorityState,
643 consensus_adapter: &ConsensusAdapter,
644 metrics: &ValidatorServiceMetrics,
645 request: &RawSubmitTxRequest,
646 submitter_client_addr: Option<IpAddr>,
647 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
648 let epoch_store = state.load_epoch_store_one_call_per_task();
649 if !epoch_store.protocol_config().mysticeti_fastpath() {
650 return Err(SuiErrorKind::UnsupportedFeatureError {
651 error: "Mysticeti fastpath".to_string(),
652 }
653 .into());
654 }
655
656 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
657 SuiErrorKind::GrpcMessageDeserializeError {
658 type_info: "RawSubmitTxRequest.submit_type".to_string(),
659 error: e.to_string(),
660 }
661 })?;
662
663 let is_ping_request = submit_type == SubmitTxType::Ping;
664 if is_ping_request {
665 fp_ensure!(
666 request.transactions.is_empty(),
667 SuiErrorKind::InvalidRequest(format!(
668 "Ping request cannot contain {} transactions",
669 request.transactions.len()
670 ))
671 .into()
672 );
673 } else {
674 fp_ensure!(
676 !request.transactions.is_empty(),
677 SuiErrorKind::InvalidRequest(
678 "At least one transaction needs to be submitted".to_string(),
679 )
680 .into()
681 );
682 }
683
684 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
689
690 let max_num_transactions = if is_soft_bundle_request {
691 epoch_store.protocol_config().max_soft_bundle_size()
694 } else {
695 epoch_store
697 .protocol_config()
698 .max_num_transactions_in_block()
699 };
700 fp_ensure!(
701 request.transactions.len() <= max_num_transactions as usize,
702 SuiErrorKind::InvalidRequest(format!(
703 "Too many transactions in request: {} vs {}",
704 request.transactions.len(),
705 max_num_transactions
706 ))
707 .into()
708 );
709
710 let mut tx_digests = Vec::with_capacity(request.transactions.len());
712 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
714 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
716 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
718 let mut total_size_bytes = 0;
720 let mut spam_weight = Weight::zero();
722
723 let req_type = if is_ping_request {
724 "ping"
725 } else if request.transactions.len() == 1 {
726 "single_transaction"
727 } else if is_soft_bundle_request {
728 "soft_bundle"
729 } else {
730 "batch"
731 };
732
733 let _handle_tx_metrics_guard = metrics
734 .handle_submit_transaction_latency
735 .with_label_values(&[req_type])
736 .start_timer();
737
738 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
739 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
740 Ok(txn) => txn,
741 Err(e) => {
742 return Err(SuiErrorKind::TransactionDeserializationError {
744 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
745 }
746 .into());
747 }
748 };
749
750 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
752
753 if transaction
754 .data()
755 .transaction_data()
756 .is_gasless_transaction()
757 {
758 spam_weight = Weight::one();
760 }
761
762 let overload_check_res = state.check_system_overload(
763 consensus_adapter,
764 transaction.data(),
765 state.check_system_overload_at_signing(),
766 );
767 if let Err(error) = overload_check_res {
768 metrics
769 .num_rejected_tx_during_overload
770 .with_label_values(&[error.as_ref()])
771 .inc();
772 results[idx] = Some(SubmitTxResult::Rejected { error });
773 continue;
774 }
775
776 if transaction
777 .data()
778 .transaction_data()
779 .is_gasless_transaction()
780 && !self
781 .gasless_limiter
782 .try_acquire(epoch_store.protocol_config())
783 {
784 metrics.gasless_rate_limited_count.inc();
785 results[idx] = Some(SubmitTxResult::Rejected {
786 error: SuiErrorKind::ValidatorOverloadedRetryAfter {
787 retry_after_secs: 1,
788 }
789 .into(),
790 });
791 continue;
792 }
793
794 let verified_transaction = {
796 let _metrics_guard = metrics.tx_verification_latency.start_timer();
797 if epoch_store.protocol_config().address_aliases() {
798 match epoch_store.verify_transaction_with_current_aliases(transaction) {
799 Ok(tx) => tx,
800 Err(e) => {
801 metrics.signature_errors.inc();
802 return Err(e);
803 }
804 }
805 } else {
806 match epoch_store.verify_transaction_require_no_aliases(transaction) {
807 Ok(tx) => tx,
808 Err(e) => {
809 metrics.signature_errors.inc();
810 return Err(e);
811 }
812 }
813 }
814 };
815
816 let tx_digest = *verified_transaction.tx().digest();
817 debug!(
818 ?tx_digest,
819 "handle_submit_transaction: verified transaction"
820 );
821
822 if let Some(effects) = state
825 .get_transaction_cache_reader()
826 .get_executed_effects(&tx_digest)
827 {
828 let effects_digest = effects.digest();
829 if let Ok(executed_data) = self.complete_executed_data(effects).await {
830 let executed_result = SubmitTxResult::Executed {
831 effects_digest,
832 details: Some(executed_data),
833 };
834 results[idx] = Some(executed_result);
835 debug!(?tx_digest, "handle_submit_transaction: already executed");
836 continue;
837 }
838 }
839
840 if self
841 .state
842 .get_transaction_cache_reader()
843 .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
844 {
845 results[idx] = Some(SubmitTxResult::Rejected {
846 error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
847 });
848 debug!(
849 ?tx_digest,
850 "handle_submit_transaction: transaction already executed in previous epoch"
851 );
852 continue;
853 }
854
855 debug!(
856 ?tx_digest,
857 "handle_submit_transaction: waiting for fastpath dependency objects"
858 );
859 if !state
860 .wait_for_fastpath_dependency_objects(
861 verified_transaction.tx(),
862 epoch_store.epoch(),
863 )
864 .await?
865 {
866 debug!(
867 ?tx_digest,
868 "fastpath input objects are still unavailable after waiting"
869 );
870 }
871
872 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
873 Ok(_) => { }
874 Err(e) => {
875 if let Some(effects) = state
878 .get_transaction_cache_reader()
879 .get_executed_effects(&tx_digest)
880 {
881 let effects_digest = effects.digest();
882 if let Ok(executed_data) = self.complete_executed_data(effects).await {
883 let executed_result = SubmitTxResult::Executed {
884 effects_digest,
885 details: Some(executed_data),
886 };
887 results[idx] = Some(executed_result);
888 continue;
889 }
890 }
891
892 debug!(?tx_digest, "Transaction rejected during submission: {e}");
894 metrics
895 .submission_rejected_transactions
896 .with_label_values(&[e.to_variant_name()])
897 .inc();
898 results[idx] = Some(SubmitTxResult::Rejected { error: e });
899 continue;
900 }
901 }
902
903 let mut claims = vec![];
905
906 let immutable_object_ids = self
907 .collect_immutable_object_ids(verified_transaction.tx(), state)
908 .await?;
909 if !immutable_object_ids.is_empty() {
910 claims.push(TransactionClaim::ImmutableInputObjects(
911 immutable_object_ids,
912 ));
913 }
914
915 let (tx, aliases) = verified_transaction.into_inner();
916 if epoch_store.protocol_config().address_aliases() {
917 if epoch_store
918 .protocol_config()
919 .fix_checkpoint_signature_mapping()
920 {
921 claims.push(TransactionClaim::AddressAliasesV2(aliases));
922 } else {
923 let v1_aliases: Vec<_> = tx
924 .data()
925 .intent_message()
926 .value
927 .required_signers()
928 .into_iter()
929 .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
930 .collect();
931 #[allow(deprecated)]
932 claims.push(TransactionClaim::AddressAliases(
933 nonempty::NonEmpty::from_vec(v1_aliases)
934 .expect("must have at least one required_signer"),
935 ));
936 }
937 }
938
939 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
940
941 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
942 &state.name,
943 tx_with_claims,
944 ));
945
946 transaction_indexes.push(idx);
947 tx_digests.push(tx_digest);
948 total_size_bytes += tx_size;
949 }
950
951 if consensus_transactions.is_empty() && !is_ping_request {
952 return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
953 }
954
955 let max_transaction_bytes = if is_soft_bundle_request {
959 epoch_store
960 .protocol_config()
961 .consensus_max_transactions_in_block_bytes()
962 / 2
963 } else {
964 epoch_store
965 .protocol_config()
966 .consensus_max_transactions_in_block_bytes()
967 };
968 fp_ensure!(
969 total_size_bytes <= max_transaction_bytes as usize,
970 SuiErrorKind::UserInputError {
971 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
972 size: total_size_bytes,
973 limit: max_transaction_bytes,
974 },
975 }
976 .into()
977 );
978
979 metrics
980 .handle_submit_transaction_bytes
981 .with_label_values(&[req_type])
982 .observe(total_size_bytes as f64);
983 metrics
984 .handle_submit_transaction_batch_size
985 .with_label_values(&[req_type])
986 .observe(consensus_transactions.len() as f64);
987
988 let _latency_metric_guard = metrics
989 .handle_submit_transaction_consensus_latency
990 .with_label_values(&[req_type])
991 .start_timer();
992
993 let consensus_positions = if is_soft_bundle_request || is_ping_request {
994 assert!(
997 is_ping_request || !consensus_transactions.is_empty(),
998 "A valid soft bundle must have at least one transaction"
999 );
1000 debug!(
1001 "handle_submit_transaction: submitting consensus transactions ({}): {}",
1002 req_type,
1003 consensus_transactions
1004 .iter()
1005 .map(|t| t.local_display())
1006 .join(", ")
1007 );
1008 self.handle_submit_to_consensus_for_position(
1009 consensus_transactions,
1010 &epoch_store,
1011 submitter_client_addr,
1012 )
1013 .await?
1014 } else {
1015 let futures = consensus_transactions.into_iter().map(|t| {
1016 debug!(
1017 "handle_submit_transaction: submitting consensus transaction ({}): {}",
1018 req_type,
1019 t.local_display(),
1020 );
1021 self.handle_submit_to_consensus_for_position(
1022 vec![t],
1023 &epoch_store,
1024 submitter_client_addr,
1025 )
1026 });
1027 future::try_join_all(futures)
1028 .await?
1029 .into_iter()
1030 .flatten()
1031 .collect()
1032 };
1033
1034 if is_ping_request {
1035 assert_eq!(consensus_positions.len(), 1);
1037 results.push(Some(SubmitTxResult::Submitted {
1038 consensus_position: consensus_positions[0],
1039 }));
1040 } else {
1041 for ((idx, tx_digest), consensus_position) in transaction_indexes
1043 .into_iter()
1044 .zip_debug_eq(tx_digests)
1045 .zip_debug_eq(consensus_positions)
1046 {
1047 debug!(
1048 ?tx_digest,
1049 "handle_submit_transaction: submitted consensus transaction at {}",
1050 consensus_position,
1051 );
1052 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1053 }
1054 }
1055
1056 Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1057 }
1058
1059 fn try_from_submit_tx_response(
1060 results: Vec<Option<SubmitTxResult>>,
1061 ) -> Result<RawSubmitTxResponse, SuiError> {
1062 let mut raw_results = Vec::new();
1063 for (i, result) in results.into_iter().enumerate() {
1064 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1065 error: format!("Missing transaction result at {}", i),
1066 })?;
1067 let raw_result = result.try_into()?;
1068 raw_results.push(raw_result);
1069 }
1070 Ok(RawSubmitTxResponse {
1071 results: raw_results,
1072 })
1073 }
1074
1075 #[instrument(
1076 name = "ValidatorService::handle_submit_to_consensus_for_position",
1077 level = "debug",
1078 skip_all,
1079 err(level = "debug")
1080 )]
1081 async fn handle_submit_to_consensus_for_position(
1082 &self,
1083 consensus_transactions: Vec<ConsensusTransaction>,
1085 epoch_store: &Arc<AuthorityPerEpochStore>,
1086 submitter_client_addr: Option<IpAddr>,
1087 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1088 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1089
1090 {
1091 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1093 if !reconfiguration_lock.should_accept_user_certs() {
1094 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1095 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1096 }
1097
1098 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1102
1103 self.consensus_adapter.submit_batch(
1104 &consensus_transactions,
1105 Some(&reconfiguration_lock),
1106 epoch_store,
1107 Some(tx_consensus_positions),
1108 submitter_client_addr,
1109 )?;
1110 }
1111
1112 Ok(rx_consensus_positions.await.map_err(|e| {
1113 SuiErrorKind::FailedToSubmitToConsensus(format!(
1114 "Failed to get consensus position: {e}"
1115 ))
1116 })?)
1117 }
1118
1119 async fn collect_effects_data(
1120 &self,
1121 effects: &TransactionEffects,
1122 include_events: bool,
1123 include_input_objects: bool,
1124 include_output_objects: bool,
1125 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1126 let events = if include_events && effects.events_digest().is_some() {
1127 Some(
1128 self.state
1129 .get_transaction_events(effects.transaction_digest())?,
1130 )
1131 } else {
1132 None
1133 };
1134
1135 let input_objects = if include_input_objects {
1136 self.state.get_transaction_input_objects(effects)?
1137 } else {
1138 vec![]
1139 };
1140
1141 let output_objects = if include_output_objects {
1142 self.state.get_transaction_output_objects(effects)?
1143 } else {
1144 vec![]
1145 };
1146
1147 Ok((events, input_objects, output_objects))
1148 }
1149}
1150
1151type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1152
1153impl ValidatorService {
1154 async fn handle_submit_transaction_impl(
1155 &self,
1156 request: tonic::Request<RawSubmitTxRequest>,
1157 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1158 self.handle_submit_transaction(request).await
1159 }
1160
1161 async fn wait_for_effects_impl(
1162 &self,
1163 request: tonic::Request<RawWaitForEffectsRequest>,
1164 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1165 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1166 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1167 let response = timeout(
1168 Duration::from_secs(20),
1170 epoch_store
1171 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1172 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1173 )
1174 .await
1175 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1176 .try_into()?;
1177 Ok((tonic::Response::new(response), Weight::zero()))
1178 }
1179
1180 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
1181 async fn wait_for_effects_response(
1182 &self,
1183 request: WaitForEffectsRequest,
1184 epoch_store: &Arc<AuthorityPerEpochStore>,
1185 ) -> SuiResult<WaitForEffectsResponse> {
1186 if request.ping_type.is_some() {
1187 return timeout(
1188 Duration::from_secs(10),
1189 self.ping_response(request, epoch_store),
1190 )
1191 .await
1192 .map_err(|_| SuiErrorKind::TimeoutError)?;
1193 }
1194
1195 let Some(tx_digest) = request.transaction_digest else {
1196 return Err(SuiErrorKind::InvalidRequest(
1197 "Transaction digest is required for wait for effects requests".to_string(),
1198 )
1199 .into());
1200 };
1201 let tx_digests = [tx_digest];
1202
1203 let consensus_status_future = async {
1207 let consensus_position = match request.consensus_position {
1208 Some(pos) => pos,
1209 None => return futures::future::pending().await,
1210 };
1211 let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().ok_or(
1212 SuiErrorKind::UnsupportedFeatureError {
1213 error: "Consensus tx status cache".to_string(),
1214 },
1215 )?;
1216 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1217 match consensus_tx_status_cache
1218 .notify_read_transaction_status(consensus_position)
1219 .await
1220 {
1221 NotifyReadConsensusTxStatusResult::Status(
1222 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1223 ) => Ok(WaitForEffectsResponse::Rejected {
1224 error: epoch_store.get_rejection_vote_reason(consensus_position),
1225 }),
1226 NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1227 futures::future::pending().await
1229 }
1230 NotifyReadConsensusTxStatusResult::Expired(round) => {
1231 Ok(WaitForEffectsResponse::Expired {
1232 epoch: epoch_store.epoch(),
1233 round: Some(round),
1234 })
1235 }
1236 }
1237 };
1238
1239 tokio::select! {
1240 effects_result = self.state
1241 .get_transaction_cache_reader()
1242 .notify_read_executed_effects_may_fail(
1243 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1244 &tx_digests,
1245 ) => {
1246 let effects = effects_result?.pop().unwrap();
1247 let effects_digest = effects.digest();
1248 let details = if request.include_details {
1249 Some(self.complete_executed_data(effects).await?)
1250 } else {
1251 None
1252 };
1253 Ok(WaitForEffectsResponse::Executed {
1254 effects_digest,
1255 details,
1256 })
1257 }
1258 status_response = consensus_status_future => {
1259 status_response
1260 }
1261 }
1262 }
1263
1264 #[instrument(level = "error", skip_all, err(level = "debug"))]
1265 async fn ping_response(
1266 &self,
1267 request: WaitForEffectsRequest,
1268 epoch_store: &Arc<AuthorityPerEpochStore>,
1269 ) -> SuiResult<WaitForEffectsResponse> {
1270 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1271 return Err(SuiErrorKind::UnsupportedFeatureError {
1272 error: "Mysticeti fastpath".to_string(),
1273 }
1274 .into());
1275 };
1276
1277 let Some(consensus_position) = request.consensus_position else {
1278 return Err(SuiErrorKind::InvalidRequest(
1279 "Consensus position is required for Ping requests".to_string(),
1280 )
1281 .into());
1282 };
1283
1284 let Some(ping) = request.ping_type else {
1286 return Err(SuiErrorKind::InvalidRequest(
1287 "Ping type is required for ping requests".to_string(),
1288 )
1289 .into());
1290 };
1291
1292 let _metrics_guard = self
1293 .metrics
1294 .handle_wait_for_effects_ping_latency
1295 .with_label_values(&[ping.as_str()])
1296 .start_timer();
1297
1298 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1299
1300 let details = if request.include_details {
1301 Some(Box::new(ExecutedData::default()))
1302 } else {
1303 None
1304 };
1305
1306 let status = consensus_tx_status_cache
1307 .notify_read_transaction_status(consensus_position)
1308 .await;
1309 match status {
1310 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1311 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1312 Ok(WaitForEffectsResponse::Rejected {
1313 error: epoch_store.get_rejection_vote_reason(consensus_position),
1314 })
1315 }
1316 ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1317 effects_digest: TransactionEffectsDigest::ZERO,
1318 details,
1319 }),
1320 },
1321 NotifyReadConsensusTxStatusResult::Expired(round) => {
1322 Ok(WaitForEffectsResponse::Expired {
1323 epoch: epoch_store.epoch(),
1324 round: Some(round),
1325 })
1326 }
1327 }
1328 }
1329
1330 async fn complete_executed_data(
1331 &self,
1332 effects: TransactionEffects,
1333 ) -> SuiResult<Box<ExecutedData>> {
1334 let (events, input_objects, output_objects) = self
1335 .collect_effects_data(
1336 &effects, true, true,
1337 true,
1338 )
1339 .await?;
1340 Ok(Box::new(ExecutedData {
1341 effects,
1342 events,
1343 input_objects,
1344 output_objects,
1345 }))
1346 }
1347
1348 async fn object_info_impl(
1349 &self,
1350 request: tonic::Request<ObjectInfoRequest>,
1351 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1352 let request = request.into_inner();
1353 let response = self.state.handle_object_info_request(request).await?;
1354 Ok((tonic::Response::new(response), Weight::one()))
1355 }
1356
1357 async fn transaction_info_impl(
1358 &self,
1359 request: tonic::Request<TransactionInfoRequest>,
1360 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1361 let request = request.into_inner();
1362 let response = self.state.handle_transaction_info_request(request).await?;
1363 Ok((tonic::Response::new(response), Weight::one()))
1364 }
1365
1366 async fn checkpoint_impl(
1367 &self,
1368 request: tonic::Request<CheckpointRequest>,
1369 ) -> WrappedServiceResponse<CheckpointResponse> {
1370 let request = request.into_inner();
1371 let response = self.state.handle_checkpoint_request(&request)?;
1372 Ok((tonic::Response::new(response), Weight::one()))
1373 }
1374
1375 async fn checkpoint_v2_impl(
1376 &self,
1377 request: tonic::Request<CheckpointRequestV2>,
1378 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1379 let request = request.into_inner();
1380 let response = self.state.handle_checkpoint_request_v2(&request)?;
1381 Ok((tonic::Response::new(response), Weight::one()))
1382 }
1383
1384 async fn get_system_state_object_impl(
1385 &self,
1386 _request: tonic::Request<SystemStateRequest>,
1387 ) -> WrappedServiceResponse<SuiSystemState> {
1388 let response = self
1389 .state
1390 .get_object_cache_reader()
1391 .get_sui_system_state_object_unsafe()?;
1392 Ok((tonic::Response::new(response), Weight::one()))
1393 }
1394
1395 async fn validator_health_impl(
1396 &self,
1397 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1398 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1399 let state = &self.state;
1400
1401 let epoch_store = state.load_epoch_store_one_call_per_task();
1403
1404 let num_inflight_execution_transactions =
1406 state.execution_scheduler().num_pending_certificates() as u64;
1407
1408 let num_inflight_consensus_transactions =
1410 self.consensus_adapter.num_inflight_transactions();
1411
1412 let last_committed_leader_round = epoch_store
1414 .consensus_tx_status_cache
1415 .as_ref()
1416 .and_then(|cache| cache.get_last_committed_leader_round())
1417 .unwrap_or(0);
1418
1419 let last_locally_built_checkpoint = epoch_store
1421 .last_built_checkpoint_summary()
1422 .ok()
1423 .flatten()
1424 .map(|(_, summary)| summary.sequence_number)
1425 .unwrap_or(0);
1426
1427 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1428 num_inflight_consensus_transactions,
1429 num_inflight_execution_transactions,
1430 last_locally_built_checkpoint,
1431 last_committed_leader_round,
1432 };
1433
1434 let raw_response = typed_response
1435 .try_into()
1436 .map_err(|e: sui_types::error::SuiError| {
1437 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1438 })?;
1439
1440 Ok((tonic::Response::new(raw_response), Weight::one()))
1441 }
1442
1443 fn get_client_ip_addr<T>(
1444 &self,
1445 request: &tonic::Request<T>,
1446 source: &ClientIdSource,
1447 ) -> Option<IpAddr> {
1448 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1449
1450 if let Some(header) = forwarded_header {
1451 let num_hops = header
1452 .to_str()
1453 .map(|h| h.split(',').count().saturating_sub(1))
1454 .unwrap_or(0);
1455
1456 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1457 }
1458
1459 match source {
1460 ClientIdSource::SocketAddr => {
1461 let socket_addr: Option<SocketAddr> = request.remote_addr();
1462
1463 if let Some(socket_addr) = socket_addr {
1469 Some(socket_addr.ip())
1470 } else {
1471 if cfg!(msim) {
1472 } else if cfg!(test) {
1474 panic!("Failed to get remote address from request");
1475 } else {
1476 self.metrics.connection_ip_not_found.inc();
1477 error!("Failed to get remote address from request");
1478 }
1479 None
1480 }
1481 }
1482 ClientIdSource::XForwardedFor(num_hops) => {
1483 let do_header_parse = |op: &MetadataValue<Ascii>| {
1484 match op.to_str() {
1485 Ok(header_val) => {
1486 let header_contents =
1487 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1488 if *num_hops == 0 {
1489 error!(
1490 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1491 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1492 to this node. Skipping traffic controller request handling.",
1493 header_contents,
1494 );
1495 return None;
1496 }
1497 let contents_len = header_contents.len();
1498 if contents_len < *num_hops {
1499 error!(
1500 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1501 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1502 `client-id-source` in the node config.",
1503 header_contents, contents_len, num_hops, contents_len,
1504 );
1505 self.metrics.client_id_source_config_mismatch.inc();
1506 return None;
1507 }
1508 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1509 else {
1510 error!(
1511 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1512 Expected at least {} values. Skipping traffic controller request handling.",
1513 header_contents, contents_len, num_hops, contents_len,
1514 );
1515 return None;
1516 };
1517 parse_ip(client_ip).or_else(|| {
1518 self.metrics.forwarded_header_parse_error.inc();
1519 None
1520 })
1521 }
1522 Err(e) => {
1523 self.metrics.forwarded_header_invalid.inc();
1527 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1528 None
1529 }
1530 }
1531 };
1532 if let Some(op) = request.metadata().get("x-forwarded-for") {
1533 do_header_parse(op)
1534 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1535 do_header_parse(op)
1536 } else {
1537 self.metrics.forwarded_header_not_included.inc();
1538 error!(
1539 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1540 );
1541 None
1542 }
1543 }
1544 }
1545 }
1546
1547 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1548 if let Some(traffic_controller) = &self.traffic_controller {
1549 if !traffic_controller.check(&client, &None).await {
1550 Err(tonic::Status::from_error(
1552 SuiErrorKind::TooManyRequests.into(),
1553 ))
1554 } else {
1555 Ok(())
1556 }
1557 } else {
1558 Ok(())
1559 }
1560 }
1561
1562 fn handle_traffic_resp<T>(
1563 &self,
1564 client: Option<IpAddr>,
1565 wrapped_response: WrappedServiceResponse<T>,
1566 ) -> Result<tonic::Response<T>, tonic::Status> {
1567 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1568 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1569 Err(status) => (
1570 Some(SuiError::from(status.clone())),
1571 Weight::zero(),
1572 Err(status.clone()),
1573 ),
1574 };
1575
1576 if let Some(traffic_controller) = self.traffic_controller.clone() {
1577 traffic_controller.tally(TrafficTally {
1578 direct: client,
1579 through_fullnode: None,
1580 error_info: error.map(|e| {
1581 let error_type = String::from(e.clone().as_ref());
1582 let error_weight = normalize(e);
1583 (error_weight, error_type)
1584 }),
1585 spam_weight,
1586 timestamp: SystemTime::now(),
1587 })
1588 }
1589 unwrapped_response
1590 }
1591}
1592
1593fn normalize(err: SuiError) -> Weight {
1595 match err.as_inner() {
1596 SuiErrorKind::UserInputError {
1597 error: UserInputError::IncorrectUserSignature { .. },
1598 } => Weight::one(),
1599 SuiErrorKind::InvalidSignature { .. }
1600 | SuiErrorKind::SignerSignatureAbsent { .. }
1601 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1602 | SuiErrorKind::IncorrectSigner { .. }
1603 | SuiErrorKind::UnknownSigner { .. }
1604 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1605 _ => Weight::zero(),
1606 }
1607}
1608
1609#[macro_export]
1613macro_rules! handle_with_decoration {
1614 ($self:ident, $func_name:ident, $request:ident) => {{
1615 if $self.client_id_source.is_none() {
1616 return $self.$func_name($request).await.map(|(result, _)| result);
1617 }
1618
1619 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1620
1621 $self.handle_traffic_req(client.clone()).await?;
1623
1624 let wrapped_response = $self.$func_name($request).await;
1626 $self.handle_traffic_resp(client, wrapped_response)
1627 }};
1628}
1629
1630#[async_trait]
1631impl Validator for ValidatorService {
1632 async fn submit_transaction(
1633 &self,
1634 request: tonic::Request<RawSubmitTxRequest>,
1635 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1636 let validator_service = self.clone();
1637
1638 spawn_monitored_task!(async move {
1641 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1644 })
1645 .await
1646 .unwrap()
1647 }
1648
1649 async fn wait_for_effects(
1650 &self,
1651 request: tonic::Request<RawWaitForEffectsRequest>,
1652 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1653 handle_with_decoration!(self, wait_for_effects_impl, request)
1654 }
1655
1656 async fn object_info(
1657 &self,
1658 request: tonic::Request<ObjectInfoRequest>,
1659 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1660 handle_with_decoration!(self, object_info_impl, request)
1661 }
1662
1663 async fn transaction_info(
1664 &self,
1665 request: tonic::Request<TransactionInfoRequest>,
1666 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1667 handle_with_decoration!(self, transaction_info_impl, request)
1668 }
1669
1670 async fn checkpoint(
1671 &self,
1672 request: tonic::Request<CheckpointRequest>,
1673 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1674 handle_with_decoration!(self, checkpoint_impl, request)
1675 }
1676
1677 async fn checkpoint_v2(
1678 &self,
1679 request: tonic::Request<CheckpointRequestV2>,
1680 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1681 handle_with_decoration!(self, checkpoint_v2_impl, request)
1682 }
1683
1684 async fn get_system_state_object(
1685 &self,
1686 request: tonic::Request<SystemStateRequest>,
1687 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1688 handle_with_decoration!(self, get_system_state_object_impl, request)
1689 }
1690
1691 async fn validator_health(
1692 &self,
1693 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1694 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1695 {
1696 handle_with_decoration!(self, validator_health_impl, request)
1697 }
1698}