1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14 register_gauge_with_registry, register_histogram_vec_with_registry,
15 register_histogram_with_registry, register_int_counter_vec_with_registry,
16 register_int_counter_with_registry,
17};
18use std::{
19 cmp::Ordering,
20 future::Future,
21 io,
22 net::{IpAddr, SocketAddr},
23 pin::Pin,
24 sync::Arc,
25 time::{Duration, SystemTime},
26};
27use sui_network::{
28 api::{Validator, ValidatorServer},
29 tonic,
30 validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::message_envelope::Message;
34use sui_types::messages_consensus::ConsensusPosition;
35use sui_types::messages_consensus::ConsensusTransaction;
36use sui_types::messages_grpc::{
37 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
38 TransactionInfoRequest, TransactionInfoResponse,
39};
40use sui_types::multiaddr::Multiaddr;
41use sui_types::object::Object;
42use sui_types::sui_system_state::SuiSystemState;
43use sui_types::traffic_control::{ClientIdSource, Weight};
44use sui_types::{
45 base_types::ObjectID,
46 digests::{TransactionDigest, TransactionEffectsDigest},
47 error::{SuiErrorKind, UserInputError},
48};
49use sui_types::{
50 effects::TransactionEffects,
51 messages_grpc::{
52 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
53 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
54 },
55};
56use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
57use sui_types::{error::*, transaction::*};
58use sui_types::{
59 fp_ensure,
60 messages_checkpoint::{
61 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
62 },
63};
64use tokio::sync::oneshot;
65use tokio::time::timeout;
66use tonic::metadata::{Ascii, MetadataValue};
67use tracing::{debug, error, info, instrument};
68
69use crate::consensus_adapter::ConnectionMonitorStatusForTests;
70use crate::{
71 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
72 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
73 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
74};
75use crate::{
76 authority::{
77 authority_per_epoch_store::AuthorityPerEpochStore,
78 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
79 },
80 checkpoints::CheckpointStore,
81 mysticeti_adapter::LazyMysticetiClient,
82};
83use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
84use sui_types::messages_grpc::PingType;
85
86#[cfg(test)]
87#[path = "unit_tests/server_tests.rs"]
88mod server_tests;
89
90#[cfg(test)]
91#[path = "unit_tests/wait_for_effects_tests.rs"]
92mod wait_for_effects_tests;
93
94#[cfg(test)]
95#[path = "unit_tests/submit_transaction_tests.rs"]
96mod submit_transaction_tests;
97
98pub struct AuthorityServerHandle {
99 server_handle: sui_network::validator::server::Server,
100}
101
102impl AuthorityServerHandle {
103 pub async fn join(self) -> Result<(), io::Error> {
104 self.server_handle.handle().wait_for_shutdown().await;
105 Ok(())
106 }
107
108 pub async fn kill(self) -> Result<(), io::Error> {
109 self.server_handle.handle().shutdown().await;
110 Ok(())
111 }
112
113 pub fn address(&self) -> &Multiaddr {
114 self.server_handle.local_addr()
115 }
116}
117
118pub struct AuthorityServer {
119 address: Multiaddr,
120 pub state: Arc<AuthorityState>,
121 consensus_adapter: Arc<ConsensusAdapter>,
122 pub metrics: Arc<ValidatorServiceMetrics>,
123}
124
125impl AuthorityServer {
126 pub fn new_for_test_with_consensus_adapter(
127 state: Arc<AuthorityState>,
128 consensus_adapter: Arc<ConsensusAdapter>,
129 ) -> Self {
130 let address = new_local_tcp_address_for_testing();
131 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
132
133 Self {
134 address,
135 state,
136 consensus_adapter,
137 metrics,
138 }
139 }
140
141 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
142 let consensus_adapter = Arc::new(ConsensusAdapter::new(
143 Arc::new(LazyMysticetiClient::new()),
144 CheckpointStore::new_for_tests(),
145 state.name,
146 Arc::new(ConnectionMonitorStatusForTests {}),
147 100_000,
148 100_000,
149 None,
150 None,
151 ConsensusAdapterMetrics::new_test(),
152 state.epoch_store_for_testing().protocol_config().clone(),
153 ));
154 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
155 }
156
157 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
158 let address = self.address.clone();
159 self.spawn_with_bind_address_for_test(address).await
160 }
161
162 pub async fn spawn_with_bind_address_for_test(
163 self,
164 address: Multiaddr,
165 ) -> Result<AuthorityServerHandle, io::Error> {
166 let tls_config = sui_tls::create_rustls_server_config(
167 self.state.config.network_key_pair().copy().private(),
168 SUI_TLS_SERVER_NAME.to_string(),
169 );
170 let config = mysten_network::config::Config::new();
171 let server = sui_network::validator::server::ServerBuilder::from_config(
172 &config,
173 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
174 )
175 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
176 self.state,
177 self.consensus_adapter,
178 self.metrics,
179 )))
180 .bind(&address, Some(tls_config))
181 .await
182 .unwrap();
183 let local_addr = server.local_addr().to_owned();
184 info!("Listening to traffic on {local_addr}");
185 let handle = AuthorityServerHandle {
186 server_handle: server,
187 };
188 Ok(handle)
189 }
190}
191
192pub struct ValidatorServiceMetrics {
193 pub signature_errors: IntCounter,
194 pub tx_verification_latency: Histogram,
195 pub cert_verification_latency: Histogram,
196 pub consensus_latency: Histogram,
197 pub handle_transaction_latency: Histogram,
198 pub submit_certificate_consensus_latency: Histogram,
199 pub handle_certificate_consensus_latency: Histogram,
200 pub handle_certificate_non_consensus_latency: Histogram,
201 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
202 pub handle_soft_bundle_certificates_count: Histogram,
203 pub handle_soft_bundle_certificates_size_bytes: Histogram,
204 pub handle_transaction_consensus_latency: Histogram,
205 pub handle_submit_transaction_consensus_latency: HistogramVec,
206 pub handle_wait_for_effects_ping_latency: HistogramVec,
207
208 handle_submit_transaction_latency: HistogramVec,
209 handle_submit_transaction_bytes: HistogramVec,
210 handle_submit_transaction_batch_size: HistogramVec,
211
212 num_rejected_cert_in_epoch_boundary: IntCounter,
213 num_rejected_tx_during_overload: IntCounterVec,
214 submission_rejected_transactions: IntCounterVec,
215 connection_ip_not_found: IntCounter,
216 forwarded_header_parse_error: IntCounter,
217 forwarded_header_invalid: IntCounter,
218 forwarded_header_not_included: IntCounter,
219 client_id_source_config_mismatch: IntCounter,
220 x_forwarded_for_num_hops: Gauge,
221}
222
223impl ValidatorServiceMetrics {
224 pub fn new(registry: &Registry) -> Self {
225 Self {
226 signature_errors: register_int_counter_with_registry!(
227 "total_signature_errors",
228 "Number of transaction signature errors",
229 registry,
230 )
231 .unwrap(),
232 tx_verification_latency: register_histogram_with_registry!(
233 "validator_service_tx_verification_latency",
234 "Latency of verifying a transaction",
235 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
236 registry,
237 )
238 .unwrap(),
239 cert_verification_latency: register_histogram_with_registry!(
240 "validator_service_cert_verification_latency",
241 "Latency of verifying a certificate",
242 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
243 registry,
244 )
245 .unwrap(),
246 consensus_latency: register_histogram_with_registry!(
247 "validator_service_consensus_latency",
248 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
249 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
250 registry,
251 )
252 .unwrap(),
253 handle_transaction_latency: register_histogram_with_registry!(
254 "validator_service_handle_transaction_latency",
255 "Latency of handling a transaction",
256 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257 registry,
258 )
259 .unwrap(),
260 handle_certificate_consensus_latency: register_histogram_with_registry!(
261 "validator_service_handle_certificate_consensus_latency",
262 "Latency of handling a consensus transaction certificate",
263 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
264 registry,
265 )
266 .unwrap(),
267 submit_certificate_consensus_latency: register_histogram_with_registry!(
268 "validator_service_submit_certificate_consensus_latency",
269 "Latency of submit_certificate RPC handler",
270 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
271 registry,
272 )
273 .unwrap(),
274 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
275 "validator_service_handle_certificate_non_consensus_latency",
276 "Latency of handling a non-consensus transaction certificate",
277 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
278 registry,
279 )
280 .unwrap(),
281 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
282 "validator_service_handle_soft_bundle_certificates_consensus_latency",
283 "Latency of handling a consensus soft bundle",
284 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
285 registry,
286 )
287 .unwrap(),
288 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
289 "handle_soft_bundle_certificates_count",
290 "The number of certificates included in a soft bundle",
291 mysten_metrics::COUNT_BUCKETS.to_vec(),
292 registry,
293 )
294 .unwrap(),
295 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
296 "handle_soft_bundle_certificates_size_bytes",
297 "The size of soft bundle in bytes",
298 mysten_metrics::BYTES_BUCKETS.to_vec(),
299 registry,
300 )
301 .unwrap(),
302 handle_transaction_consensus_latency: register_histogram_with_registry!(
303 "validator_service_handle_transaction_consensus_latency",
304 "Latency of handling a user transaction sent through consensus",
305 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
306 registry,
307 )
308 .unwrap(),
309 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
310 "validator_service_submit_transaction_consensus_latency",
311 "Latency of submitting a user transaction sent through consensus",
312 &["req_type"],
313 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
314 registry,
315 )
316 .unwrap(),
317 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
318 "validator_service_submit_transaction_latency",
319 "Latency of submit transaction handler",
320 &["req_type"],
321 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
322 registry,
323 )
324 .unwrap(),
325 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
326 "validator_service_handle_wait_for_effects_ping_latency",
327 "Latency of handling a ping request for wait_for_effects",
328 &["req_type"],
329 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
330 registry,
331 )
332 .unwrap(),
333 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
334 "validator_service_submit_transaction_bytes",
335 "The size of transactions in the submit transaction request",
336 &["req_type"],
337 mysten_metrics::BYTES_BUCKETS.to_vec(),
338 registry,
339 )
340 .unwrap(),
341 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
342 "validator_service_submit_transaction_batch_size",
343 "The number of transactions in the submit transaction request",
344 &["req_type"],
345 mysten_metrics::COUNT_BUCKETS.to_vec(),
346 registry,
347 )
348 .unwrap(),
349 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
350 "validator_service_num_rejected_cert_in_epoch_boundary",
351 "Number of rejected transaction certificate during epoch transitioning",
352 registry,
353 )
354 .unwrap(),
355 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
356 "validator_service_num_rejected_tx_during_overload",
357 "Number of rejected transaction due to system overload",
358 &["error_type"],
359 registry,
360 )
361 .unwrap(),
362 submission_rejected_transactions: register_int_counter_vec_with_registry!(
363 "validator_service_submission_rejected_transactions",
364 "Number of transactions rejected during submission",
365 &["reason"],
366 registry,
367 )
368 .unwrap(),
369 connection_ip_not_found: register_int_counter_with_registry!(
370 "validator_service_connection_ip_not_found",
371 "Number of times connection IP was not extractable from request",
372 registry,
373 )
374 .unwrap(),
375 forwarded_header_parse_error: register_int_counter_with_registry!(
376 "validator_service_forwarded_header_parse_error",
377 "Number of times x-forwarded-for header could not be parsed",
378 registry,
379 )
380 .unwrap(),
381 forwarded_header_invalid: register_int_counter_with_registry!(
382 "validator_service_forwarded_header_invalid",
383 "Number of times x-forwarded-for header was invalid",
384 registry,
385 )
386 .unwrap(),
387 forwarded_header_not_included: register_int_counter_with_registry!(
388 "validator_service_forwarded_header_not_included",
389 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
390 registry,
391 )
392 .unwrap(),
393 client_id_source_config_mismatch: register_int_counter_with_registry!(
394 "validator_service_client_id_source_config_mismatch",
395 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
396 registry,
397 )
398 .unwrap(),
399 x_forwarded_for_num_hops: register_gauge_with_registry!(
400 "validator_service_x_forwarded_for_num_hops",
401 "Number of hops in x-forwarded-for header",
402 registry,
403 )
404 .unwrap(),
405 }
406 }
407
408 pub fn new_for_tests() -> Self {
409 let registry = Registry::new();
410 Self::new(®istry)
411 }
412}
413
414#[derive(Clone)]
415pub struct ValidatorService {
416 state: Arc<AuthorityState>,
417 consensus_adapter: Arc<ConsensusAdapter>,
418 metrics: Arc<ValidatorServiceMetrics>,
419 traffic_controller: Option<Arc<TrafficController>>,
420 client_id_source: Option<ClientIdSource>,
421}
422
423impl ValidatorService {
424 pub fn new(
425 state: Arc<AuthorityState>,
426 consensus_adapter: Arc<ConsensusAdapter>,
427 validator_metrics: Arc<ValidatorServiceMetrics>,
428 client_id_source: Option<ClientIdSource>,
429 ) -> Self {
430 let traffic_controller = state.traffic_controller.clone();
431 Self {
432 state,
433 consensus_adapter,
434 metrics: validator_metrics,
435 traffic_controller,
436 client_id_source,
437 }
438 }
439
440 pub fn new_for_tests(
441 state: Arc<AuthorityState>,
442 consensus_adapter: Arc<ConsensusAdapter>,
443 metrics: Arc<ValidatorServiceMetrics>,
444 ) -> Self {
445 Self {
446 state,
447 consensus_adapter,
448 metrics,
449 traffic_controller: None,
450 client_id_source: None,
451 }
452 }
453
454 pub fn validator_state(&self) -> &Arc<AuthorityState> {
455 &self.state
456 }
457
458 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
460 let epoch_store = self.state.load_epoch_store_one_call_per_task();
461
462 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
464
465 let transaction = epoch_store
467 .verify_transaction_require_no_aliases(transaction)?
468 .into_tx();
469
470 self.state
472 .handle_vote_transaction(&epoch_store, transaction)?;
473
474 Ok(())
475 }
476
477 pub fn handle_transaction_for_testing_with_overload_check(
480 &self,
481 transaction: Transaction,
482 ) -> SuiResult<()> {
483 let epoch_store = self.state.load_epoch_store_one_call_per_task();
484
485 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
487
488 self.state.check_system_overload(
490 self.consensus_adapter.as_ref(),
491 transaction.data(),
492 self.state.check_system_overload_at_signing(),
493 )?;
494
495 let transaction = epoch_store
497 .verify_transaction_require_no_aliases(transaction)?
498 .into_tx();
499
500 self.state
502 .handle_vote_transaction(&epoch_store, transaction)?;
503
504 Ok(())
505 }
506
507 async fn collect_immutable_object_ids(
510 &self,
511 tx: &VerifiedTransaction,
512 state: &AuthorityState,
513 ) -> SuiResult<Vec<ObjectID>> {
514 let input_objects = tx.data().transaction_data().input_objects()?;
515
516 let object_ids: Vec<ObjectID> = input_objects
518 .iter()
519 .filter_map(|obj| match obj {
520 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
521 _ => None,
522 })
523 .collect();
524 if object_ids.is_empty() {
525 return Ok(vec![]);
526 }
527
528 let objects = state.get_object_cache_reader().get_objects(&object_ids);
530
531 objects
533 .into_iter()
534 .zip(object_ids.iter())
535 .filter_map(|(obj, id)| {
536 let Some(o) = obj else {
537 return Some(Err::<ObjectID, SuiError>(
538 SuiErrorKind::UserInputError {
539 error: UserInputError::ObjectNotFound {
540 object_id: *id,
541 version: None,
542 },
543 }
544 .into(),
545 ));
546 };
547 if o.is_immutable() {
548 Some(Ok(*id))
549 } else {
550 None
551 }
552 })
553 .collect::<SuiResult<Vec<ObjectID>>>()
554 }
555
556 #[instrument(
557 name = "ValidatorService::handle_submit_transaction",
558 level = "error",
559 skip_all,
560 err(level = "debug")
561 )]
562 async fn handle_submit_transaction(
563 &self,
564 request: tonic::Request<RawSubmitTxRequest>,
565 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
566 let Self {
567 state,
568 consensus_adapter,
569 metrics,
570 traffic_controller: _,
571 client_id_source,
572 } = self.clone();
573
574 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
575 self.get_client_ip_addr(&request, client_id_source)
576 } else {
577 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
578 };
579
580 let inner = request.into_inner();
581 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
582
583 let next_epoch = start_epoch + 1;
584 let mut max_retries = 1;
585
586 loop {
587 let res = self
588 .handle_submit_transaction_inner(
589 &state,
590 &consensus_adapter,
591 &metrics,
592 &inner,
593 submitter_client_addr,
594 )
595 .await;
596 match res {
597 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
598 Err(err) => {
599 if max_retries > 0
600 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
601 {
602 max_retries -= 1;
603
604 debug!(
605 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
606 );
607
608 if let Ok(Ok(new_epoch)) =
609 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
610 {
611 assert_reachable!("retry submission at epoch end");
612 if new_epoch == next_epoch {
613 continue;
614 }
615
616 debug_fatal!(
617 "expected epoch {} after reconfiguration. got {}",
618 next_epoch,
619 new_epoch
620 );
621 }
622 }
623 return Err(err.into());
624 }
625 }
626 }
627 }
628
629 async fn handle_submit_transaction_inner(
630 &self,
631 state: &AuthorityState,
632 consensus_adapter: &ConsensusAdapter,
633 metrics: &ValidatorServiceMetrics,
634 request: &RawSubmitTxRequest,
635 submitter_client_addr: Option<IpAddr>,
636 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
637 let epoch_store = state.load_epoch_store_one_call_per_task();
638 if !epoch_store.protocol_config().mysticeti_fastpath() {
639 return Err(SuiErrorKind::UnsupportedFeatureError {
640 error: "Mysticeti fastpath".to_string(),
641 }
642 .into());
643 }
644
645 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
646 SuiErrorKind::GrpcMessageDeserializeError {
647 type_info: "RawSubmitTxRequest.submit_type".to_string(),
648 error: e.to_string(),
649 }
650 })?;
651
652 let is_ping_request = submit_type == SubmitTxType::Ping;
653 if is_ping_request {
654 fp_ensure!(
655 request.transactions.is_empty(),
656 SuiErrorKind::InvalidRequest(format!(
657 "Ping request cannot contain {} transactions",
658 request.transactions.len()
659 ))
660 .into()
661 );
662 } else {
663 fp_ensure!(
665 !request.transactions.is_empty(),
666 SuiErrorKind::InvalidRequest(
667 "At least one transaction needs to be submitted".to_string(),
668 )
669 .into()
670 );
671 }
672
673 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
678
679 let max_num_transactions = if is_soft_bundle_request {
680 epoch_store.protocol_config().max_soft_bundle_size()
683 } else {
684 epoch_store
686 .protocol_config()
687 .max_num_transactions_in_block()
688 };
689 fp_ensure!(
690 request.transactions.len() <= max_num_transactions as usize,
691 SuiErrorKind::InvalidRequest(format!(
692 "Too many transactions in request: {} vs {}",
693 request.transactions.len(),
694 max_num_transactions
695 ))
696 .into()
697 );
698
699 let mut tx_digests = Vec::with_capacity(request.transactions.len());
701 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
703 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
705 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
707 let mut total_size_bytes = 0;
709
710 let req_type = if is_ping_request {
711 "ping"
712 } else if request.transactions.len() == 1 {
713 "single_transaction"
714 } else if is_soft_bundle_request {
715 "soft_bundle"
716 } else {
717 "batch"
718 };
719
720 let _handle_tx_metrics_guard = metrics
721 .handle_submit_transaction_latency
722 .with_label_values(&[req_type])
723 .start_timer();
724
725 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
726 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
727 Ok(txn) => txn,
728 Err(e) => {
729 return Err(SuiErrorKind::TransactionDeserializationError {
731 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
732 }
733 .into());
734 }
735 };
736
737 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
739
740 let overload_check_res = state.check_system_overload(
741 consensus_adapter,
742 transaction.data(),
743 state.check_system_overload_at_signing(),
744 );
745 if let Err(error) = overload_check_res {
746 metrics
747 .num_rejected_tx_during_overload
748 .with_label_values(&[error.as_ref()])
749 .inc();
750 results[idx] = Some(SubmitTxResult::Rejected { error });
751 continue;
752 }
753
754 let verified_transaction = {
756 let _metrics_guard = metrics.tx_verification_latency.start_timer();
757 if epoch_store.protocol_config().address_aliases() {
758 match epoch_store.verify_transaction_with_current_aliases(transaction) {
759 Ok(tx) => tx,
760 Err(e) => {
761 metrics.signature_errors.inc();
762 return Err(e);
763 }
764 }
765 } else {
766 match epoch_store.verify_transaction_require_no_aliases(transaction) {
767 Ok(tx) => tx,
768 Err(e) => {
769 metrics.signature_errors.inc();
770 return Err(e);
771 }
772 }
773 }
774 };
775
776 let tx_digest = verified_transaction.tx().digest();
777 tx_digests.push(*tx_digest);
778
779 debug!(
780 ?tx_digest,
781 "handle_submit_transaction: verified transaction"
782 );
783
784 if let Some(effects) = state
787 .get_transaction_cache_reader()
788 .get_executed_effects(tx_digest)
789 {
790 let effects_digest = effects.digest();
791 if let Ok(executed_data) = self.complete_executed_data(effects).await {
792 let executed_result = SubmitTxResult::Executed {
793 effects_digest,
794 details: Some(executed_data),
795 fast_path: false,
796 };
797 results[idx] = Some(executed_result);
798 debug!(?tx_digest, "handle_submit_transaction: already executed");
799 continue;
800 }
801 }
802
803 if self
804 .state
805 .get_transaction_cache_reader()
806 .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
807 {
808 results[idx] = Some(SubmitTxResult::Rejected {
809 error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
810 });
811 debug!(
812 ?tx_digest,
813 "handle_submit_transaction: transaction already executed in previous epoch"
814 );
815 continue;
816 }
817
818 debug!(
819 ?tx_digest,
820 "handle_submit_transaction: waiting for fastpath dependency objects"
821 );
822 if !state
823 .wait_for_fastpath_dependency_objects(
824 verified_transaction.tx(),
825 epoch_store.epoch(),
826 )
827 .await?
828 {
829 debug!(
830 ?tx_digest,
831 "fastpath input objects are still unavailable after waiting"
832 );
833 }
834
835 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
836 Ok(_) => { }
837 Err(e) => {
838 if let Some(effects) = state
841 .get_transaction_cache_reader()
842 .get_executed_effects(tx_digest)
843 {
844 let effects_digest = effects.digest();
845 if let Ok(executed_data) = self.complete_executed_data(effects).await {
846 let executed_result = SubmitTxResult::Executed {
847 effects_digest,
848 details: Some(executed_data),
849 fast_path: false,
850 };
851 results[idx] = Some(executed_result);
852 continue;
853 }
854 }
855
856 debug!(?tx_digest, "Transaction rejected during submission: {e}");
858 metrics
859 .submission_rejected_transactions
860 .with_label_values(&[e.to_variant_name()])
861 .inc();
862 results[idx] = Some(SubmitTxResult::Rejected { error: e });
863 continue;
864 }
865 }
866
867 let mut claims = vec![];
869
870 let immutable_object_ids = self
871 .collect_immutable_object_ids(verified_transaction.tx(), state)
872 .await?;
873 if !immutable_object_ids.is_empty() {
874 claims.push(TransactionClaim::ImmutableInputObjects(
875 immutable_object_ids,
876 ));
877 }
878
879 let (tx, aliases) = verified_transaction.into_inner();
880 if epoch_store.protocol_config().address_aliases() {
881 if epoch_store
882 .protocol_config()
883 .fix_checkpoint_signature_mapping()
884 {
885 claims.push(TransactionClaim::AddressAliasesV2(aliases));
886 } else {
887 let v1_aliases: Vec<_> = tx
888 .data()
889 .intent_message()
890 .value
891 .required_signers()
892 .into_iter()
893 .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
894 .collect();
895 #[allow(deprecated)]
896 claims.push(TransactionClaim::AddressAliases(
897 nonempty::NonEmpty::from_vec(v1_aliases)
898 .expect("must have at least one required_signer"),
899 ));
900 }
901 }
902
903 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
904
905 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
906 &state.name,
907 tx_with_claims,
908 ));
909
910 transaction_indexes.push(idx);
911 total_size_bytes += tx_size;
912 }
913
914 if consensus_transactions.is_empty() && !is_ping_request {
915 return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
916 }
917
918 let max_transaction_bytes = if is_soft_bundle_request {
922 epoch_store
923 .protocol_config()
924 .consensus_max_transactions_in_block_bytes()
925 / 2
926 } else {
927 epoch_store
928 .protocol_config()
929 .consensus_max_transactions_in_block_bytes()
930 };
931 fp_ensure!(
932 total_size_bytes <= max_transaction_bytes as usize,
933 SuiErrorKind::UserInputError {
934 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
935 size: total_size_bytes,
936 limit: max_transaction_bytes,
937 },
938 }
939 .into()
940 );
941
942 metrics
943 .handle_submit_transaction_bytes
944 .with_label_values(&[req_type])
945 .observe(total_size_bytes as f64);
946 metrics
947 .handle_submit_transaction_batch_size
948 .with_label_values(&[req_type])
949 .observe(consensus_transactions.len() as f64);
950
951 let _latency_metric_guard = metrics
952 .handle_submit_transaction_consensus_latency
953 .with_label_values(&[req_type])
954 .start_timer();
955
956 let consensus_positions = if is_soft_bundle_request || is_ping_request {
957 assert!(
960 is_ping_request || !consensus_transactions.is_empty(),
961 "A valid soft bundle must have at least one transaction"
962 );
963 debug!(
964 "handle_submit_transaction: submitting consensus transactions ({}): {}",
965 req_type,
966 consensus_transactions
967 .iter()
968 .map(|t| t.local_display())
969 .join(", ")
970 );
971 self.handle_submit_to_consensus_for_position(
972 consensus_transactions,
973 &epoch_store,
974 submitter_client_addr,
975 )
976 .await?
977 } else {
978 let futures = consensus_transactions.into_iter().map(|t| {
979 debug!(
980 "handle_submit_transaction: submitting consensus transaction ({}): {}",
981 req_type,
982 t.local_display(),
983 );
984 self.handle_submit_to_consensus_for_position(
985 vec![t],
986 &epoch_store,
987 submitter_client_addr,
988 )
989 });
990 future::try_join_all(futures)
991 .await?
992 .into_iter()
993 .flatten()
994 .collect()
995 };
996
997 if is_ping_request {
998 assert_eq!(consensus_positions.len(), 1);
1000 results.push(Some(SubmitTxResult::Submitted {
1001 consensus_position: consensus_positions[0],
1002 }));
1003 } else {
1004 for ((idx, tx_digest), consensus_position) in transaction_indexes
1006 .into_iter()
1007 .zip(tx_digests)
1008 .zip(consensus_positions)
1009 {
1010 debug!(
1011 ?tx_digest,
1012 "handle_submit_transaction: submitted consensus transaction at {}",
1013 consensus_position,
1014 );
1015 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1016 }
1017 }
1018
1019 Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1020 }
1021
1022 fn try_from_submit_tx_response(
1023 results: Vec<Option<SubmitTxResult>>,
1024 ) -> Result<RawSubmitTxResponse, SuiError> {
1025 let mut raw_results = Vec::new();
1026 for (i, result) in results.into_iter().enumerate() {
1027 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1028 error: format!("Missing transaction result at {}", i),
1029 })?;
1030 let raw_result = result.try_into()?;
1031 raw_results.push(raw_result);
1032 }
1033 Ok(RawSubmitTxResponse {
1034 results: raw_results,
1035 })
1036 }
1037
1038 #[instrument(
1039 name = "ValidatorService::handle_submit_to_consensus_for_position",
1040 level = "debug",
1041 skip_all,
1042 err(level = "debug")
1043 )]
1044 async fn handle_submit_to_consensus_for_position(
1045 &self,
1046 consensus_transactions: Vec<ConsensusTransaction>,
1048 epoch_store: &Arc<AuthorityPerEpochStore>,
1049 submitter_client_addr: Option<IpAddr>,
1050 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1051 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1052
1053 {
1054 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1056 if !reconfiguration_lock.should_accept_user_certs() {
1057 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1058 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1059 }
1060
1061 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1065
1066 self.consensus_adapter.submit_batch(
1067 &consensus_transactions,
1068 Some(&reconfiguration_lock),
1069 epoch_store,
1070 Some(tx_consensus_positions),
1071 submitter_client_addr,
1072 )?;
1073 }
1074
1075 Ok(rx_consensus_positions.await.map_err(|e| {
1076 SuiErrorKind::FailedToSubmitToConsensus(format!(
1077 "Failed to get consensus position: {e}"
1078 ))
1079 })?)
1080 }
1081
1082 async fn collect_effects_data(
1083 &self,
1084 effects: &TransactionEffects,
1085 include_events: bool,
1086 include_input_objects: bool,
1087 include_output_objects: bool,
1088 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1089 let events = if include_events && effects.events_digest().is_some() {
1090 Some(
1091 self.state
1092 .get_transaction_events(effects.transaction_digest())?,
1093 )
1094 } else {
1095 None
1096 };
1097
1098 let input_objects = if include_input_objects {
1099 self.state.get_transaction_input_objects(effects)?
1100 } else {
1101 vec![]
1102 };
1103
1104 let output_objects = if include_output_objects {
1105 self.state.get_transaction_output_objects(effects)?
1106 } else {
1107 vec![]
1108 };
1109
1110 Ok((events, input_objects, output_objects))
1111 }
1112}
1113
1114type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1115
1116impl ValidatorService {
1117 async fn handle_submit_transaction_impl(
1118 &self,
1119 request: tonic::Request<RawSubmitTxRequest>,
1120 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1121 self.handle_submit_transaction(request).await
1122 }
1123
1124 async fn wait_for_effects_impl(
1125 &self,
1126 request: tonic::Request<RawWaitForEffectsRequest>,
1127 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1128 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1129 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1130 let response = timeout(
1131 Duration::from_secs(20),
1133 epoch_store
1134 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1135 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1136 )
1137 .await
1138 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1139 .try_into()?;
1140 Ok((tonic::Response::new(response), Weight::zero()))
1141 }
1142
1143 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
1144 async fn wait_for_effects_response(
1145 &self,
1146 request: WaitForEffectsRequest,
1147 epoch_store: &Arc<AuthorityPerEpochStore>,
1148 ) -> SuiResult<WaitForEffectsResponse> {
1149 if request.ping_type.is_some() {
1150 return timeout(
1151 Duration::from_secs(10),
1152 self.ping_response(request, epoch_store),
1153 )
1154 .await
1155 .map_err(|_| SuiErrorKind::TimeoutError)?;
1156 }
1157
1158 let Some(tx_digest) = request.transaction_digest else {
1159 return Err(SuiErrorKind::InvalidRequest(
1160 "Transaction digest is required for wait for effects requests".to_string(),
1161 )
1162 .into());
1163 };
1164 let tx_digests = [tx_digest];
1165
1166 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1167 if let Some(consensus_position) = request.consensus_position {
1168 Box::pin(self.wait_for_fastpath_effects(
1169 consensus_position,
1170 &tx_digests,
1171 request.include_details,
1172 epoch_store,
1173 ))
1174 } else {
1175 Box::pin(futures::future::pending())
1176 };
1177
1178 tokio::select! {
1179 effects_result = self.state
1187 .get_transaction_cache_reader()
1188 .notify_read_executed_effects_may_fail(
1189 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1190 &tx_digests,
1191 ) => {
1192 let effects = effects_result?.pop().unwrap();
1193 let effects_digest = effects.digest();
1194 let details = if request.include_details {
1195 Some(self.complete_executed_data(effects).await?)
1196 } else {
1197 None
1198 };
1199
1200 Ok(WaitForEffectsResponse::Executed {
1201 effects_digest,
1202 details,
1203 fast_path: false,
1204 })
1205 }
1206
1207 fastpath_response = fastpath_effects_future => {
1208 tracing::Span::current().record("fast_path_effects", true);
1209 fastpath_response
1210 }
1211 }
1212 }
1213
1214 #[instrument(level = "error", skip_all, err(level = "debug"))]
1215 async fn ping_response(
1216 &self,
1217 request: WaitForEffectsRequest,
1218 epoch_store: &Arc<AuthorityPerEpochStore>,
1219 ) -> SuiResult<WaitForEffectsResponse> {
1220 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1221 return Err(SuiErrorKind::UnsupportedFeatureError {
1222 error: "Mysticeti fastpath".to_string(),
1223 }
1224 .into());
1225 };
1226
1227 let Some(consensus_position) = request.consensus_position else {
1228 return Err(SuiErrorKind::InvalidRequest(
1229 "Consensus position is required for Ping requests".to_string(),
1230 )
1231 .into());
1232 };
1233
1234 let Some(ping) = request.ping_type else {
1236 return Err(SuiErrorKind::InvalidRequest(
1237 "Ping type is required for ping requests".to_string(),
1238 )
1239 .into());
1240 };
1241
1242 let _metrics_guard = self
1243 .metrics
1244 .handle_wait_for_effects_ping_latency
1245 .with_label_values(&[ping.as_str()])
1246 .start_timer();
1247
1248 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1249
1250 let mut last_status = None;
1251 let details = if request.include_details {
1252 Some(Box::new(ExecutedData::default()))
1253 } else {
1254 None
1255 };
1256
1257 loop {
1258 let status = consensus_tx_status_cache
1259 .notify_read_transaction_status_change(consensus_position, last_status)
1260 .await;
1261 match status {
1262 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1263 ConsensusTxStatus::FastpathCertified => {
1264 if ping == PingType::Consensus {
1266 last_status = Some(status);
1267 continue;
1268 }
1269 return Ok(WaitForEffectsResponse::Executed {
1270 effects_digest: TransactionEffectsDigest::ZERO,
1271 details,
1272 fast_path: true,
1273 });
1274 }
1275 ConsensusTxStatus::Rejected => {
1276 return Ok(WaitForEffectsResponse::Rejected { error: None });
1277 }
1278 ConsensusTxStatus::Finalized => {
1279 return Ok(WaitForEffectsResponse::Executed {
1280 effects_digest: TransactionEffectsDigest::ZERO,
1281 details,
1282 fast_path: false,
1283 });
1284 }
1285 ConsensusTxStatus::Dropped => {
1286 return Ok(WaitForEffectsResponse::Rejected {
1289 error: epoch_store.get_rejection_vote_reason(consensus_position),
1290 });
1291 }
1292 },
1293 NotifyReadConsensusTxStatusResult::Expired(round) => {
1294 return Ok(WaitForEffectsResponse::Expired {
1295 epoch: epoch_store.epoch(),
1296 round: Some(round),
1297 });
1298 }
1299 }
1300 }
1301 }
1302
1303 #[instrument(level = "error", skip_all, err(level = "debug"))]
1304 async fn wait_for_fastpath_effects(
1305 &self,
1306 consensus_position: ConsensusPosition,
1307 tx_digests: &[TransactionDigest],
1308 include_details: bool,
1309 epoch_store: &Arc<AuthorityPerEpochStore>,
1310 ) -> SuiResult<WaitForEffectsResponse> {
1311 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1312 return Err(SuiErrorKind::UnsupportedFeatureError {
1313 error: "Mysticeti fastpath".to_string(),
1314 }
1315 .into());
1316 };
1317
1318 let local_epoch = epoch_store.epoch();
1319 match consensus_position.epoch.cmp(&local_epoch) {
1320 Ordering::Less => {
1321 let response = WaitForEffectsResponse::Expired {
1324 epoch: local_epoch,
1325 round: None,
1326 };
1327 return Ok(response);
1328 }
1329 Ordering::Greater => {
1330 return Err(SuiErrorKind::WrongEpoch {
1332 expected_epoch: local_epoch,
1333 actual_epoch: consensus_position.epoch,
1334 }
1335 .into());
1336 }
1337 Ordering::Equal => {
1338 }
1341 };
1342
1343 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1344
1345 let mut current_status = None;
1348 loop {
1349 let status = consensus_tx_status_cache
1350 .notify_read_transaction_status_change(consensus_position, current_status)
1351 .await;
1352 match status {
1353 NotifyReadConsensusTxStatusResult::Status(new_status) => match new_status {
1354 ConsensusTxStatus::Rejected => {
1355 return Ok(WaitForEffectsResponse::Rejected {
1356 error: epoch_store.get_rejection_vote_reason(consensus_position),
1357 });
1358 }
1359 ConsensusTxStatus::FastpathCertified => {
1360 current_status = Some(new_status);
1361 continue;
1362 }
1363 ConsensusTxStatus::Finalized => {
1364 let effects = self
1365 .state
1366 .get_transaction_cache_reader()
1367 .notify_read_executed_effects_may_fail(
1368 "AuthorityServer::wait_for_fastpath_effects::notify_read_executed_effects",
1369 tx_digests,
1370 )
1371 .await?
1372 .pop()
1373 .unwrap();
1374 let effects_digest = effects.digest();
1375
1376 let details = if include_details {
1377 Some(self.complete_executed_data(effects).await?)
1378 } else {
1379 None
1380 };
1381
1382 return Ok(WaitForEffectsResponse::Executed {
1383 effects_digest,
1384 details,
1385 fast_path: false,
1386 });
1387 }
1388 ConsensusTxStatus::Dropped => {
1389 return Ok(WaitForEffectsResponse::Rejected {
1390 error: epoch_store.get_rejection_vote_reason(consensus_position),
1391 });
1392 }
1393 },
1394 NotifyReadConsensusTxStatusResult::Expired(round) => {
1395 return Ok(WaitForEffectsResponse::Expired {
1396 epoch: epoch_store.epoch(),
1397 round: Some(round),
1398 });
1399 }
1400 }
1401 }
1402 }
1403
1404 async fn complete_executed_data(
1405 &self,
1406 effects: TransactionEffects,
1407 ) -> SuiResult<Box<ExecutedData>> {
1408 let (events, input_objects, output_objects) = self
1409 .collect_effects_data(
1410 &effects, true, true,
1411 true,
1412 )
1413 .await?;
1414 Ok(Box::new(ExecutedData {
1415 effects,
1416 events,
1417 input_objects,
1418 output_objects,
1419 }))
1420 }
1421
1422 async fn object_info_impl(
1423 &self,
1424 request: tonic::Request<ObjectInfoRequest>,
1425 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1426 let request = request.into_inner();
1427 let response = self.state.handle_object_info_request(request).await?;
1428 Ok((tonic::Response::new(response), Weight::one()))
1429 }
1430
1431 async fn transaction_info_impl(
1432 &self,
1433 request: tonic::Request<TransactionInfoRequest>,
1434 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1435 let request = request.into_inner();
1436 let response = self.state.handle_transaction_info_request(request).await?;
1437 Ok((tonic::Response::new(response), Weight::one()))
1438 }
1439
1440 async fn checkpoint_impl(
1441 &self,
1442 request: tonic::Request<CheckpointRequest>,
1443 ) -> WrappedServiceResponse<CheckpointResponse> {
1444 let request = request.into_inner();
1445 let response = self.state.handle_checkpoint_request(&request)?;
1446 Ok((tonic::Response::new(response), Weight::one()))
1447 }
1448
1449 async fn checkpoint_v2_impl(
1450 &self,
1451 request: tonic::Request<CheckpointRequestV2>,
1452 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1453 let request = request.into_inner();
1454 let response = self.state.handle_checkpoint_request_v2(&request)?;
1455 Ok((tonic::Response::new(response), Weight::one()))
1456 }
1457
1458 async fn get_system_state_object_impl(
1459 &self,
1460 _request: tonic::Request<SystemStateRequest>,
1461 ) -> WrappedServiceResponse<SuiSystemState> {
1462 let response = self
1463 .state
1464 .get_object_cache_reader()
1465 .get_sui_system_state_object_unsafe()?;
1466 Ok((tonic::Response::new(response), Weight::one()))
1467 }
1468
1469 async fn validator_health_impl(
1470 &self,
1471 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1472 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1473 let state = &self.state;
1474
1475 let epoch_store = state.load_epoch_store_one_call_per_task();
1477
1478 let num_inflight_execution_transactions =
1480 state.execution_scheduler().num_pending_certificates() as u64;
1481
1482 let num_inflight_consensus_transactions =
1484 self.consensus_adapter.num_inflight_transactions();
1485
1486 let last_committed_leader_round = epoch_store
1488 .consensus_tx_status_cache
1489 .as_ref()
1490 .and_then(|cache| cache.get_last_committed_leader_round())
1491 .unwrap_or(0);
1492
1493 let last_locally_built_checkpoint = epoch_store
1495 .last_built_checkpoint_summary()
1496 .ok()
1497 .flatten()
1498 .map(|(_, summary)| summary.sequence_number)
1499 .unwrap_or(0);
1500
1501 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1502 num_inflight_consensus_transactions,
1503 num_inflight_execution_transactions,
1504 last_locally_built_checkpoint,
1505 last_committed_leader_round,
1506 };
1507
1508 let raw_response = typed_response
1509 .try_into()
1510 .map_err(|e: sui_types::error::SuiError| {
1511 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1512 })?;
1513
1514 Ok((tonic::Response::new(raw_response), Weight::one()))
1515 }
1516
1517 fn get_client_ip_addr<T>(
1518 &self,
1519 request: &tonic::Request<T>,
1520 source: &ClientIdSource,
1521 ) -> Option<IpAddr> {
1522 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1523
1524 if let Some(header) = forwarded_header {
1525 let num_hops = header
1526 .to_str()
1527 .map(|h| h.split(',').count().saturating_sub(1))
1528 .unwrap_or(0);
1529
1530 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1531 }
1532
1533 match source {
1534 ClientIdSource::SocketAddr => {
1535 let socket_addr: Option<SocketAddr> = request.remote_addr();
1536
1537 if let Some(socket_addr) = socket_addr {
1543 Some(socket_addr.ip())
1544 } else {
1545 if cfg!(msim) {
1546 } else if cfg!(test) {
1548 panic!("Failed to get remote address from request");
1549 } else {
1550 self.metrics.connection_ip_not_found.inc();
1551 error!("Failed to get remote address from request");
1552 }
1553 None
1554 }
1555 }
1556 ClientIdSource::XForwardedFor(num_hops) => {
1557 let do_header_parse = |op: &MetadataValue<Ascii>| {
1558 match op.to_str() {
1559 Ok(header_val) => {
1560 let header_contents =
1561 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1562 if *num_hops == 0 {
1563 error!(
1564 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1565 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1566 to this node. Skipping traffic controller request handling.",
1567 header_contents,
1568 );
1569 return None;
1570 }
1571 let contents_len = header_contents.len();
1572 if contents_len < *num_hops {
1573 error!(
1574 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1575 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1576 `client-id-source` in the node config.",
1577 header_contents, contents_len, num_hops, contents_len,
1578 );
1579 self.metrics.client_id_source_config_mismatch.inc();
1580 return None;
1581 }
1582 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1583 else {
1584 error!(
1585 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1586 Expected at least {} values. Skipping traffic controller request handling.",
1587 header_contents, contents_len, num_hops, contents_len,
1588 );
1589 return None;
1590 };
1591 parse_ip(client_ip).or_else(|| {
1592 self.metrics.forwarded_header_parse_error.inc();
1593 None
1594 })
1595 }
1596 Err(e) => {
1597 self.metrics.forwarded_header_invalid.inc();
1601 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1602 None
1603 }
1604 }
1605 };
1606 if let Some(op) = request.metadata().get("x-forwarded-for") {
1607 do_header_parse(op)
1608 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1609 do_header_parse(op)
1610 } else {
1611 self.metrics.forwarded_header_not_included.inc();
1612 error!(
1613 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1614 );
1615 None
1616 }
1617 }
1618 }
1619 }
1620
1621 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1622 if let Some(traffic_controller) = &self.traffic_controller {
1623 if !traffic_controller.check(&client, &None).await {
1624 Err(tonic::Status::from_error(
1626 SuiErrorKind::TooManyRequests.into(),
1627 ))
1628 } else {
1629 Ok(())
1630 }
1631 } else {
1632 Ok(())
1633 }
1634 }
1635
1636 fn handle_traffic_resp<T>(
1637 &self,
1638 client: Option<IpAddr>,
1639 wrapped_response: WrappedServiceResponse<T>,
1640 ) -> Result<tonic::Response<T>, tonic::Status> {
1641 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1642 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1643 Err(status) => (
1644 Some(SuiError::from(status.clone())),
1645 Weight::zero(),
1646 Err(status.clone()),
1647 ),
1648 };
1649
1650 if let Some(traffic_controller) = self.traffic_controller.clone() {
1651 traffic_controller.tally(TrafficTally {
1652 direct: client,
1653 through_fullnode: None,
1654 error_info: error.map(|e| {
1655 let error_type = String::from(e.clone().as_ref());
1656 let error_weight = normalize(e);
1657 (error_weight, error_type)
1658 }),
1659 spam_weight,
1660 timestamp: SystemTime::now(),
1661 })
1662 }
1663 unwrapped_response
1664 }
1665}
1666
1667fn normalize(err: SuiError) -> Weight {
1669 match err.as_inner() {
1670 SuiErrorKind::UserInputError {
1671 error: UserInputError::IncorrectUserSignature { .. },
1672 } => Weight::one(),
1673 SuiErrorKind::InvalidSignature { .. }
1674 | SuiErrorKind::SignerSignatureAbsent { .. }
1675 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1676 | SuiErrorKind::IncorrectSigner { .. }
1677 | SuiErrorKind::UnknownSigner { .. }
1678 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1679 _ => Weight::zero(),
1680 }
1681}
1682
1683#[macro_export]
1687macro_rules! handle_with_decoration {
1688 ($self:ident, $func_name:ident, $request:ident) => {{
1689 if $self.client_id_source.is_none() {
1690 return $self.$func_name($request).await.map(|(result, _)| result);
1691 }
1692
1693 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1694
1695 $self.handle_traffic_req(client.clone()).await?;
1697
1698 let wrapped_response = $self.$func_name($request).await;
1700 $self.handle_traffic_resp(client, wrapped_response)
1701 }};
1702}
1703
1704#[async_trait]
1705impl Validator for ValidatorService {
1706 async fn submit_transaction(
1707 &self,
1708 request: tonic::Request<RawSubmitTxRequest>,
1709 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1710 let validator_service = self.clone();
1711
1712 spawn_monitored_task!(async move {
1715 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1718 })
1719 .await
1720 .unwrap()
1721 }
1722
1723 async fn wait_for_effects(
1724 &self,
1725 request: tonic::Request<RawWaitForEffectsRequest>,
1726 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1727 handle_with_decoration!(self, wait_for_effects_impl, request)
1728 }
1729
1730 async fn object_info(
1731 &self,
1732 request: tonic::Request<ObjectInfoRequest>,
1733 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1734 handle_with_decoration!(self, object_info_impl, request)
1735 }
1736
1737 async fn transaction_info(
1738 &self,
1739 request: tonic::Request<TransactionInfoRequest>,
1740 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1741 handle_with_decoration!(self, transaction_info_impl, request)
1742 }
1743
1744 async fn checkpoint(
1745 &self,
1746 request: tonic::Request<CheckpointRequest>,
1747 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1748 handle_with_decoration!(self, checkpoint_impl, request)
1749 }
1750
1751 async fn checkpoint_v2(
1752 &self,
1753 request: tonic::Request<CheckpointRequestV2>,
1754 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1755 handle_with_decoration!(self, checkpoint_v2_impl, request)
1756 }
1757
1758 async fn get_system_state_object(
1759 &self,
1760 request: tonic::Request<SystemStateRequest>,
1761 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1762 handle_with_decoration!(self, get_system_state_object_impl, request)
1763 }
1764
1765 async fn validator_health(
1766 &self,
1767 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1768 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1769 {
1770 handle_with_decoration!(self, validator_health_impl, request)
1771 }
1772}