1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::ZipDebugEqIteratorExt;
11use mysten_common::{assert_reachable, debug_fatal};
12use mysten_metrics::spawn_monitored_task;
13use prometheus::{
14 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15 register_gauge_with_registry, register_histogram_vec_with_registry,
16 register_histogram_with_registry, register_int_counter_vec_with_registry,
17 register_int_counter_with_registry,
18};
19use std::{
20 io,
21 net::{IpAddr, SocketAddr},
22 sync::Arc,
23 time::{Duration, SystemTime},
24};
25use sui_network::{
26 api::{Validator, ValidatorServer},
27 tonic,
28 validator::server::SUI_TLS_SERVER_NAME,
29};
30use sui_types::effects::TransactionEffectsAPI;
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::messages_grpc::{
34 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
35 TransactionInfoRequest, TransactionInfoResponse,
36};
37use sui_types::multiaddr::Multiaddr;
38use sui_types::object::Object;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::traffic_control::{ClientIdSource, Weight};
41use sui_types::{
42 base_types::ObjectID,
43 digests::TransactionEffectsDigest,
44 error::{SuiErrorKind, UserInputError},
45};
46use sui_types::{
47 effects::TransactionEffects,
48 messages_grpc::{
49 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
50 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
51 },
52};
53use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
54use sui_types::{error::*, transaction::*};
55use sui_types::{
56 fp_ensure,
57 messages_checkpoint::{
58 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
59 },
60};
61use tokio::time::timeout;
62use tonic::metadata::{Ascii, MetadataValue};
63use tracing::{debug, error, info, instrument};
64
65use crate::admission_queue::{AdmissionQueueContext, AdmissionQueueManager};
66use crate::gasless_rate_limiter::GaslessRateLimiter;
67use crate::{
68 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
69 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, ConsensusOverloadChecker},
70 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
71};
72use crate::{
73 authority::{
74 authority_per_epoch_store::AuthorityPerEpochStore,
75 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
76 },
77 checkpoints::CheckpointStore,
78 mysticeti_adapter::LazyMysticetiClient,
79};
80use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
81
82#[cfg(test)]
83#[path = "unit_tests/server_tests.rs"]
84mod server_tests;
85
86#[cfg(test)]
87#[path = "unit_tests/wait_for_effects_tests.rs"]
88mod wait_for_effects_tests;
89
90#[cfg(test)]
91#[path = "unit_tests/submit_transaction_tests.rs"]
92mod submit_transaction_tests;
93
/// Handle to a bound validator server, as returned by the `spawn_*_for_test` helpers
/// on `AuthorityServer`.
pub struct AuthorityServerHandle {
    /// The underlying network server; used to wait for shutdown, trigger shutdown and
    /// report the local address.
    server_handle: sui_network::validator::server::Server,
}
97
98impl AuthorityServerHandle {
99 pub async fn join(self) -> Result<(), io::Error> {
100 self.server_handle.handle().wait_for_shutdown().await;
101 Ok(())
102 }
103
104 pub async fn kill(self) -> Result<(), io::Error> {
105 self.server_handle.handle().shutdown().await;
106 Ok(())
107 }
108
109 pub fn address(&self) -> &Multiaddr {
110 self.server_handle.local_addr()
111 }
112}
113
/// Bundles everything needed to spawn a validator server: the address to bind to,
/// the authority state, the consensus adapter and the service metrics.
pub struct AuthorityServer {
    /// Default bind address used by `spawn_for_test`.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService` when the server is spawned.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService` when the server is spawned.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` when the server is spawned.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
120
121impl AuthorityServer {
122 pub fn new_for_test_with_consensus_adapter(
123 state: Arc<AuthorityState>,
124 consensus_adapter: Arc<ConsensusAdapter>,
125 ) -> Self {
126 let address = new_local_tcp_address_for_testing();
127 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
128
129 Self {
130 address,
131 state,
132 consensus_adapter,
133 metrics,
134 }
135 }
136
137 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
138 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
139 let consensus_adapter = Arc::new(ConsensusAdapter::new(
140 Arc::new(LazyMysticetiClient::new()),
141 CheckpointStore::new_for_tests(),
142 state.name,
143 100_000,
144 100_000,
145 ConsensusAdapterMetrics::new_test(),
146 slot_freed_notify,
147 ));
148 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
149 }
150
151 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
152 let address = self.address.clone();
153 self.spawn_with_bind_address_for_test(address).await
154 }
155
156 pub async fn spawn_with_bind_address_for_test(
157 self,
158 address: Multiaddr,
159 ) -> Result<AuthorityServerHandle, io::Error> {
160 let tls_config = sui_tls::create_rustls_server_config(
161 self.state.config.network_key_pair().copy().private(),
162 SUI_TLS_SERVER_NAME.to_string(),
163 );
164 let config = mysten_network::config::Config::new();
165 let server = sui_network::validator::server::ServerBuilder::from_config(
166 &config,
167 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
168 )
169 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
170 self.state,
171 self.consensus_adapter,
172 self.metrics,
173 )))
174 .bind(&address, Some(tls_config))
175 .await
176 .unwrap();
177 let local_addr = server.local_addr().to_owned();
178 info!("Listening to traffic on {local_addr}");
179 let handle = AuthorityServerHandle {
180 server_handle: server,
181 };
182 Ok(handle)
183 }
184}
185
/// Prometheus metrics recorded by the validator service.
///
/// Every field is registered in `ValidatorServiceMetrics::new`; the help string passed
/// at registration describes each metric.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,

    // Verification and request-handling latencies.
    pub tx_verification_latency: Histogram,
    pub cert_verification_latency: Histogram,
    pub handle_transaction_latency: Histogram,
    pub submit_certificate_consensus_latency: Histogram,
    pub handle_certificate_consensus_latency: Histogram,
    pub handle_certificate_non_consensus_latency: Histogram,

    // Soft bundle latency, certificate count and size in bytes.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    pub handle_soft_bundle_certificates_count: Histogram,
    pub handle_soft_bundle_certificates_size_bytes: Histogram,

    pub handle_transaction_consensus_latency: Histogram,
    // The histogram vectors below are labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    handle_submit_transaction_latency: HistogramVec,
    handle_submit_transaction_bytes: HistogramVec,
    handle_submit_transaction_batch_size: HistogramVec,

    // Rejection counters, labelled by `error_type` and `reason` respectively.
    num_rejected_tx_during_overload: IntCounterVec,
    submission_rejected_transactions: IntCounterVec,

    // Client IP / x-forwarded-for diagnostics.
    connection_ip_not_found: IntCounter,
    forwarded_header_parse_error: IntCounter,
    forwarded_header_invalid: IntCounter,
    forwarded_header_not_included: IntCounter,
    client_id_source_config_mismatch: IntCounter,
    x_forwarded_for_num_hops: Gauge,

    // Gasless transaction accounting; outcomes are labelled by `outcome`.
    pub gasless_rate_limited_count: IntCounter,
    pub gasless_submission_outcomes: IntCounterVec,

    /// Number of transactions that bypassed the admission queue.
    admission_queue_direct_bypasses: IntCounter,
}
217
218impl ValidatorServiceMetrics {
219 pub fn new(registry: &Registry) -> Self {
220 Self {
221 signature_errors: register_int_counter_with_registry!(
222 "total_signature_errors",
223 "Number of transaction signature errors",
224 registry,
225 )
226 .unwrap(),
227 tx_verification_latency: register_histogram_with_registry!(
228 "validator_service_tx_verification_latency",
229 "Latency of verifying a transaction",
230 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231 registry,
232 )
233 .unwrap(),
234 cert_verification_latency: register_histogram_with_registry!(
235 "validator_service_cert_verification_latency",
236 "Latency of verifying a certificate",
237 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap(),
241 handle_transaction_latency: register_histogram_with_registry!(
242 "validator_service_handle_transaction_latency",
243 "Latency of handling a transaction",
244 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
245 registry,
246 )
247 .unwrap(),
248 handle_certificate_consensus_latency: register_histogram_with_registry!(
249 "validator_service_handle_certificate_consensus_latency",
250 "Latency of handling a consensus transaction certificate",
251 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
252 registry,
253 )
254 .unwrap(),
255 submit_certificate_consensus_latency: register_histogram_with_registry!(
256 "validator_service_submit_certificate_consensus_latency",
257 "Latency of submit_certificate RPC handler",
258 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
259 registry,
260 )
261 .unwrap(),
262 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
263 "validator_service_handle_certificate_non_consensus_latency",
264 "Latency of handling a non-consensus transaction certificate",
265 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
266 registry,
267 )
268 .unwrap(),
269 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
270 "validator_service_handle_soft_bundle_certificates_consensus_latency",
271 "Latency of handling a consensus soft bundle",
272 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
273 registry,
274 )
275 .unwrap(),
276 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
277 "handle_soft_bundle_certificates_count",
278 "The number of certificates included in a soft bundle",
279 mysten_metrics::COUNT_BUCKETS.to_vec(),
280 registry,
281 )
282 .unwrap(),
283 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
284 "handle_soft_bundle_certificates_size_bytes",
285 "The size of soft bundle in bytes",
286 mysten_metrics::BYTES_BUCKETS.to_vec(),
287 registry,
288 )
289 .unwrap(),
290 handle_transaction_consensus_latency: register_histogram_with_registry!(
291 "validator_service_handle_transaction_consensus_latency",
292 "Latency of handling a user transaction sent through consensus",
293 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
294 registry,
295 )
296 .unwrap(),
297 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
298 "validator_service_submit_transaction_consensus_latency",
299 "Latency of submitting a user transaction sent through consensus",
300 &["req_type"],
301 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
302 registry,
303 )
304 .unwrap(),
305 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
306 "validator_service_submit_transaction_latency",
307 "Latency of submit transaction handler",
308 &["req_type"],
309 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
310 registry,
311 )
312 .unwrap(),
313 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
314 "validator_service_handle_wait_for_effects_ping_latency",
315 "Latency of handling a ping request for wait_for_effects",
316 &["req_type"],
317 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
318 registry,
319 )
320 .unwrap(),
321 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
322 "validator_service_submit_transaction_bytes",
323 "The size of transactions in the submit transaction request",
324 &["req_type"],
325 mysten_metrics::BYTES_BUCKETS.to_vec(),
326 registry,
327 )
328 .unwrap(),
329 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
330 "validator_service_submit_transaction_batch_size",
331 "The number of transactions in the submit transaction request",
332 &["req_type"],
333 mysten_metrics::COUNT_BUCKETS.to_vec(),
334 registry,
335 )
336 .unwrap(),
337 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
338 "validator_service_num_rejected_tx_during_overload",
339 "Number of rejected transaction due to system overload",
340 &["error_type"],
341 registry,
342 )
343 .unwrap(),
344 submission_rejected_transactions: register_int_counter_vec_with_registry!(
345 "validator_service_submission_rejected_transactions",
346 "Number of transactions rejected during submission",
347 &["reason"],
348 registry,
349 )
350 .unwrap(),
351 connection_ip_not_found: register_int_counter_with_registry!(
352 "validator_service_connection_ip_not_found",
353 "Number of times connection IP was not extractable from request",
354 registry,
355 )
356 .unwrap(),
357 forwarded_header_parse_error: register_int_counter_with_registry!(
358 "validator_service_forwarded_header_parse_error",
359 "Number of times x-forwarded-for header could not be parsed",
360 registry,
361 )
362 .unwrap(),
363 forwarded_header_invalid: register_int_counter_with_registry!(
364 "validator_service_forwarded_header_invalid",
365 "Number of times x-forwarded-for header was invalid",
366 registry,
367 )
368 .unwrap(),
369 forwarded_header_not_included: register_int_counter_with_registry!(
370 "validator_service_forwarded_header_not_included",
371 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
372 registry,
373 )
374 .unwrap(),
375 client_id_source_config_mismatch: register_int_counter_with_registry!(
376 "validator_service_client_id_source_config_mismatch",
377 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
378 registry,
379 )
380 .unwrap(),
381 x_forwarded_for_num_hops: register_gauge_with_registry!(
382 "validator_service_x_forwarded_for_num_hops",
383 "Number of hops in x-forwarded-for header",
384 registry,
385 )
386 .unwrap(),
387 gasless_rate_limited_count: register_int_counter_with_registry!(
388 "validator_service_gasless_rate_limited_count",
389 "Number of gasless transactions rejected by rate limiter",
390 registry,
391 )
392 .unwrap(),
393 gasless_submission_outcomes: register_int_counter_vec_with_registry!(
394 "validator_service_gasless_submission_outcomes",
395 "Number of valid gasless transaction submissions by outcome",
396 &["outcome"],
397 registry,
398 )
399 .unwrap(),
400 admission_queue_direct_bypasses: register_int_counter_with_registry!(
401 "validator_service_admission_queue_direct_bypasses",
402 "Number of transactions that bypassed the queue (system not overloaded)",
403 registry,
404 )
405 .unwrap(),
406 }
407 }
408
409 pub fn new_for_tests() -> Self {
410 let registry = Registry::new();
411 Self::new(®istry)
412 }
413}
414
/// How `handle_submit_transaction_inner` hands transactions over to consensus.
enum AdmissionQueueSubmitMode {
    /// Submit directly through the consensus adapter and count the request in the
    /// `admission_queue_direct_bypasses` metric.
    Bypass,
    /// Insert the transactions into the admission queue and wait for the consensus
    /// positions it reports back.
    Queue,
    /// Submit directly through the consensus adapter after a per-transaction consensus
    /// overload check. Chosen at least when the service has no admission queue.
    Disabled,
}
426
/// The validator's request-handling service: holds the authority state and the
/// collaborators used while handling submitted transactions.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state used for validation, voting and cache lookups.
    state: Arc<AuthorityState>,
    /// Adapter used to check consensus overload and submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while handling requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Traffic controller taken from `state.traffic_controller` in `new`; `None` in tests.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How to derive the client IP from a request; when `None`, the submit handler
    /// falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
    /// Rate limiter consulted for gasless transactions.
    gasless_limiter: GaslessRateLimiter,
    /// Admission queue context; `None` means submissions never go through the queue.
    admission_queue: Option<AdmissionQueueContext>,
}
437
438impl ValidatorService {
439 pub fn new(
440 state: Arc<AuthorityState>,
441 consensus_adapter: Arc<ConsensusAdapter>,
442 validator_metrics: Arc<ValidatorServiceMetrics>,
443 client_id_source: Option<ClientIdSource>,
444 admission_queue: Option<AdmissionQueueContext>,
445 ) -> Self {
446 let traffic_controller = state.traffic_controller.clone();
447 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
448 Self {
449 state,
450 consensus_adapter,
451 metrics: validator_metrics,
452 traffic_controller,
453 client_id_source,
454 gasless_limiter,
455 admission_queue,
456 }
457 }
458
459 pub fn new_for_tests(
460 state: Arc<AuthorityState>,
461 consensus_adapter: Arc<ConsensusAdapter>,
462 metrics: Arc<ValidatorServiceMetrics>,
463 ) -> Self {
464 let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
465 let epoch_store = state.epoch_store_for_testing().clone();
466 let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
467 let manager = Arc::new(AdmissionQueueManager::new_for_tests(
468 consensus_adapter.clone(),
469 slot_freed_notify,
470 ));
471 let admission_queue = Some(AdmissionQueueContext::spawn(manager, epoch_store));
472 Self {
473 state,
474 consensus_adapter,
475 metrics,
476 traffic_controller: None,
477 client_id_source: None,
478 gasless_limiter,
479 admission_queue,
480 }
481 }
482
483 pub fn validator_state(&self) -> &Arc<AuthorityState> {
484 &self.state
485 }
486
487 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
489 let epoch_store = self.state.load_epoch_store_one_call_per_task();
490
491 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
493
494 let transaction = epoch_store
496 .verify_transaction_require_no_aliases(transaction)?
497 .into_tx();
498
499 self.state
501 .handle_vote_transaction(&epoch_store, transaction)?;
502
503 Ok(())
504 }
505
506 pub fn handle_transaction_for_testing_with_overload_check(
509 &self,
510 transaction: Transaction,
511 ) -> SuiResult<()> {
512 let epoch_store = self.state.load_epoch_store_one_call_per_task();
513
514 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
516
517 self.state.check_system_overload(
519 transaction.data(),
520 self.state.check_system_overload_at_signing(),
521 )?;
522
523 let transaction = epoch_store
525 .verify_transaction_require_no_aliases(transaction)?
526 .into_tx();
527
528 self.state
530 .handle_vote_transaction(&epoch_store, transaction)?;
531
532 Ok(())
533 }
534
535 async fn collect_immutable_object_ids(
538 &self,
539 tx: &VerifiedTransaction,
540 state: &AuthorityState,
541 ) -> SuiResult<Vec<ObjectID>> {
542 let input_objects = tx.data().transaction_data().input_objects()?;
543
544 let object_ids: Vec<ObjectID> = input_objects
546 .iter()
547 .filter_map(|obj| match obj {
548 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
549 _ => None,
550 })
551 .collect();
552 if object_ids.is_empty() {
553 return Ok(vec![]);
554 }
555
556 let objects = state.get_object_cache_reader().get_objects(&object_ids);
558
559 objects
561 .into_iter()
562 .zip_debug_eq(object_ids.iter())
563 .filter_map(|(obj, id)| {
564 let Some(o) = obj else {
565 return Some(Err::<ObjectID, SuiError>(
566 SuiErrorKind::UserInputError {
567 error: UserInputError::ObjectNotFound {
568 object_id: *id,
569 version: None,
570 },
571 }
572 .into(),
573 ));
574 };
575 if o.is_immutable() {
576 Some(Ok(*id))
577 } else {
578 None
579 }
580 })
581 .collect::<SuiResult<Vec<ObjectID>>>()
582 }
583
584 #[instrument(
585 name = "ValidatorService::handle_submit_transaction",
586 level = "error",
587 skip_all,
588 err(level = "debug")
589 )]
590 async fn handle_submit_transaction(
591 &self,
592 request: tonic::Request<RawSubmitTxRequest>,
593 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
594 let Self {
595 state,
596 consensus_adapter: _,
597 metrics,
598 traffic_controller: _,
599 client_id_source,
600 gasless_limiter: _,
601 admission_queue: _,
602 } = self.clone();
603
604 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
605 self.get_client_ip_addr(&request, client_id_source)
606 } else {
607 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
608 };
609
610 let inner = request.into_inner();
611 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
612
613 let next_epoch = start_epoch + 1;
614 let mut max_retries = 1;
615
616 loop {
617 let res = self
618 .handle_submit_transaction_inner(&state, &metrics, &inner, submitter_client_addr)
619 .await;
620 match res {
621 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
622 Err(err) => {
623 if max_retries > 0
624 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
625 {
626 max_retries -= 1;
627
628 debug!(
629 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
630 );
631
632 if let Ok(Ok(new_epoch)) =
633 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
634 {
635 assert_reachable!("retry submission at epoch end");
636 if new_epoch == next_epoch {
637 continue;
638 }
639
640 debug_fatal!(
641 "expected epoch {} after reconfiguration. got {}",
642 next_epoch,
643 new_epoch
644 );
645 }
646 }
647 return Err(err.into());
648 }
649 }
650 }
651 }
652
    /// Processes one submit request against the current epoch store.
    ///
    /// Each transaction in the request is deserialized, validated, checked for overload,
    /// verified and voted on. Transactions that are rejected or already executed get
    /// their result recorded in place; the rest are submitted to consensus, either
    /// directly or through the admission queue depending on the submit mode.
    ///
    /// Returns the raw response (one result per transaction, or a single result for a
    /// ping) together with the spam weight to attribute to the request.
    ///
    /// # Errors
    ///
    /// Request-level failures abort the whole request: an unknown submit type, an
    /// invalid transaction count, a deserialization, validity or signature failure, a
    /// batch that is too large in bytes, and errors from consensus submission.
    async fn handle_submit_transaction_inner(
        &self,
        state: &AuthorityState,
        metrics: &ValidatorServiceMetrics,
        request: &RawSubmitTxRequest,
        submitter_client_addr: Option<IpAddr>,
    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
        let epoch_store = state.load_epoch_store_one_call_per_task();
        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
            SuiErrorKind::GrpcMessageDeserializeError {
                type_info: "RawSubmitTxRequest.submit_type".to_string(),
                error: e.to_string(),
            }
        })?;

        // A ping carries no transactions; every other request needs at least one.
        let is_ping_request = submit_type == SubmitTxType::Ping;
        if is_ping_request {
            fp_ensure!(
                request.transactions.is_empty(),
                SuiErrorKind::InvalidRequest(format!(
                    "Ping request cannot contain {} transactions",
                    request.transactions.len()
                ))
                .into()
            );
        } else {
            fp_ensure!(
                !request.transactions.is_empty(),
                SuiErrorKind::InvalidRequest(
                    "At least one transaction needs to be submitted".to_string(),
                )
                .into()
            );
        }

        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;

        // Soft bundles are capped by the soft bundle size; other requests by the
        // maximum number of transactions in a block.
        let max_num_transactions = if is_soft_bundle_request {
            epoch_store.protocol_config().max_soft_bundle_size()
        } else {
            epoch_store
                .protocol_config()
                .max_num_transactions_in_block()
        };
        fp_ensure!(
            request.transactions.len() <= max_num_transactions as usize,
            SuiErrorKind::InvalidRequest(format!(
                "Too many transactions in request: {} vs {}",
                request.transactions.len(),
                max_num_transactions
            ))
            .into()
        );

        // Digests, consensus messages and original request indexes of the transactions
        // that will be submitted; the three vectors are kept in lockstep.
        let mut tx_digests = Vec::with_capacity(request.transactions.len());
        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
        // One slot per transaction in the request, filled in as each is resolved.
        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
        // Accumulated size of the transactions that will be submitted.
        let mut total_size_bytes = 0;
        let mut spam_weight = Weight::zero();

        // Label used for the per-request-type metrics below.
        let req_type = if is_ping_request {
            "ping"
        } else if request.transactions.len() == 1 {
            "single_transaction"
        } else if is_soft_bundle_request {
            "soft_bundle"
        } else {
            "batch"
        };

        // Records the handler latency when dropped.
        let _handle_tx_metrics_guard = metrics
            .handle_submit_transaction_latency
            .with_label_values(&[req_type])
            .start_timer();

        let submit_mode = self.classify_submit_mode(is_ping_request);

        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
            // A transaction that fails to deserialize fails the whole request.
            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
                Ok(txn) => txn,
                Err(e) => {
                    return Err(SuiErrorKind::TransactionDeserializationError {
                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
                    }
                    .into());
                }
            };

            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
            let is_gasless = transaction
                .data()
                .transaction_data()
                .is_gasless_transaction();

            if is_gasless {
                // Any gasless transaction sets the request's spam weight to one.
                spam_weight = Weight::one();
                metrics
                    .gasless_submission_outcomes
                    .with_label_values(&["attempted"])
                    .inc();
            }

            // System overload rejects only this transaction; the loop moves on.
            let overload_check_res = state.check_system_overload(
                transaction.data(),
                state.check_system_overload_at_signing(),
            );
            if let Err(error) = overload_check_res {
                metrics
                    .num_rejected_tx_during_overload
                    .with_label_values(&[error.as_ref()])
                    .inc();
                if is_gasless {
                    metrics
                        .gasless_submission_outcomes
                        .with_label_values(&["rejected_overload"])
                        .inc();
                }
                results[idx] = Some(SubmitTxResult::Rejected { error });
                continue;
            }

            // The consensus overload check runs here only in `Disabled` mode.
            if matches!(submit_mode, AdmissionQueueSubmitMode::Disabled)
                && let Err(error) = self.consensus_adapter.check_consensus_overload()
            {
                state.update_overload_metrics("consensus");
                metrics
                    .num_rejected_tx_during_overload
                    .with_label_values(&[error.as_ref()])
                    .inc();
                results[idx] = Some(SubmitTxResult::Rejected { error });
                continue;
            }

            // Gasless transactions must also pass the gasless rate limiter.
            if is_gasless
                && !self
                    .gasless_limiter
                    .try_acquire(epoch_store.protocol_config())
            {
                metrics.gasless_rate_limited_count.inc();
                metrics
                    .gasless_submission_outcomes
                    .with_label_values(&["rejected_rate_limited"])
                    .inc();
                results[idx] = Some(SubmitTxResult::Rejected {
                    error: SuiErrorKind::ValidatorOverloadedRetryAfter {
                        retry_after_secs: 1,
                    }
                    .into(),
                });
                continue;
            }

            // Verify the transaction, with or without address aliases depending on the
            // protocol config. A verification failure fails the whole request.
            let verified_transaction = {
                let _metrics_guard = metrics.tx_verification_latency.start_timer();
                if epoch_store.protocol_config().address_aliases() {
                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
                        Ok(tx) => tx,
                        Err(e) => {
                            metrics.signature_errors.inc();
                            return Err(e);
                        }
                    }
                } else {
                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
                        Ok(tx) => tx,
                        Err(e) => {
                            metrics.signature_errors.inc();
                            return Err(e);
                        }
                    }
                }
            };

            let tx_digest = *verified_transaction.tx().digest();
            debug!(
                ?tx_digest,
                "handle_submit_transaction: verified transaction"
            );

            // If effects already exist, report the transaction as executed. When the
            // executed data cannot be completed, fall through to the normal path.
            if let Some(effects) = state
                .get_transaction_cache_reader()
                .get_executed_effects(&tx_digest)
            {
                let effects_digest = effects.digest();
                if let Ok(executed_data) = self.complete_executed_data(effects).await {
                    let executed_result = SubmitTxResult::Executed {
                        effects_digest,
                        details: Some(executed_data),
                    };
                    results[idx] = Some(executed_result);
                    debug!(?tx_digest, "handle_submit_transaction: already executed");
                    continue;
                }
            }

            // Reject transactions the cache reports as executed in the last epoch.
            if self
                .state
                .get_transaction_cache_reader()
                .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
            {
                results[idx] = Some(SubmitTxResult::Rejected {
                    error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
                });
                debug!(
                    ?tx_digest,
                    "handle_submit_transaction: transaction already executed in previous epoch"
                );
                continue;
            }

            debug!(
                ?tx_digest,
                "handle_submit_transaction: waiting for fastpath dependency objects"
            );
            // A `false` result is only logged; processing continues either way.
            if !state
                .wait_for_fastpath_dependency_objects(
                    verified_transaction.tx(),
                    epoch_store.epoch(),
                )
                .await?
            {
                debug!(
                    ?tx_digest,
                    "fastpath input objects are still unavailable after waiting"
                );
            }

            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
                Ok(_) => {}
                Err(e) => {
                    // Before reporting the rejection, check once more whether executed
                    // effects are available for this transaction.
                    if let Some(effects) = state
                        .get_transaction_cache_reader()
                        .get_executed_effects(&tx_digest)
                    {
                        let effects_digest = effects.digest();
                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
                            let executed_result = SubmitTxResult::Executed {
                                effects_digest,
                                details: Some(executed_data),
                            };
                            results[idx] = Some(executed_result);
                            continue;
                        }
                    }

                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
                    metrics
                        .submission_rejected_transactions
                        .with_label_values(&[e.to_variant_name()])
                        .inc();
                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
                    continue;
                }
            }

            // Claims attached to the transaction when it is sent to consensus.
            let mut claims = vec![];

            let immutable_object_ids = self
                .collect_immutable_object_ids(verified_transaction.tx(), state)
                .await?;
            if !immutable_object_ids.is_empty() {
                claims.push(TransactionClaim::ImmutableInputObjects(
                    immutable_object_ids,
                ));
            }

            let (tx, aliases) = verified_transaction.into_inner();
            if epoch_store.protocol_config().address_aliases() {
                if epoch_store
                    .protocol_config()
                    .fix_checkpoint_signature_mapping()
                {
                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
                } else {
                    // V1 claim: pair each required signer with the second element of
                    // the corresponding alias tuple.
                    let v1_aliases: Vec<_> = tx
                        .data()
                        .intent_message()
                        .value
                        .required_signers()
                        .into_iter()
                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
                        .collect();
                    #[allow(deprecated)]
                    claims.push(TransactionClaim::AddressAliases(
                        nonempty::NonEmpty::from_vec(v1_aliases)
                            .expect("must have at least one required_signer"),
                    ));
                }
            }

            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);

            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
                &state.name,
                tx_with_claims,
            ));
            if is_gasless {
                metrics
                    .gasless_submission_outcomes
                    .with_label_values(&["submitted"])
                    .inc();
            }

            transaction_indexes.push(idx);
            tx_digests.push(tx_digest);
            total_size_bytes += tx_size;
        }

        // Nothing left to submit: every transaction was rejected or already executed.
        if consensus_transactions.is_empty() && !is_ping_request {
            return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
        }

        // Soft bundles get half of the per-block byte budget.
        let max_transaction_bytes = if is_soft_bundle_request {
            epoch_store
                .protocol_config()
                .consensus_max_transactions_in_block_bytes()
                / 2
        } else {
            epoch_store
                .protocol_config()
                .consensus_max_transactions_in_block_bytes()
        };
        fp_ensure!(
            total_size_bytes <= max_transaction_bytes as usize,
            SuiErrorKind::UserInputError {
                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
                    size: total_size_bytes,
                    limit: max_transaction_bytes,
                },
            }
            .into()
        );

        metrics
            .handle_submit_transaction_bytes
            .with_label_values(&[req_type])
            .observe(total_size_bytes as f64);
        metrics
            .handle_submit_transaction_batch_size
            .with_label_values(&[req_type])
            .observe(consensus_transactions.len() as f64);

        // Records the consensus submission latency when dropped.
        let _latency_metric_guard = metrics
            .handle_submit_transaction_consensus_latency
            .with_label_values(&[req_type])
            .start_timer();

        if is_soft_bundle_request {
            assert!(
                !consensus_transactions.is_empty(),
                "A valid soft bundle must have at least one transaction"
            );
        }

        // Soft bundles and pings are submitted as a single group; otherwise every
        // transaction forms its own group.
        let tx_groups: Vec<Vec<ConsensusTransaction>> = if is_soft_bundle_request || is_ping_request
        {
            vec![consensus_transactions]
        } else {
            consensus_transactions
                .into_iter()
                .map(|t| vec![t])
                .collect()
        };

        let consensus_positions: Vec<ConsensusPosition> = match submit_mode {
            AdmissionQueueSubmitMode::Bypass | AdmissionQueueSubmitMode::Disabled => {
                if matches!(submit_mode, AdmissionQueueSubmitMode::Bypass) {
                    self.metrics.admission_queue_direct_bypasses.inc();
                }
                // Submit every group directly and flatten the returned positions.
                let futures = tx_groups.into_iter().map(|txns| {
                    debug!(
                        "handle_submit_transaction: submitting consensus transactions ({}): {}",
                        req_type,
                        txns.iter().map(|t| t.local_display()).join(", ")
                    );
                    self.consensus_adapter.submit_and_get_positions(
                        txns,
                        &epoch_store,
                        submitter_client_addr,
                    )
                });
                future::try_join_all(futures)
                    .await?
                    .into_iter()
                    .flatten()
                    .collect()
            }
            AdmissionQueueSubmitMode::Queue => {
                let aq = self
                    .admission_queue
                    .as_ref()
                    .expect("Queue mode implies admission_queue is Some")
                    .load();
                let mut receivers = Vec::with_capacity(tx_groups.len());
                for txns in tx_groups {
                    let gas_price = Self::extract_gas_price(&txns);
                    let (rx, newly_inserted) = aq
                        .try_insert(gas_price, txns, submitter_client_addr)
                        .await?;
                    if !newly_inserted {
                        // An entry that was not newly inserted sets the spam weight to one.
                        spam_weight = Weight::one();
                    }
                    receivers.push(rx);
                }
                // A receiver that fails to deliver is reported as
                // `TooManyTransactionsPendingConsensus`.
                let results = future::try_join_all(receivers.into_iter().map(|rx| async move {
                    rx.await.map_err(|_| {
                        SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus)
                    })?
                }))
                .await?;
                results.into_iter().flatten().collect()
            }
        };

        if is_ping_request {
            // A ping has no per-transaction slots; it yields exactly one position.
            assert_eq!(consensus_positions.len(), 1);
            results.push(Some(SubmitTxResult::Submitted {
                consensus_position: consensus_positions[0],
            }));
        } else {
            // Write each position back into the slot of the transaction it belongs to.
            for ((idx, tx_digest), consensus_position) in transaction_indexes
                .into_iter()
                .zip_debug_eq(tx_digests)
                .zip_debug_eq(consensus_positions)
            {
                debug!(
                    ?tx_digest,
                    "handle_submit_transaction: submitted consensus transaction at {}",
                    consensus_position,
                );
                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
            }
        }

        Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
    }
1126
1127 fn try_from_submit_tx_response(
1128 results: Vec<Option<SubmitTxResult>>,
1129 ) -> Result<RawSubmitTxResponse, SuiError> {
1130 let mut raw_results = Vec::new();
1131 for (i, result) in results.into_iter().enumerate() {
1132 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1133 error: format!("Missing transaction result at {}", i),
1134 })?;
1135 let raw_result = result.try_into()?;
1136 raw_results.push(raw_result);
1137 }
1138 Ok(RawSubmitTxResponse {
1139 results: raw_results,
1140 })
1141 }
1142
1143 fn extract_gas_price(transactions: &[ConsensusTransaction]) -> u64 {
1146 use sui_types::messages_consensus::ConsensusTransactionKind;
1147 transactions
1148 .iter()
1149 .filter_map(|tx| match &tx.kind {
1150 ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.gas_price()),
1151 ConsensusTransactionKind::UserTransaction(t) => {
1152 Some(t.data().transaction_data().gas_price())
1153 }
1154 ConsensusTransactionKind::UserTransactionV2(t) => {
1155 Some(t.tx().data().transaction_data().gas_price())
1156 }
1157 _ => None,
1158 })
1159 .min()
1160 .unwrap_or(0)
1161 }
1162
1163 fn classify_submit_mode(&self, is_ping_request: bool) -> AdmissionQueueSubmitMode {
1164 let Some(aq) = &self.admission_queue else {
1165 return AdmissionQueueSubmitMode::Disabled;
1166 };
1167
1168 if is_ping_request {
1169 return AdmissionQueueSubmitMode::Bypass;
1170 }
1171
1172 let inflight = usize::try_from(self.consensus_adapter.num_inflight_transactions()).unwrap();
1173 if inflight < aq.bypass_threshold() {
1174 return AdmissionQueueSubmitMode::Bypass;
1175 }
1176
1177 if aq.load().failover_tripped() {
1180 return AdmissionQueueSubmitMode::Disabled;
1181 }
1182
1183 AdmissionQueueSubmitMode::Queue
1184 }
1185
1186 async fn collect_effects_data(
1187 &self,
1188 effects: &TransactionEffects,
1189 include_events: bool,
1190 include_input_objects: bool,
1191 include_output_objects: bool,
1192 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1193 let events = if include_events && effects.events_digest().is_some() {
1194 Some(
1195 self.state
1196 .get_transaction_events(effects.transaction_digest())?,
1197 )
1198 } else {
1199 None
1200 };
1201
1202 let input_objects = if include_input_objects {
1203 self.state.get_transaction_input_objects(effects)?
1204 } else {
1205 vec![]
1206 };
1207
1208 let output_objects = if include_output_objects {
1209 self.state.get_transaction_output_objects(effects)?
1210 } else {
1211 vec![]
1212 };
1213
1214 Ok((events, input_objects, output_objects))
1215 }
1216}
1217
/// Result type returned by the `*_impl` request handlers: on success, the gRPC
/// response paired with a `Weight`; on failure, the `tonic::Status` to return.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1219
1220impl ValidatorService {
/// Forwards a raw submit-transaction request to `handle_submit_transaction`
/// and returns its wrapped response unchanged.
async fn handle_submit_transaction_impl(
    &self,
    request: tonic::Request<RawSubmitTxRequest>,
) -> WrappedServiceResponse<RawSubmitTxResponse> {
    self.handle_submit_transaction(request).await
}
1227
1228 async fn wait_for_effects_impl(
1229 &self,
1230 request: tonic::Request<RawWaitForEffectsRequest>,
1231 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1232 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1233 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1234 let response = timeout(
1235 Duration::from_secs(20),
1237 epoch_store
1238 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1239 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1240 )
1241 .await
1242 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1243 .try_into()?;
1244 Ok((tonic::Response::new(response), Weight::zero()))
1245 }
1246
/// Produces the typed response for a wait-for-effects request.
///
/// Ping requests (those with a `ping_type`) are delegated to `ping_response`
/// under a 10 second timeout. For all other requests a transaction digest is
/// required, and the function races two sources:
/// * the executed effects for the digest, which yield `Executed`;
/// * the consensus status at the requested position, which can yield
///   `Rejected` or `Expired`.
///
/// Whichever branch completes first determines the response.
#[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
async fn wait_for_effects_response(
    &self,
    request: WaitForEffectsRequest,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<WaitForEffectsResponse> {
    // Ping requests take a separate path; a timeout here becomes `TimeoutError`.
    if request.ping_type.is_some() {
        return timeout(
            Duration::from_secs(10),
            self.ping_response(request, epoch_store),
        )
        .await
        .map_err(|_| SuiErrorKind::TimeoutError)?;
    }

    let Some(tx_digest) = request.transaction_digest else {
        return Err(SuiErrorKind::InvalidRequest(
            "Transaction digest is required for wait for effects requests".to_string(),
        )
        .into());
    };
    let tx_digests = [tx_digest];

    let consensus_status_future = async {
        // Without a consensus position this branch never resolves, so only the
        // effects branch of the `select!` below can complete.
        let consensus_position = match request.consensus_position {
            Some(pos) => pos,
            None => return futures::future::pending().await,
        };
        let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
        match consensus_tx_status_cache
            .notify_read_transaction_status(consensus_position)
            .await
        {
            NotifyReadConsensusTxStatusResult::Status(
                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
            ) => Ok(WaitForEffectsResponse::Rejected {
                error: epoch_store.get_rejection_vote_reason(consensus_position),
            }),
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
                // A finalized status produces no response from this branch; it
                // stays pending so the effects branch supplies the answer.
                futures::future::pending().await
            }
            NotifyReadConsensusTxStatusResult::Expired(round) => {
                Ok(WaitForEffectsResponse::Expired {
                    epoch: epoch_store.epoch(),
                    round: Some(round),
                })
            }
        }
    };

    tokio::select! {
        effects_result = self.state
            .get_transaction_cache_reader()
            .notify_read_executed_effects_may_fail(
                "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
                &tx_digests,
            ) => {
            // Exactly one digest was requested; this assumes the reader returns
            // one entry per digest, otherwise the `unwrap` panics.
            let effects = effects_result?.pop().unwrap();
            let effects_digest = effects.digest();
            let details = if request.include_details {
                Some(self.complete_executed_data(effects).await?)
            } else {
                None
            };
            Ok(WaitForEffectsResponse::Executed {
                effects_digest,
                details,
            })
        }
        status_response = consensus_status_future => {
            status_response
        }
    }
}
1326
1327 #[instrument(level = "error", skip_all, err(level = "debug"))]
1328 async fn ping_response(
1329 &self,
1330 request: WaitForEffectsRequest,
1331 epoch_store: &Arc<AuthorityPerEpochStore>,
1332 ) -> SuiResult<WaitForEffectsResponse> {
1333 let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1334
1335 let Some(consensus_position) = request.consensus_position else {
1336 return Err(SuiErrorKind::InvalidRequest(
1337 "Consensus position is required for Ping requests".to_string(),
1338 )
1339 .into());
1340 };
1341
1342 let Some(ping) = request.ping_type else {
1344 return Err(SuiErrorKind::InvalidRequest(
1345 "Ping type is required for ping requests".to_string(),
1346 )
1347 .into());
1348 };
1349
1350 let _metrics_guard = self
1351 .metrics
1352 .handle_wait_for_effects_ping_latency
1353 .with_label_values(&[ping.as_str()])
1354 .start_timer();
1355
1356 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1357
1358 let details = if request.include_details {
1359 Some(Box::new(ExecutedData::default()))
1360 } else {
1361 None
1362 };
1363
1364 let status = consensus_tx_status_cache
1365 .notify_read_transaction_status(consensus_position)
1366 .await;
1367 match status {
1368 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1369 ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1370 Ok(WaitForEffectsResponse::Rejected {
1371 error: epoch_store.get_rejection_vote_reason(consensus_position),
1372 })
1373 }
1374 ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1375 effects_digest: TransactionEffectsDigest::ZERO,
1376 details,
1377 }),
1378 },
1379 NotifyReadConsensusTxStatusResult::Expired(round) => {
1380 Ok(WaitForEffectsResponse::Expired {
1381 epoch: epoch_store.epoch(),
1382 round: Some(round),
1383 })
1384 }
1385 }
1386 }
1387
1388 async fn complete_executed_data(
1389 &self,
1390 effects: TransactionEffects,
1391 ) -> SuiResult<Box<ExecutedData>> {
1392 let (events, input_objects, output_objects) = self
1393 .collect_effects_data(
1394 &effects, true, true,
1395 true,
1396 )
1397 .await?;
1398 Ok(Box::new(ExecutedData {
1399 effects,
1400 events,
1401 input_objects,
1402 output_objects,
1403 }))
1404 }
1405
1406 async fn object_info_impl(
1407 &self,
1408 request: tonic::Request<ObjectInfoRequest>,
1409 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1410 let request = request.into_inner();
1411 let response = self.state.handle_object_info_request(request).await?;
1412 Ok((tonic::Response::new(response), Weight::one()))
1413 }
1414
1415 async fn transaction_info_impl(
1416 &self,
1417 request: tonic::Request<TransactionInfoRequest>,
1418 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1419 let request = request.into_inner();
1420 let response = self.state.handle_transaction_info_request(request).await?;
1421 Ok((tonic::Response::new(response), Weight::one()))
1422 }
1423
1424 async fn checkpoint_impl(
1425 &self,
1426 request: tonic::Request<CheckpointRequest>,
1427 ) -> WrappedServiceResponse<CheckpointResponse> {
1428 let request = request.into_inner();
1429 let response = self.state.handle_checkpoint_request(&request)?;
1430 Ok((tonic::Response::new(response), Weight::one()))
1431 }
1432
1433 async fn checkpoint_v2_impl(
1434 &self,
1435 request: tonic::Request<CheckpointRequestV2>,
1436 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1437 let request = request.into_inner();
1438 let response = self.state.handle_checkpoint_request_v2(&request)?;
1439 Ok((tonic::Response::new(response), Weight::one()))
1440 }
1441
1442 async fn get_system_state_object_impl(
1443 &self,
1444 _request: tonic::Request<SystemStateRequest>,
1445 ) -> WrappedServiceResponse<SuiSystemState> {
1446 let response = self
1447 .state
1448 .get_object_cache_reader()
1449 .get_sui_system_state_object_unsafe()?;
1450 Ok((tonic::Response::new(response), Weight::one()))
1451 }
1452
1453 async fn validator_health_impl(
1454 &self,
1455 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1456 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1457 let state = &self.state;
1458
1459 let epoch_store = state.load_epoch_store_one_call_per_task();
1461
1462 let num_inflight_execution_transactions =
1464 state.execution_scheduler().num_pending_certificates() as u64;
1465
1466 let num_inflight_consensus_transactions =
1468 self.consensus_adapter.num_inflight_transactions();
1469
1470 let last_committed_leader_round = epoch_store
1472 .consensus_tx_status_cache
1473 .get_last_committed_leader_round()
1474 .unwrap_or(0);
1475
1476 let last_locally_built_checkpoint = epoch_store
1478 .last_built_checkpoint_summary()
1479 .ok()
1480 .flatten()
1481 .map(|(_, summary)| summary.sequence_number)
1482 .unwrap_or(0);
1483
1484 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1485 num_inflight_consensus_transactions,
1486 num_inflight_execution_transactions,
1487 last_locally_built_checkpoint,
1488 last_committed_leader_round,
1489 };
1490
1491 let raw_response = typed_response
1492 .try_into()
1493 .map_err(|e: sui_types::error::SuiError| {
1494 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1495 })?;
1496
1497 Ok((tonic::Response::new(raw_response), Weight::one()))
1498 }
1499
1500 fn get_client_ip_addr<T>(
1501 &self,
1502 request: &tonic::Request<T>,
1503 source: &ClientIdSource,
1504 ) -> Option<IpAddr> {
1505 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1506
1507 if let Some(header) = forwarded_header {
1508 let num_hops = header
1509 .to_str()
1510 .map(|h| h.split(',').count().saturating_sub(1))
1511 .unwrap_or(0);
1512
1513 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1514 }
1515
1516 match source {
1517 ClientIdSource::SocketAddr => {
1518 let socket_addr: Option<SocketAddr> = request.remote_addr();
1519
1520 if let Some(socket_addr) = socket_addr {
1526 Some(socket_addr.ip())
1527 } else {
1528 if cfg!(msim) {
1529 } else if cfg!(test) {
1531 panic!("Failed to get remote address from request");
1532 } else {
1533 self.metrics.connection_ip_not_found.inc();
1534 error!("Failed to get remote address from request");
1535 }
1536 None
1537 }
1538 }
1539 ClientIdSource::XForwardedFor(num_hops) => {
1540 let do_header_parse = |op: &MetadataValue<Ascii>| {
1541 match op.to_str() {
1542 Ok(header_val) => {
1543 let header_contents =
1544 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1545 if *num_hops == 0 {
1546 error!(
1547 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1548 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1549 to this node. Skipping traffic controller request handling.",
1550 header_contents,
1551 );
1552 return None;
1553 }
1554 let contents_len = header_contents.len();
1555 if contents_len < *num_hops {
1556 error!(
1557 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1558 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1559 `client-id-source` in the node config.",
1560 header_contents, contents_len, num_hops, contents_len,
1561 );
1562 self.metrics.client_id_source_config_mismatch.inc();
1563 return None;
1564 }
1565 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1566 else {
1567 error!(
1568 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1569 Expected at least {} values. Skipping traffic controller request handling.",
1570 header_contents, contents_len, num_hops, contents_len,
1571 );
1572 return None;
1573 };
1574 parse_ip(client_ip).or_else(|| {
1575 self.metrics.forwarded_header_parse_error.inc();
1576 None
1577 })
1578 }
1579 Err(e) => {
1580 self.metrics.forwarded_header_invalid.inc();
1584 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1585 None
1586 }
1587 }
1588 };
1589 if let Some(op) = request.metadata().get("x-forwarded-for") {
1590 do_header_parse(op)
1591 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1592 do_header_parse(op)
1593 } else {
1594 self.metrics.forwarded_header_not_included.inc();
1595 error!(
1596 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1597 );
1598 None
1599 }
1600 }
1601 }
1602 }
1603
1604 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1605 if let Some(traffic_controller) = &self.traffic_controller {
1606 if !traffic_controller.check(&client, &None).await {
1607 Err(tonic::Status::from_error(
1609 SuiErrorKind::TooManyRequests.into(),
1610 ))
1611 } else {
1612 Ok(())
1613 }
1614 } else {
1615 Ok(())
1616 }
1617 }
1618
1619 fn handle_traffic_resp<T>(
1620 &self,
1621 client: Option<IpAddr>,
1622 wrapped_response: WrappedServiceResponse<T>,
1623 ) -> Result<tonic::Response<T>, tonic::Status> {
1624 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1625 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1626 Err(status) => (
1627 Some(SuiError::from(status.clone())),
1628 Weight::zero(),
1629 Err(status.clone()),
1630 ),
1631 };
1632
1633 if let Some(traffic_controller) = self.traffic_controller.clone() {
1634 traffic_controller.tally(TrafficTally {
1635 direct: client,
1636 through_fullnode: None,
1637 error_info: error.map(|e| {
1638 let error_type = String::from(e.clone().as_ref());
1639 let error_weight = normalize(e);
1640 (error_weight, error_type)
1641 }),
1642 spam_weight,
1643 timestamp: SystemTime::now(),
1644 })
1645 }
1646 unwrapped_response
1647 }
1648}
1649
1650fn normalize(err: SuiError) -> Weight {
1652 match err.as_inner() {
1653 SuiErrorKind::UserInputError {
1654 error: UserInputError::IncorrectUserSignature { .. },
1655 } => Weight::one(),
1656 SuiErrorKind::InvalidSignature { .. }
1657 | SuiErrorKind::SignerSignatureAbsent { .. }
1658 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1659 | SuiErrorKind::IncorrectSigner { .. }
1660 | SuiErrorKind::UnknownSigner { .. }
1661 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1662 _ => Weight::zero(),
1663 }
1664}
1665
/// Wraps a `*_impl` handler call with traffic-control bookkeeping.
///
/// Arguments: the service value (`$self`), the name of the handler method to
/// call on it, and the request identifier. Must be expanded inside an async
/// context whose return type is `Result<tonic::Response<_>, tonic::Status>`,
/// because it uses `return`, `?` and `.await`.
///
/// * If the service has no `client_id_source`, the handler is called directly
///   and its weight is discarded.
/// * Otherwise the client IP is resolved, `handle_traffic_req` may reject the
///   request up front, the handler runs, and `handle_traffic_resp` tallies the
///   outcome before the result is returned.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // `Option<IpAddr>` is `Copy`, so it can be passed by value twice.
        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1686
/// gRPC `Validator` trait implementation. Every endpoint routes its matching
/// `*_impl` handler through `handle_with_decoration!`.
#[async_trait]
impl Validator for ValidatorService {
    /// Submits a transaction. Unlike the other endpoints, the work runs on a
    /// clone of the service inside a task created by `spawn_monitored_task!`,
    /// and this method awaits that task.
    ///
    /// # Panics
    /// Panics via `unwrap` if awaiting the spawned task yields an error.
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        // NOTE(review): presumably spawned so the submission keeps running even
        // if the caller drops this request future — confirm.
        spawn_monitored_task!(async move {
            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
        })
        .await
        .unwrap()
    }

    /// Waits for the effects of a previously submitted transaction.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request)
    }

    /// Serves an object-info request.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Serves a transaction-info request.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Serves a checkpoint request.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Serves a V2 checkpoint request.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request)
    }

    /// Returns the current system state object.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }

    /// Returns validator health counters.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request)
    }
}