1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use nonempty::NonEmpty;
13use prometheus::{
14 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15 register_gauge_with_registry, register_histogram_vec_with_registry,
16 register_histogram_with_registry, register_int_counter_vec_with_registry,
17 register_int_counter_with_registry,
18};
19use std::{
20 cmp::Ordering,
21 future::Future,
22 io,
23 net::{IpAddr, SocketAddr},
24 pin::Pin,
25 sync::Arc,
26 time::{Duration, SystemTime},
27};
28use sui_network::{
29 api::{Validator, ValidatorServer},
30 tonic,
31 validator::server::SUI_TLS_SERVER_NAME,
32};
33use sui_types::effects::TransactionEffectsAPI;
34use sui_types::message_envelope::Message;
35use sui_types::messages_consensus::ConsensusPosition;
36use sui_types::messages_consensus::ConsensusTransaction;
37use sui_types::messages_grpc::{
38 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
39 TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::multiaddr::Multiaddr;
42use sui_types::object::Object;
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::traffic_control::{ClientIdSource, Weight};
45use sui_types::{
46 base_types::ObjectID,
47 digests::{TransactionDigest, TransactionEffectsDigest},
48 error::{SuiErrorKind, UserInputError},
49};
50use sui_types::{
51 effects::TransactionEffects,
52 messages_grpc::{
53 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
54 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
55 },
56};
57use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
58use sui_types::{error::*, transaction::*};
59use sui_types::{
60 fp_ensure,
61 messages_checkpoint::{
62 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
63 },
64};
65use tokio::sync::oneshot;
66use tokio::time::timeout;
67use tonic::metadata::{Ascii, MetadataValue};
68use tracing::{debug, error, info, instrument};
69
70use crate::consensus_adapter::ConnectionMonitorStatusForTests;
71use crate::{
72 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
73 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
74 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
75};
76use crate::{
77 authority::{
78 authority_per_epoch_store::AuthorityPerEpochStore,
79 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
80 },
81 checkpoints::CheckpointStore,
82 mysticeti_adapter::LazyMysticetiClient,
83 transaction_outputs::TransactionOutputs,
84};
85use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
86use sui_types::messages_grpc::PingType;
87
88#[cfg(test)]
89#[path = "unit_tests/server_tests.rs"]
90mod server_tests;
91
92#[cfg(test)]
93#[path = "unit_tests/wait_for_effects_tests.rs"]
94mod wait_for_effects_tests;
95
96#[cfg(test)]
97#[path = "unit_tests/submit_transaction_tests.rs"]
98mod submit_transaction_tests;
99
100pub struct AuthorityServerHandle {
101 server_handle: sui_network::validator::server::Server,
102}
103
104impl AuthorityServerHandle {
105 pub async fn join(self) -> Result<(), io::Error> {
106 self.server_handle.handle().wait_for_shutdown().await;
107 Ok(())
108 }
109
110 pub async fn kill(self) -> Result<(), io::Error> {
111 self.server_handle.handle().shutdown().await;
112 Ok(())
113 }
114
115 pub fn address(&self) -> &Multiaddr {
116 self.server_handle.local_addr()
117 }
118}
119
120pub struct AuthorityServer {
121 address: Multiaddr,
122 pub state: Arc<AuthorityState>,
123 consensus_adapter: Arc<ConsensusAdapter>,
124 pub metrics: Arc<ValidatorServiceMetrics>,
125}
126
127impl AuthorityServer {
128 pub fn new_for_test_with_consensus_adapter(
129 state: Arc<AuthorityState>,
130 consensus_adapter: Arc<ConsensusAdapter>,
131 ) -> Self {
132 let address = new_local_tcp_address_for_testing();
133 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
134
135 Self {
136 address,
137 state,
138 consensus_adapter,
139 metrics,
140 }
141 }
142
143 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
144 let consensus_adapter = Arc::new(ConsensusAdapter::new(
145 Arc::new(LazyMysticetiClient::new()),
146 CheckpointStore::new_for_tests(),
147 state.name,
148 Arc::new(ConnectionMonitorStatusForTests {}),
149 100_000,
150 100_000,
151 None,
152 None,
153 ConsensusAdapterMetrics::new_test(),
154 state.epoch_store_for_testing().protocol_config().clone(),
155 ));
156 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
157 }
158
159 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
160 let address = self.address.clone();
161 self.spawn_with_bind_address_for_test(address).await
162 }
163
164 pub async fn spawn_with_bind_address_for_test(
165 self,
166 address: Multiaddr,
167 ) -> Result<AuthorityServerHandle, io::Error> {
168 let tls_config = sui_tls::create_rustls_server_config(
169 self.state.config.network_key_pair().copy().private(),
170 SUI_TLS_SERVER_NAME.to_string(),
171 );
172 let config = mysten_network::config::Config::new();
173 let server = sui_network::validator::server::ServerBuilder::from_config(
174 &config,
175 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
176 )
177 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
178 self.state,
179 self.consensus_adapter,
180 self.metrics,
181 )))
182 .bind(&address, Some(tls_config))
183 .await
184 .unwrap();
185 let local_addr = server.local_addr().to_owned();
186 info!("Listening to traffic on {local_addr}");
187 let handle = AuthorityServerHandle {
188 server_handle: server,
189 };
190 Ok(handle)
191 }
192}
193
194pub struct ValidatorServiceMetrics {
195 pub signature_errors: IntCounter,
196 pub tx_verification_latency: Histogram,
197 pub cert_verification_latency: Histogram,
198 pub consensus_latency: Histogram,
199 pub handle_transaction_latency: Histogram,
200 pub submit_certificate_consensus_latency: Histogram,
201 pub handle_certificate_consensus_latency: Histogram,
202 pub handle_certificate_non_consensus_latency: Histogram,
203 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
204 pub handle_soft_bundle_certificates_count: Histogram,
205 pub handle_soft_bundle_certificates_size_bytes: Histogram,
206 pub handle_transaction_consensus_latency: Histogram,
207 pub handle_submit_transaction_consensus_latency: HistogramVec,
208 pub handle_wait_for_effects_ping_latency: HistogramVec,
209
210 handle_submit_transaction_latency: HistogramVec,
211 handle_submit_transaction_bytes: HistogramVec,
212 handle_submit_transaction_batch_size: HistogramVec,
213
214 num_rejected_cert_in_epoch_boundary: IntCounter,
215 num_rejected_tx_during_overload: IntCounterVec,
216 submission_rejected_transactions: IntCounterVec,
217 connection_ip_not_found: IntCounter,
218 forwarded_header_parse_error: IntCounter,
219 forwarded_header_invalid: IntCounter,
220 forwarded_header_not_included: IntCounter,
221 client_id_source_config_mismatch: IntCounter,
222 x_forwarded_for_num_hops: Gauge,
223}
224
225impl ValidatorServiceMetrics {
226 pub fn new(registry: &Registry) -> Self {
227 Self {
228 signature_errors: register_int_counter_with_registry!(
229 "total_signature_errors",
230 "Number of transaction signature errors",
231 registry,
232 )
233 .unwrap(),
234 tx_verification_latency: register_histogram_with_registry!(
235 "validator_service_tx_verification_latency",
236 "Latency of verifying a transaction",
237 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
238 registry,
239 )
240 .unwrap(),
241 cert_verification_latency: register_histogram_with_registry!(
242 "validator_service_cert_verification_latency",
243 "Latency of verifying a certificate",
244 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
245 registry,
246 )
247 .unwrap(),
248 consensus_latency: register_histogram_with_registry!(
249 "validator_service_consensus_latency",
250 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
251 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
252 registry,
253 )
254 .unwrap(),
255 handle_transaction_latency: register_histogram_with_registry!(
256 "validator_service_handle_transaction_latency",
257 "Latency of handling a transaction",
258 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
259 registry,
260 )
261 .unwrap(),
262 handle_certificate_consensus_latency: register_histogram_with_registry!(
263 "validator_service_handle_certificate_consensus_latency",
264 "Latency of handling a consensus transaction certificate",
265 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
266 registry,
267 )
268 .unwrap(),
269 submit_certificate_consensus_latency: register_histogram_with_registry!(
270 "validator_service_submit_certificate_consensus_latency",
271 "Latency of submit_certificate RPC handler",
272 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
273 registry,
274 )
275 .unwrap(),
276 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
277 "validator_service_handle_certificate_non_consensus_latency",
278 "Latency of handling a non-consensus transaction certificate",
279 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
280 registry,
281 )
282 .unwrap(),
283 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
284 "validator_service_handle_soft_bundle_certificates_consensus_latency",
285 "Latency of handling a consensus soft bundle",
286 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
287 registry,
288 )
289 .unwrap(),
290 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
291 "handle_soft_bundle_certificates_count",
292 "The number of certificates included in a soft bundle",
293 mysten_metrics::COUNT_BUCKETS.to_vec(),
294 registry,
295 )
296 .unwrap(),
297 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
298 "handle_soft_bundle_certificates_size_bytes",
299 "The size of soft bundle in bytes",
300 mysten_metrics::BYTES_BUCKETS.to_vec(),
301 registry,
302 )
303 .unwrap(),
304 handle_transaction_consensus_latency: register_histogram_with_registry!(
305 "validator_service_handle_transaction_consensus_latency",
306 "Latency of handling a user transaction sent through consensus",
307 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
308 registry,
309 )
310 .unwrap(),
311 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
312 "validator_service_submit_transaction_consensus_latency",
313 "Latency of submitting a user transaction sent through consensus",
314 &["req_type"],
315 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
316 registry,
317 )
318 .unwrap(),
319 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
320 "validator_service_submit_transaction_latency",
321 "Latency of submit transaction handler",
322 &["req_type"],
323 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
324 registry,
325 )
326 .unwrap(),
327 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
328 "validator_service_handle_wait_for_effects_ping_latency",
329 "Latency of handling a ping request for wait_for_effects",
330 &["req_type"],
331 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
332 registry,
333 )
334 .unwrap(),
335 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
336 "validator_service_submit_transaction_bytes",
337 "The size of transactions in the submit transaction request",
338 &["req_type"],
339 mysten_metrics::BYTES_BUCKETS.to_vec(),
340 registry,
341 )
342 .unwrap(),
343 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
344 "validator_service_submit_transaction_batch_size",
345 "The number of transactions in the submit transaction request",
346 &["req_type"],
347 mysten_metrics::COUNT_BUCKETS.to_vec(),
348 registry,
349 )
350 .unwrap(),
351 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
352 "validator_service_num_rejected_cert_in_epoch_boundary",
353 "Number of rejected transaction certificate during epoch transitioning",
354 registry,
355 )
356 .unwrap(),
357 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
358 "validator_service_num_rejected_tx_during_overload",
359 "Number of rejected transaction due to system overload",
360 &["error_type"],
361 registry,
362 )
363 .unwrap(),
364 submission_rejected_transactions: register_int_counter_vec_with_registry!(
365 "validator_service_submission_rejected_transactions",
366 "Number of transactions rejected during submission",
367 &["reason"],
368 registry,
369 )
370 .unwrap(),
371 connection_ip_not_found: register_int_counter_with_registry!(
372 "validator_service_connection_ip_not_found",
373 "Number of times connection IP was not extractable from request",
374 registry,
375 )
376 .unwrap(),
377 forwarded_header_parse_error: register_int_counter_with_registry!(
378 "validator_service_forwarded_header_parse_error",
379 "Number of times x-forwarded-for header could not be parsed",
380 registry,
381 )
382 .unwrap(),
383 forwarded_header_invalid: register_int_counter_with_registry!(
384 "validator_service_forwarded_header_invalid",
385 "Number of times x-forwarded-for header was invalid",
386 registry,
387 )
388 .unwrap(),
389 forwarded_header_not_included: register_int_counter_with_registry!(
390 "validator_service_forwarded_header_not_included",
391 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
392 registry,
393 )
394 .unwrap(),
395 client_id_source_config_mismatch: register_int_counter_with_registry!(
396 "validator_service_client_id_source_config_mismatch",
397 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
398 registry,
399 )
400 .unwrap(),
401 x_forwarded_for_num_hops: register_gauge_with_registry!(
402 "validator_service_x_forwarded_for_num_hops",
403 "Number of hops in x-forwarded-for header",
404 registry,
405 )
406 .unwrap(),
407 }
408 }
409
410 pub fn new_for_tests() -> Self {
411 let registry = Registry::new();
412 Self::new(®istry)
413 }
414}
415
416#[derive(Clone)]
417pub struct ValidatorService {
418 state: Arc<AuthorityState>,
419 consensus_adapter: Arc<ConsensusAdapter>,
420 metrics: Arc<ValidatorServiceMetrics>,
421 traffic_controller: Option<Arc<TrafficController>>,
422 client_id_source: Option<ClientIdSource>,
423}
424
425impl ValidatorService {
426 pub fn new(
427 state: Arc<AuthorityState>,
428 consensus_adapter: Arc<ConsensusAdapter>,
429 validator_metrics: Arc<ValidatorServiceMetrics>,
430 client_id_source: Option<ClientIdSource>,
431 ) -> Self {
432 let traffic_controller = state.traffic_controller.clone();
433 Self {
434 state,
435 consensus_adapter,
436 metrics: validator_metrics,
437 traffic_controller,
438 client_id_source,
439 }
440 }
441
442 pub fn new_for_tests(
443 state: Arc<AuthorityState>,
444 consensus_adapter: Arc<ConsensusAdapter>,
445 metrics: Arc<ValidatorServiceMetrics>,
446 ) -> Self {
447 Self {
448 state,
449 consensus_adapter,
450 metrics,
451 traffic_controller: None,
452 client_id_source: None,
453 }
454 }
455
456 pub fn validator_state(&self) -> &Arc<AuthorityState> {
457 &self.state
458 }
459
460 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
462 let epoch_store = self.state.load_epoch_store_one_call_per_task();
463
464 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
466
467 let transaction = epoch_store
469 .verify_transaction_require_no_aliases(transaction)?
470 .into_tx();
471
472 self.state
474 .handle_vote_transaction(&epoch_store, transaction)?;
475
476 Ok(())
477 }
478
479 pub fn handle_transaction_for_testing_with_overload_check(
482 &self,
483 transaction: Transaction,
484 ) -> SuiResult<()> {
485 let epoch_store = self.state.load_epoch_store_one_call_per_task();
486
487 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
489
490 self.state.check_system_overload(
492 self.consensus_adapter.as_ref(),
493 transaction.data(),
494 self.state.check_system_overload_at_signing(),
495 )?;
496
497 let transaction = epoch_store
499 .verify_transaction_require_no_aliases(transaction)?
500 .into_tx();
501
502 self.state
504 .handle_vote_transaction(&epoch_store, transaction)?;
505
506 Ok(())
507 }
508
509 async fn collect_immutable_object_ids(
512 &self,
513 tx: &VerifiedTransaction,
514 state: &AuthorityState,
515 ) -> SuiResult<Vec<ObjectID>> {
516 let input_objects = tx.data().transaction_data().input_objects()?;
517
518 let object_ids: Vec<ObjectID> = input_objects
520 .iter()
521 .filter_map(|obj| match obj {
522 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
523 _ => None,
524 })
525 .collect();
526 if object_ids.is_empty() {
527 return Ok(vec![]);
528 }
529
530 let objects = state.get_object_cache_reader().get_objects(&object_ids);
532
533 objects
535 .into_iter()
536 .zip(object_ids.iter())
537 .filter_map(|(obj, id)| {
538 let Some(o) = obj else {
539 return Some(Err::<ObjectID, SuiError>(
540 SuiErrorKind::UserInputError {
541 error: UserInputError::ObjectNotFound {
542 object_id: *id,
543 version: None,
544 },
545 }
546 .into(),
547 ));
548 };
549 if o.is_immutable() {
550 Some(Ok(*id))
551 } else {
552 None
553 }
554 })
555 .collect::<SuiResult<Vec<ObjectID>>>()
556 }
557
558 #[instrument(
559 name = "ValidatorService::handle_submit_transaction",
560 level = "error",
561 skip_all,
562 err(level = "debug")
563 )]
564 async fn handle_submit_transaction(
565 &self,
566 request: tonic::Request<RawSubmitTxRequest>,
567 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
568 let Self {
569 state,
570 consensus_adapter,
571 metrics,
572 traffic_controller: _,
573 client_id_source,
574 } = self.clone();
575
576 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
577 self.get_client_ip_addr(&request, client_id_source)
578 } else {
579 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
580 };
581
582 let inner = request.into_inner();
583 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
584
585 let next_epoch = start_epoch + 1;
586 let mut max_retries = 1;
587
588 loop {
589 let res = self
590 .handle_submit_transaction_inner(
591 &state,
592 &consensus_adapter,
593 &metrics,
594 &inner,
595 submitter_client_addr,
596 )
597 .await;
598 match res {
599 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
600 Err(err) => {
601 if max_retries > 0
602 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
603 {
604 max_retries -= 1;
605
606 debug!(
607 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
608 );
609
610 if let Ok(Ok(new_epoch)) =
611 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
612 {
613 assert_reachable!("retry submission at epoch end");
614 if new_epoch == next_epoch {
615 continue;
616 }
617
618 debug_fatal!(
619 "expected epoch {} after reconfiguration. got {}",
620 next_epoch,
621 new_epoch
622 );
623 }
624 }
625 return Err(err.into());
626 }
627 }
628 }
629 }
630
631 async fn handle_submit_transaction_inner(
632 &self,
633 state: &AuthorityState,
634 consensus_adapter: &ConsensusAdapter,
635 metrics: &ValidatorServiceMetrics,
636 request: &RawSubmitTxRequest,
637 submitter_client_addr: Option<IpAddr>,
638 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
639 let epoch_store = state.load_epoch_store_one_call_per_task();
640 if !epoch_store.protocol_config().mysticeti_fastpath() {
641 return Err(SuiErrorKind::UnsupportedFeatureError {
642 error: "Mysticeti fastpath".to_string(),
643 }
644 .into());
645 }
646
647 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
648 SuiErrorKind::GrpcMessageDeserializeError {
649 type_info: "RawSubmitTxRequest.submit_type".to_string(),
650 error: e.to_string(),
651 }
652 })?;
653
654 let is_ping_request = submit_type == SubmitTxType::Ping;
655 if is_ping_request {
656 fp_ensure!(
657 request.transactions.is_empty(),
658 SuiErrorKind::InvalidRequest(format!(
659 "Ping request cannot contain {} transactions",
660 request.transactions.len()
661 ))
662 .into()
663 );
664 } else {
665 fp_ensure!(
667 !request.transactions.is_empty(),
668 SuiErrorKind::InvalidRequest(
669 "At least one transaction needs to be submitted".to_string(),
670 )
671 .into()
672 );
673 }
674
675 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
680
681 let max_num_transactions = if is_soft_bundle_request {
682 epoch_store.protocol_config().max_soft_bundle_size()
685 } else {
686 epoch_store
688 .protocol_config()
689 .max_num_transactions_in_block()
690 };
691 fp_ensure!(
692 request.transactions.len() <= max_num_transactions as usize,
693 SuiErrorKind::InvalidRequest(format!(
694 "Too many transactions in request: {} vs {}",
695 request.transactions.len(),
696 max_num_transactions
697 ))
698 .into()
699 );
700
701 let mut tx_digests = Vec::with_capacity(request.transactions.len());
703 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
705 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
707 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
709 let mut total_size_bytes = 0;
711
712 let req_type = if is_ping_request {
713 "ping"
714 } else if request.transactions.len() == 1 {
715 "single_transaction"
716 } else if is_soft_bundle_request {
717 "soft_bundle"
718 } else {
719 "batch"
720 };
721
722 let _handle_tx_metrics_guard = metrics
723 .handle_submit_transaction_latency
724 .with_label_values(&[req_type])
725 .start_timer();
726
727 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
728 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
729 Ok(txn) => txn,
730 Err(e) => {
731 return Err(SuiErrorKind::TransactionDeserializationError {
733 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
734 }
735 .into());
736 }
737 };
738
739 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
741
742 let overload_check_res = state.check_system_overload(
743 consensus_adapter,
744 transaction.data(),
745 state.check_system_overload_at_signing(),
746 );
747 if let Err(error) = overload_check_res {
748 metrics
749 .num_rejected_tx_during_overload
750 .with_label_values(&[error.as_ref()])
751 .inc();
752 results[idx] = Some(SubmitTxResult::Rejected { error });
753 continue;
754 }
755
756 let verified_transaction = {
758 let _metrics_guard = metrics.tx_verification_latency.start_timer();
759 if epoch_store.protocol_config().address_aliases() {
760 match epoch_store.verify_transaction_with_current_aliases(transaction) {
761 Ok(tx) => tx,
762 Err(e) => {
763 metrics.signature_errors.inc();
764 return Err(e);
765 }
766 }
767 } else {
768 match epoch_store.verify_transaction_require_no_aliases(transaction) {
769 Ok(tx) => tx,
770 Err(e) => {
771 metrics.signature_errors.inc();
772 return Err(e);
773 }
774 }
775 }
776 };
777
778 let tx_digest = verified_transaction.tx().digest();
779 tx_digests.push(*tx_digest);
780
781 debug!(
782 ?tx_digest,
783 "handle_submit_transaction: verified transaction"
784 );
785
786 if let Some(effects) = state
789 .get_transaction_cache_reader()
790 .get_executed_effects(tx_digest)
791 {
792 let effects_digest = effects.digest();
793 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
794 let executed_result = SubmitTxResult::Executed {
795 effects_digest,
796 details: Some(executed_data),
797 fast_path: false,
798 };
799 results[idx] = Some(executed_result);
800 debug!(?tx_digest, "handle_submit_transaction: already executed");
801 continue;
802 }
803 }
804
805 if self
806 .state
807 .get_transaction_cache_reader()
808 .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
809 {
810 results[idx] = Some(SubmitTxResult::Rejected {
811 error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
812 });
813 debug!(
814 ?tx_digest,
815 "handle_submit_transaction: transaction already executed in previous epoch"
816 );
817 continue;
818 }
819
820 debug!(
821 ?tx_digest,
822 "handle_submit_transaction: waiting for fastpath dependency objects"
823 );
824 if !state
825 .wait_for_fastpath_dependency_objects(
826 verified_transaction.tx(),
827 epoch_store.epoch(),
828 )
829 .await?
830 {
831 debug!(
832 ?tx_digest,
833 "fastpath input objects are still unavailable after waiting"
834 );
835 }
836
837 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
838 Ok(_) => { }
839 Err(e) => {
840 if let Some(effects) = state
843 .get_transaction_cache_reader()
844 .get_executed_effects(tx_digest)
845 {
846 let effects_digest = effects.digest();
847 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
848 {
849 let executed_result = SubmitTxResult::Executed {
850 effects_digest,
851 details: Some(executed_data),
852 fast_path: false,
853 };
854 results[idx] = Some(executed_result);
855 continue;
856 }
857 }
858
859 debug!(?tx_digest, "Transaction rejected during submission: {e}");
861 metrics
862 .submission_rejected_transactions
863 .with_label_values(&[e.to_variant_name()])
864 .inc();
865 results[idx] = Some(SubmitTxResult::Rejected { error: e });
866 continue;
867 }
868 }
869
870 if epoch_store.protocol_config().address_aliases()
872 || epoch_store.protocol_config().disable_preconsensus_locking()
873 {
874 let mut claims = vec![];
875
876 if epoch_store.protocol_config().disable_preconsensus_locking() {
877 let immutable_object_ids = self
878 .collect_immutable_object_ids(verified_transaction.tx(), state)
879 .await?;
880 if !immutable_object_ids.is_empty() {
881 claims.push(TransactionClaim::ImmutableInputObjects(
882 immutable_object_ids,
883 ));
884 }
885 }
886
887 let (tx, aliases) = verified_transaction.into_inner();
888 if epoch_store.protocol_config().address_aliases() {
889 if epoch_store
890 .protocol_config()
891 .fix_checkpoint_signature_mapping()
892 {
893 claims.push(TransactionClaim::AddressAliasesV2(aliases));
894 } else {
895 let v1_aliases: Vec<_> = tx
896 .data()
897 .intent_message()
898 .value
899 .required_signers()
900 .into_iter()
901 .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
902 .collect();
903 #[allow(deprecated)]
904 claims.push(TransactionClaim::AddressAliases(
905 NonEmpty::from_vec(v1_aliases)
906 .expect("must have at least one required_signer"),
907 ));
908 }
909 }
910
911 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
912
913 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
914 &state.name,
915 tx_with_claims,
916 ));
917 } else {
918 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
919 &state.name,
920 verified_transaction.into_tx().into(),
921 ));
922 }
923 transaction_indexes.push(idx);
924 total_size_bytes += tx_size;
925 }
926
927 if consensus_transactions.is_empty() && !is_ping_request {
928 return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
929 }
930
931 let max_transaction_bytes = if is_soft_bundle_request {
935 epoch_store
936 .protocol_config()
937 .consensus_max_transactions_in_block_bytes()
938 / 2
939 } else {
940 epoch_store
941 .protocol_config()
942 .consensus_max_transactions_in_block_bytes()
943 };
944 fp_ensure!(
945 total_size_bytes <= max_transaction_bytes as usize,
946 SuiErrorKind::UserInputError {
947 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
948 size: total_size_bytes,
949 limit: max_transaction_bytes,
950 },
951 }
952 .into()
953 );
954
955 metrics
956 .handle_submit_transaction_bytes
957 .with_label_values(&[req_type])
958 .observe(total_size_bytes as f64);
959 metrics
960 .handle_submit_transaction_batch_size
961 .with_label_values(&[req_type])
962 .observe(consensus_transactions.len() as f64);
963
964 let _latency_metric_guard = metrics
965 .handle_submit_transaction_consensus_latency
966 .with_label_values(&[req_type])
967 .start_timer();
968
969 let consensus_positions = if is_soft_bundle_request || is_ping_request {
970 assert!(
973 is_ping_request || !consensus_transactions.is_empty(),
974 "A valid soft bundle must have at least one transaction"
975 );
976 debug!(
977 "handle_submit_transaction: submitting consensus transactions ({}): {}",
978 req_type,
979 consensus_transactions
980 .iter()
981 .map(|t| t.local_display())
982 .join(", ")
983 );
984 self.handle_submit_to_consensus_for_position(
985 consensus_transactions,
986 &epoch_store,
987 submitter_client_addr,
988 )
989 .await?
990 } else {
991 let futures = consensus_transactions.into_iter().map(|t| {
992 debug!(
993 "handle_submit_transaction: submitting consensus transaction ({}): {}",
994 req_type,
995 t.local_display(),
996 );
997 self.handle_submit_to_consensus_for_position(
998 vec![t],
999 &epoch_store,
1000 submitter_client_addr,
1001 )
1002 });
1003 future::try_join_all(futures)
1004 .await?
1005 .into_iter()
1006 .flatten()
1007 .collect()
1008 };
1009
1010 if is_ping_request {
1011 assert_eq!(consensus_positions.len(), 1);
1013 results.push(Some(SubmitTxResult::Submitted {
1014 consensus_position: consensus_positions[0],
1015 }));
1016 } else {
1017 for ((idx, tx_digest), consensus_position) in transaction_indexes
1019 .into_iter()
1020 .zip(tx_digests)
1021 .zip(consensus_positions)
1022 {
1023 debug!(
1024 ?tx_digest,
1025 "handle_submit_transaction: submitted consensus transaction at {}",
1026 consensus_position,
1027 );
1028 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1029 }
1030 }
1031
1032 Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1033 }
1034
1035 fn try_from_submit_tx_response(
1036 results: Vec<Option<SubmitTxResult>>,
1037 ) -> Result<RawSubmitTxResponse, SuiError> {
1038 let mut raw_results = Vec::new();
1039 for (i, result) in results.into_iter().enumerate() {
1040 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1041 error: format!("Missing transaction result at {}", i),
1042 })?;
1043 let raw_result = result.try_into()?;
1044 raw_results.push(raw_result);
1045 }
1046 Ok(RawSubmitTxResponse {
1047 results: raw_results,
1048 })
1049 }
1050
1051 #[instrument(
1052 name = "ValidatorService::handle_submit_to_consensus_for_position",
1053 level = "debug",
1054 skip_all,
1055 err(level = "debug")
1056 )]
1057 async fn handle_submit_to_consensus_for_position(
1058 &self,
1059 consensus_transactions: Vec<ConsensusTransaction>,
1061 epoch_store: &Arc<AuthorityPerEpochStore>,
1062 submitter_client_addr: Option<IpAddr>,
1063 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1064 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1065
1066 {
1067 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1069 if !reconfiguration_lock.should_accept_user_certs() {
1070 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1071 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1072 }
1073
1074 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1078
1079 self.consensus_adapter.submit_batch(
1080 &consensus_transactions,
1081 Some(&reconfiguration_lock),
1082 epoch_store,
1083 Some(tx_consensus_positions),
1084 submitter_client_addr,
1085 )?;
1086 }
1087
1088 Ok(rx_consensus_positions.await.map_err(|e| {
1089 SuiErrorKind::FailedToSubmitToConsensus(format!(
1090 "Failed to get consensus position: {e}"
1091 ))
1092 })?)
1093 }
1094
1095 async fn collect_effects_data(
1096 &self,
1097 effects: &TransactionEffects,
1098 include_events: bool,
1099 include_input_objects: bool,
1100 include_output_objects: bool,
1101 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1102 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1103 let events = if include_events && effects.events_digest().is_some() {
1104 if let Some(fastpath_outputs) = &fastpath_outputs {
1105 Some(fastpath_outputs.events.clone())
1106 } else {
1107 Some(
1108 self.state
1109 .get_transaction_events(effects.transaction_digest())?,
1110 )
1111 }
1112 } else {
1113 None
1114 };
1115
1116 let input_objects = if include_input_objects {
1117 self.state.get_transaction_input_objects(effects)?
1118 } else {
1119 vec![]
1120 };
1121
1122 let output_objects = if include_output_objects {
1123 if let Some(fastpath_outputs) = &fastpath_outputs {
1124 fastpath_outputs.written.values().cloned().collect()
1125 } else {
1126 self.state.get_transaction_output_objects(effects)?
1127 }
1128 } else {
1129 vec![]
1130 };
1131
1132 Ok((events, input_objects, output_objects))
1133 }
1134}
1135
1136type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1137
1138impl ValidatorService {
1139 async fn handle_submit_transaction_impl(
1140 &self,
1141 request: tonic::Request<RawSubmitTxRequest>,
1142 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1143 self.handle_submit_transaction(request).await
1144 }
1145
1146 async fn wait_for_effects_impl(
1147 &self,
1148 request: tonic::Request<RawWaitForEffectsRequest>,
1149 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1150 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1151 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1152 let response = timeout(
1153 Duration::from_secs(20),
1155 epoch_store
1156 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1157 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1158 )
1159 .await
1160 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1161 .try_into()?;
1162 Ok((tonic::Response::new(response), Weight::zero()))
1163 }
1164
1165 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1166 async fn wait_for_effects_response(
1167 &self,
1168 request: WaitForEffectsRequest,
1169 epoch_store: &Arc<AuthorityPerEpochStore>,
1170 ) -> SuiResult<WaitForEffectsResponse> {
1171 if request.ping_type.is_some() {
1172 return timeout(
1173 Duration::from_secs(10),
1174 self.ping_response(request, epoch_store),
1175 )
1176 .await
1177 .map_err(|_| SuiErrorKind::TimeoutError)?;
1178 }
1179
1180 let Some(tx_digest) = request.transaction_digest else {
1181 return Err(SuiErrorKind::InvalidRequest(
1182 "Transaction digest is required for wait for effects requests".to_string(),
1183 )
1184 .into());
1185 };
1186 let tx_digests = [tx_digest];
1187
1188 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1189 if let Some(consensus_position) = request.consensus_position {
1190 Box::pin(self.wait_for_fastpath_effects(
1191 consensus_position,
1192 &tx_digests,
1193 request.include_details,
1194 epoch_store,
1195 ))
1196 } else {
1197 Box::pin(futures::future::pending())
1198 };
1199
1200 tokio::select! {
1201 biased;
1203 mut effects = self.state
1210 .get_transaction_cache_reader()
1211 .notify_read_executed_effects(
1212 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1213 &tx_digests,
1214 ) => {
1215 tracing::Span::current().record("fast_path_effects", false);
1216 let effects = effects.pop().unwrap();
1217 let details = if request.include_details {
1218 Some(self.complete_executed_data(effects.clone(), None).await?)
1219 } else {
1220 None
1221 };
1222
1223 Ok(WaitForEffectsResponse::Executed {
1224 effects_digest: effects.digest(),
1225 details,
1226 fast_path: false,
1227 })
1228 }
1229
1230 fastpath_response = fastpath_effects_future => {
1231 tracing::Span::current().record("fast_path_effects", true);
1232 fastpath_response
1233 }
1234 }
1235 }
1236
1237 #[instrument(level = "error", skip_all, err(level = "debug"))]
1238 async fn ping_response(
1239 &self,
1240 request: WaitForEffectsRequest,
1241 epoch_store: &Arc<AuthorityPerEpochStore>,
1242 ) -> SuiResult<WaitForEffectsResponse> {
1243 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1244 return Err(SuiErrorKind::UnsupportedFeatureError {
1245 error: "Mysticeti fastpath".to_string(),
1246 }
1247 .into());
1248 };
1249
1250 let Some(consensus_position) = request.consensus_position else {
1251 return Err(SuiErrorKind::InvalidRequest(
1252 "Consensus position is required for Ping requests".to_string(),
1253 )
1254 .into());
1255 };
1256
1257 let Some(ping) = request.ping_type else {
1259 return Err(SuiErrorKind::InvalidRequest(
1260 "Ping type is required for ping requests".to_string(),
1261 )
1262 .into());
1263 };
1264
1265 let _metrics_guard = self
1266 .metrics
1267 .handle_wait_for_effects_ping_latency
1268 .with_label_values(&[ping.as_str()])
1269 .start_timer();
1270
1271 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1272
1273 let mut last_status = None;
1274 let details = if request.include_details {
1275 Some(Box::new(ExecutedData::default()))
1276 } else {
1277 None
1278 };
1279
1280 loop {
1281 let status = consensus_tx_status_cache
1282 .notify_read_transaction_status_change(consensus_position, last_status)
1283 .await;
1284 match status {
1285 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1286 ConsensusTxStatus::FastpathCertified => {
1287 if ping == PingType::Consensus {
1289 last_status = Some(status);
1290 continue;
1291 }
1292 return Ok(WaitForEffectsResponse::Executed {
1293 effects_digest: TransactionEffectsDigest::ZERO,
1294 details,
1295 fast_path: true,
1296 });
1297 }
1298 ConsensusTxStatus::Rejected => {
1299 return Ok(WaitForEffectsResponse::Rejected { error: None });
1300 }
1301 ConsensusTxStatus::Finalized => {
1302 return Ok(WaitForEffectsResponse::Executed {
1303 effects_digest: TransactionEffectsDigest::ZERO,
1304 details,
1305 fast_path: false,
1306 });
1307 }
1308 ConsensusTxStatus::Dropped => {
1309 return Ok(WaitForEffectsResponse::Rejected {
1312 error: epoch_store.get_rejection_vote_reason(consensus_position),
1313 });
1314 }
1315 },
1316 NotifyReadConsensusTxStatusResult::Expired(round) => {
1317 return Ok(WaitForEffectsResponse::Expired {
1318 epoch: epoch_store.epoch(),
1319 round: Some(round),
1320 });
1321 }
1322 }
1323 }
1324 }
1325
1326 #[instrument(level = "error", skip_all, err(level = "debug"))]
1327 async fn wait_for_fastpath_effects(
1328 &self,
1329 consensus_position: ConsensusPosition,
1330 tx_digests: &[TransactionDigest],
1331 include_details: bool,
1332 epoch_store: &Arc<AuthorityPerEpochStore>,
1333 ) -> SuiResult<WaitForEffectsResponse> {
1334 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1335 return Err(SuiErrorKind::UnsupportedFeatureError {
1336 error: "Mysticeti fastpath".to_string(),
1337 }
1338 .into());
1339 };
1340
1341 let local_epoch = epoch_store.epoch();
1342 match consensus_position.epoch.cmp(&local_epoch) {
1343 Ordering::Less => {
1344 let response = WaitForEffectsResponse::Expired {
1347 epoch: local_epoch,
1348 round: None,
1349 };
1350 return Ok(response);
1351 }
1352 Ordering::Greater => {
1353 return Err(SuiErrorKind::WrongEpoch {
1355 expected_epoch: local_epoch,
1356 actual_epoch: consensus_position.epoch,
1357 }
1358 .into());
1359 }
1360 Ordering::Equal => {
1361 }
1364 };
1365
1366 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1367
1368 let mut current_status = None;
1369 loop {
1370 tokio::select! {
1371 status_result = consensus_tx_status_cache
1372 .notify_read_transaction_status_change(consensus_position, current_status) => {
1373 match status_result {
1374 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1375 match new_status {
1376 ConsensusTxStatus::Rejected => {
1377 return Ok(WaitForEffectsResponse::Rejected {
1378 error: epoch_store.get_rejection_vote_reason(
1379 consensus_position
1380 )
1381 });
1382 }
1383 ConsensusTxStatus::FastpathCertified => {
1384 current_status = Some(new_status);
1385 continue;
1386 }
1387 ConsensusTxStatus::Finalized => {
1388 current_status = Some(new_status);
1389 continue;
1390 }
1391 ConsensusTxStatus::Dropped => {
1392 return Ok(WaitForEffectsResponse::Rejected {
1395 error: epoch_store
1396 .get_rejection_vote_reason(consensus_position),
1397 });
1398 }
1399 }
1400 }
1401 NotifyReadConsensusTxStatusResult::Expired(round) => {
1402 return Ok(WaitForEffectsResponse::Expired {
1403 epoch: epoch_store.epoch(),
1404 round: Some(round),
1405 });
1406 }
1407 }
1408 }
1409
1410 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1411 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1412 let outputs = outputs.pop().unwrap();
1413 let effects = outputs.effects.clone();
1414
1415 let details = if include_details {
1416 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1417 } else {
1418 None
1419 };
1420
1421 return Ok(WaitForEffectsResponse::Executed {
1422 effects_digest: effects.digest(),
1423 details,
1424 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1425 });
1426 }
1427 }
1428 }
1429 }
1430
1431 async fn complete_executed_data(
1432 &self,
1433 effects: TransactionEffects,
1434 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1435 ) -> SuiResult<Box<ExecutedData>> {
1436 let (events, input_objects, output_objects) = self
1437 .collect_effects_data(
1438 &effects,
1439 true,
1440 true,
1441 true,
1442 fastpath_outputs,
1443 )
1444 .await?;
1445 Ok(Box::new(ExecutedData {
1446 effects,
1447 events,
1448 input_objects,
1449 output_objects,
1450 }))
1451 }
1452
1453 async fn object_info_impl(
1454 &self,
1455 request: tonic::Request<ObjectInfoRequest>,
1456 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1457 let request = request.into_inner();
1458 let response = self.state.handle_object_info_request(request).await?;
1459 Ok((tonic::Response::new(response), Weight::one()))
1460 }
1461
1462 async fn transaction_info_impl(
1463 &self,
1464 request: tonic::Request<TransactionInfoRequest>,
1465 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1466 let request = request.into_inner();
1467 let response = self.state.handle_transaction_info_request(request).await?;
1468 Ok((tonic::Response::new(response), Weight::one()))
1469 }
1470
1471 async fn checkpoint_impl(
1472 &self,
1473 request: tonic::Request<CheckpointRequest>,
1474 ) -> WrappedServiceResponse<CheckpointResponse> {
1475 let request = request.into_inner();
1476 let response = self.state.handle_checkpoint_request(&request)?;
1477 Ok((tonic::Response::new(response), Weight::one()))
1478 }
1479
1480 async fn checkpoint_v2_impl(
1481 &self,
1482 request: tonic::Request<CheckpointRequestV2>,
1483 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1484 let request = request.into_inner();
1485 let response = self.state.handle_checkpoint_request_v2(&request)?;
1486 Ok((tonic::Response::new(response), Weight::one()))
1487 }
1488
1489 async fn get_system_state_object_impl(
1490 &self,
1491 _request: tonic::Request<SystemStateRequest>,
1492 ) -> WrappedServiceResponse<SuiSystemState> {
1493 let response = self
1494 .state
1495 .get_object_cache_reader()
1496 .get_sui_system_state_object_unsafe()?;
1497 Ok((tonic::Response::new(response), Weight::one()))
1498 }
1499
1500 async fn validator_health_impl(
1501 &self,
1502 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1503 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1504 let state = &self.state;
1505
1506 let epoch_store = state.load_epoch_store_one_call_per_task();
1508
1509 let num_inflight_execution_transactions =
1511 state.execution_scheduler().num_pending_certificates() as u64;
1512
1513 let num_inflight_consensus_transactions =
1515 self.consensus_adapter.num_inflight_transactions();
1516
1517 let last_committed_leader_round = epoch_store
1519 .consensus_tx_status_cache
1520 .as_ref()
1521 .and_then(|cache| cache.get_last_committed_leader_round())
1522 .unwrap_or(0);
1523
1524 let last_locally_built_checkpoint = epoch_store
1526 .last_built_checkpoint_summary()
1527 .ok()
1528 .flatten()
1529 .map(|(_, summary)| summary.sequence_number)
1530 .unwrap_or(0);
1531
1532 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1533 num_inflight_consensus_transactions,
1534 num_inflight_execution_transactions,
1535 last_locally_built_checkpoint,
1536 last_committed_leader_round,
1537 };
1538
1539 let raw_response = typed_response
1540 .try_into()
1541 .map_err(|e: sui_types::error::SuiError| {
1542 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1543 })?;
1544
1545 Ok((tonic::Response::new(raw_response), Weight::one()))
1546 }
1547
1548 fn get_client_ip_addr<T>(
1549 &self,
1550 request: &tonic::Request<T>,
1551 source: &ClientIdSource,
1552 ) -> Option<IpAddr> {
1553 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1554
1555 if let Some(header) = forwarded_header {
1556 let num_hops = header
1557 .to_str()
1558 .map(|h| h.split(',').count().saturating_sub(1))
1559 .unwrap_or(0);
1560
1561 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1562 }
1563
1564 match source {
1565 ClientIdSource::SocketAddr => {
1566 let socket_addr: Option<SocketAddr> = request.remote_addr();
1567
1568 if let Some(socket_addr) = socket_addr {
1574 Some(socket_addr.ip())
1575 } else {
1576 if cfg!(msim) {
1577 } else if cfg!(test) {
1579 panic!("Failed to get remote address from request");
1580 } else {
1581 self.metrics.connection_ip_not_found.inc();
1582 error!("Failed to get remote address from request");
1583 }
1584 None
1585 }
1586 }
1587 ClientIdSource::XForwardedFor(num_hops) => {
1588 let do_header_parse = |op: &MetadataValue<Ascii>| {
1589 match op.to_str() {
1590 Ok(header_val) => {
1591 let header_contents =
1592 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1593 if *num_hops == 0 {
1594 error!(
1595 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1596 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1597 to this node. Skipping traffic controller request handling.",
1598 header_contents,
1599 );
1600 return None;
1601 }
1602 let contents_len = header_contents.len();
1603 if contents_len < *num_hops {
1604 error!(
1605 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1606 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1607 `client-id-source` in the node config.",
1608 header_contents, contents_len, num_hops, contents_len,
1609 );
1610 self.metrics.client_id_source_config_mismatch.inc();
1611 return None;
1612 }
1613 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1614 else {
1615 error!(
1616 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1617 Expected at least {} values. Skipping traffic controller request handling.",
1618 header_contents, contents_len, num_hops, contents_len,
1619 );
1620 return None;
1621 };
1622 parse_ip(client_ip).or_else(|| {
1623 self.metrics.forwarded_header_parse_error.inc();
1624 None
1625 })
1626 }
1627 Err(e) => {
1628 self.metrics.forwarded_header_invalid.inc();
1632 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1633 None
1634 }
1635 }
1636 };
1637 if let Some(op) = request.metadata().get("x-forwarded-for") {
1638 do_header_parse(op)
1639 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1640 do_header_parse(op)
1641 } else {
1642 self.metrics.forwarded_header_not_included.inc();
1643 error!(
1644 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1645 );
1646 None
1647 }
1648 }
1649 }
1650 }
1651
1652 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1653 if let Some(traffic_controller) = &self.traffic_controller {
1654 if !traffic_controller.check(&client, &None).await {
1655 Err(tonic::Status::from_error(
1657 SuiErrorKind::TooManyRequests.into(),
1658 ))
1659 } else {
1660 Ok(())
1661 }
1662 } else {
1663 Ok(())
1664 }
1665 }
1666
1667 fn handle_traffic_resp<T>(
1668 &self,
1669 client: Option<IpAddr>,
1670 wrapped_response: WrappedServiceResponse<T>,
1671 ) -> Result<tonic::Response<T>, tonic::Status> {
1672 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1673 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1674 Err(status) => (
1675 Some(SuiError::from(status.clone())),
1676 Weight::zero(),
1677 Err(status.clone()),
1678 ),
1679 };
1680
1681 if let Some(traffic_controller) = self.traffic_controller.clone() {
1682 traffic_controller.tally(TrafficTally {
1683 direct: client,
1684 through_fullnode: None,
1685 error_info: error.map(|e| {
1686 let error_type = String::from(e.clone().as_ref());
1687 let error_weight = normalize(e);
1688 (error_weight, error_type)
1689 }),
1690 spam_weight,
1691 timestamp: SystemTime::now(),
1692 })
1693 }
1694 unwrapped_response
1695 }
1696}
1697
1698fn normalize(err: SuiError) -> Weight {
1700 match err.as_inner() {
1701 SuiErrorKind::UserInputError {
1702 error: UserInputError::IncorrectUserSignature { .. },
1703 } => Weight::one(),
1704 SuiErrorKind::InvalidSignature { .. }
1705 | SuiErrorKind::SignerSignatureAbsent { .. }
1706 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1707 | SuiErrorKind::IncorrectSigner { .. }
1708 | SuiErrorKind::UnknownSigner { .. }
1709 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1710 _ => Weight::zero(),
1711 }
1712}
1713
1714#[macro_export]
1718macro_rules! handle_with_decoration {
1719 ($self:ident, $func_name:ident, $request:ident) => {{
1720 if $self.client_id_source.is_none() {
1721 return $self.$func_name($request).await.map(|(result, _)| result);
1722 }
1723
1724 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1725
1726 $self.handle_traffic_req(client.clone()).await?;
1728
1729 let wrapped_response = $self.$func_name($request).await;
1731 $self.handle_traffic_resp(client, wrapped_response)
1732 }};
1733}
1734
1735#[async_trait]
1736impl Validator for ValidatorService {
1737 async fn submit_transaction(
1738 &self,
1739 request: tonic::Request<RawSubmitTxRequest>,
1740 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1741 let validator_service = self.clone();
1742
1743 spawn_monitored_task!(async move {
1746 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1749 })
1750 .await
1751 .unwrap()
1752 }
1753
1754 async fn wait_for_effects(
1755 &self,
1756 request: tonic::Request<RawWaitForEffectsRequest>,
1757 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1758 handle_with_decoration!(self, wait_for_effects_impl, request)
1759 }
1760
1761 async fn object_info(
1762 &self,
1763 request: tonic::Request<ObjectInfoRequest>,
1764 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1765 handle_with_decoration!(self, object_info_impl, request)
1766 }
1767
1768 async fn transaction_info(
1769 &self,
1770 request: tonic::Request<TransactionInfoRequest>,
1771 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1772 handle_with_decoration!(self, transaction_info_impl, request)
1773 }
1774
1775 async fn checkpoint(
1776 &self,
1777 request: tonic::Request<CheckpointRequest>,
1778 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1779 handle_with_decoration!(self, checkpoint_impl, request)
1780 }
1781
1782 async fn checkpoint_v2(
1783 &self,
1784 request: tonic::Request<CheckpointRequestV2>,
1785 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1786 handle_with_decoration!(self, checkpoint_v2_impl, request)
1787 }
1788
1789 async fn get_system_state_object(
1790 &self,
1791 request: tonic::Request<SystemStateRequest>,
1792 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1793 handle_with_decoration!(self, get_system_state_object_impl, request)
1794 }
1795
1796 async fn validator_health(
1797 &self,
1798 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1799 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1800 {
1801 handle_with_decoration!(self, validator_health_impl, request)
1802 }
1803}