1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14 register_gauge_with_registry, register_histogram_vec_with_registry,
15 register_histogram_with_registry, register_int_counter_vec_with_registry,
16 register_int_counter_with_registry,
17};
18use std::{
19 cmp::Ordering,
20 future::Future,
21 io,
22 net::{IpAddr, SocketAddr},
23 pin::Pin,
24 sync::Arc,
25 time::{Duration, SystemTime},
26};
27use sui_network::{
28 api::{Validator, ValidatorServer},
29 tonic,
30 validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::message_envelope::Message;
34use sui_types::messages_consensus::ConsensusPosition;
35use sui_types::messages_consensus::ConsensusTransaction;
36use sui_types::messages_grpc::{
37 ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
38 TransactionInfoRequest, TransactionInfoResponse,
39};
40use sui_types::multiaddr::Multiaddr;
41use sui_types::object::Object;
42use sui_types::sui_system_state::SuiSystemState;
43use sui_types::traffic_control::{ClientIdSource, Weight};
44use sui_types::{
45 base_types::ObjectID,
46 digests::{TransactionDigest, TransactionEffectsDigest},
47 error::{SuiErrorKind, UserInputError},
48};
49use sui_types::{
50 effects::TransactionEffects,
51 messages_grpc::{
52 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
53 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
54 },
55};
56use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
57use sui_types::{error::*, transaction::*};
58use sui_types::{
59 fp_ensure,
60 messages_checkpoint::{
61 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
62 },
63};
64use tokio::sync::oneshot;
65use tokio::time::timeout;
66use tonic::metadata::{Ascii, MetadataValue};
67use tracing::{debug, error, info, instrument};
68
69use crate::consensus_adapter::ConnectionMonitorStatusForTests;
70use crate::{
71 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
72 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
73 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
74};
75use crate::{
76 authority::{
77 authority_per_epoch_store::AuthorityPerEpochStore,
78 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
79 },
80 checkpoints::CheckpointStore,
81 mysticeti_adapter::LazyMysticetiClient,
82 transaction_outputs::TransactionOutputs,
83};
84use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
85use sui_types::messages_grpc::PingType;
86
87#[cfg(test)]
88#[path = "unit_tests/server_tests.rs"]
89mod server_tests;
90
91#[cfg(test)]
92#[path = "unit_tests/wait_for_effects_tests.rs"]
93mod wait_for_effects_tests;
94
95#[cfg(test)]
96#[path = "unit_tests/submit_transaction_tests.rs"]
97mod submit_transaction_tests;
98
/// Handle to a validator network server.
///
/// Wraps the underlying `sui_network` server so that callers can wait for it
/// to shut down, ask it to shut down, or read the address it is bound to.
pub struct AuthorityServerHandle {
    /// The underlying server instance controlled through this handle.
    server_handle: sui_network::validator::server::Server,
}
102
103impl AuthorityServerHandle {
104 pub async fn join(self) -> Result<(), io::Error> {
105 self.server_handle.handle().wait_for_shutdown().await;
106 Ok(())
107 }
108
109 pub async fn kill(self) -> Result<(), io::Error> {
110 self.server_handle.handle().shutdown().await;
111 Ok(())
112 }
113
114 pub fn address(&self) -> &Multiaddr {
115 self.server_handle.local_addr()
116 }
117}
118
/// Bundles everything needed to spawn a validator service for tests: the
/// address to listen on, the authority state, the consensus adapter and the
/// service metrics.
pub struct AuthorityServer {
    /// Address the server binds to when spawned via `spawn_for_test`.
    address: Multiaddr,
    /// Authority state handed to the validator service.
    pub state: Arc<AuthorityState>,
    /// Adapter handed to the validator service for consensus submission.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the validator service.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
125
126impl AuthorityServer {
127 pub fn new_for_test_with_consensus_adapter(
128 state: Arc<AuthorityState>,
129 consensus_adapter: Arc<ConsensusAdapter>,
130 ) -> Self {
131 let address = new_local_tcp_address_for_testing();
132 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
133
134 Self {
135 address,
136 state,
137 consensus_adapter,
138 metrics,
139 }
140 }
141
142 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
143 let consensus_adapter = Arc::new(ConsensusAdapter::new(
144 Arc::new(LazyMysticetiClient::new()),
145 CheckpointStore::new_for_tests(),
146 state.name,
147 Arc::new(ConnectionMonitorStatusForTests {}),
148 100_000,
149 100_000,
150 None,
151 None,
152 ConsensusAdapterMetrics::new_test(),
153 state.epoch_store_for_testing().protocol_config().clone(),
154 ));
155 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
156 }
157
158 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
159 let address = self.address.clone();
160 self.spawn_with_bind_address_for_test(address).await
161 }
162
163 pub async fn spawn_with_bind_address_for_test(
164 self,
165 address: Multiaddr,
166 ) -> Result<AuthorityServerHandle, io::Error> {
167 let tls_config = sui_tls::create_rustls_server_config(
168 self.state.config.network_key_pair().copy().private(),
169 SUI_TLS_SERVER_NAME.to_string(),
170 );
171 let config = mysten_network::config::Config::new();
172 let server = sui_network::validator::server::ServerBuilder::from_config(
173 &config,
174 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
175 )
176 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
177 self.state,
178 self.consensus_adapter,
179 self.metrics,
180 )))
181 .bind(&address, Some(tls_config))
182 .await
183 .unwrap();
184 let local_addr = server.local_addr().to_owned();
185 info!("Listening to traffic on {local_addr}");
186 let handle = AuthorityServerHandle {
187 server_handle: server,
188 };
189 Ok(handle)
190 }
191}
192
/// Prometheus metrics recorded by the validator service.
///
/// Every field is registered in `ValidatorServiceMetrics::new`; the per-field
/// descriptions below mirror the help strings used there.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back the
    /// local acknowledgement (execution and finalization are not included).
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction through consensus, labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transaction certificates rejected during epoch transition.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was unexpectedly missing.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
}
223
224impl ValidatorServiceMetrics {
225 pub fn new(registry: &Registry) -> Self {
226 Self {
227 signature_errors: register_int_counter_with_registry!(
228 "total_signature_errors",
229 "Number of transaction signature errors",
230 registry,
231 )
232 .unwrap(),
233 tx_verification_latency: register_histogram_with_registry!(
234 "validator_service_tx_verification_latency",
235 "Latency of verifying a transaction",
236 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
237 registry,
238 )
239 .unwrap(),
240 cert_verification_latency: register_histogram_with_registry!(
241 "validator_service_cert_verification_latency",
242 "Latency of verifying a certificate",
243 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
244 registry,
245 )
246 .unwrap(),
247 consensus_latency: register_histogram_with_registry!(
248 "validator_service_consensus_latency",
249 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
250 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
251 registry,
252 )
253 .unwrap(),
254 handle_transaction_latency: register_histogram_with_registry!(
255 "validator_service_handle_transaction_latency",
256 "Latency of handling a transaction",
257 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
258 registry,
259 )
260 .unwrap(),
261 handle_certificate_consensus_latency: register_histogram_with_registry!(
262 "validator_service_handle_certificate_consensus_latency",
263 "Latency of handling a consensus transaction certificate",
264 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
265 registry,
266 )
267 .unwrap(),
268 submit_certificate_consensus_latency: register_histogram_with_registry!(
269 "validator_service_submit_certificate_consensus_latency",
270 "Latency of submit_certificate RPC handler",
271 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
272 registry,
273 )
274 .unwrap(),
275 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
276 "validator_service_handle_certificate_non_consensus_latency",
277 "Latency of handling a non-consensus transaction certificate",
278 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
279 registry,
280 )
281 .unwrap(),
282 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
283 "validator_service_handle_soft_bundle_certificates_consensus_latency",
284 "Latency of handling a consensus soft bundle",
285 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
286 registry,
287 )
288 .unwrap(),
289 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
290 "handle_soft_bundle_certificates_count",
291 "The number of certificates included in a soft bundle",
292 mysten_metrics::COUNT_BUCKETS.to_vec(),
293 registry,
294 )
295 .unwrap(),
296 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
297 "handle_soft_bundle_certificates_size_bytes",
298 "The size of soft bundle in bytes",
299 mysten_metrics::BYTES_BUCKETS.to_vec(),
300 registry,
301 )
302 .unwrap(),
303 handle_transaction_consensus_latency: register_histogram_with_registry!(
304 "validator_service_handle_transaction_consensus_latency",
305 "Latency of handling a user transaction sent through consensus",
306 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
307 registry,
308 )
309 .unwrap(),
310 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
311 "validator_service_submit_transaction_consensus_latency",
312 "Latency of submitting a user transaction sent through consensus",
313 &["req_type"],
314 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
315 registry,
316 )
317 .unwrap(),
318 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
319 "validator_service_submit_transaction_latency",
320 "Latency of submit transaction handler",
321 &["req_type"],
322 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
323 registry,
324 )
325 .unwrap(),
326 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
327 "validator_service_handle_wait_for_effects_ping_latency",
328 "Latency of handling a ping request for wait_for_effects",
329 &["req_type"],
330 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
331 registry,
332 )
333 .unwrap(),
334 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
335 "validator_service_submit_transaction_bytes",
336 "The size of transactions in the submit transaction request",
337 &["req_type"],
338 mysten_metrics::BYTES_BUCKETS.to_vec(),
339 registry,
340 )
341 .unwrap(),
342 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
343 "validator_service_submit_transaction_batch_size",
344 "The number of transactions in the submit transaction request",
345 &["req_type"],
346 mysten_metrics::COUNT_BUCKETS.to_vec(),
347 registry,
348 )
349 .unwrap(),
350 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
351 "validator_service_num_rejected_cert_in_epoch_boundary",
352 "Number of rejected transaction certificate during epoch transitioning",
353 registry,
354 )
355 .unwrap(),
356 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
357 "validator_service_num_rejected_tx_during_overload",
358 "Number of rejected transaction due to system overload",
359 &["error_type"],
360 registry,
361 )
362 .unwrap(),
363 submission_rejected_transactions: register_int_counter_vec_with_registry!(
364 "validator_service_submission_rejected_transactions",
365 "Number of transactions rejected during submission",
366 &["reason"],
367 registry,
368 )
369 .unwrap(),
370 connection_ip_not_found: register_int_counter_with_registry!(
371 "validator_service_connection_ip_not_found",
372 "Number of times connection IP was not extractable from request",
373 registry,
374 )
375 .unwrap(),
376 forwarded_header_parse_error: register_int_counter_with_registry!(
377 "validator_service_forwarded_header_parse_error",
378 "Number of times x-forwarded-for header could not be parsed",
379 registry,
380 )
381 .unwrap(),
382 forwarded_header_invalid: register_int_counter_with_registry!(
383 "validator_service_forwarded_header_invalid",
384 "Number of times x-forwarded-for header was invalid",
385 registry,
386 )
387 .unwrap(),
388 forwarded_header_not_included: register_int_counter_with_registry!(
389 "validator_service_forwarded_header_not_included",
390 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
391 registry,
392 )
393 .unwrap(),
394 client_id_source_config_mismatch: register_int_counter_with_registry!(
395 "validator_service_client_id_source_config_mismatch",
396 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
397 registry,
398 )
399 .unwrap(),
400 x_forwarded_for_num_hops: register_gauge_with_registry!(
401 "validator_service_x_forwarded_for_num_hops",
402 "Number of hops in x-forwarded-for header",
403 registry,
404 )
405 .unwrap(),
406 }
407 }
408
409 pub fn new_for_tests() -> Self {
410 let registry = Registry::new();
411 Self::new(®istry)
412 }
413}
414
/// The validator's request-handling service.
///
/// Cloning is cheap in the sense that the state, adapter, metrics and traffic
/// controller are all held behind `Arc`s.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state used to validate, vote on and look up transactions.
    state: Arc<AuthorityState>,
    /// Adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from the authority state,
    /// `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client address is derived from a request; when unset,
    /// `handle_submit_transaction` falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
}
423
424impl ValidatorService {
425 pub fn new(
426 state: Arc<AuthorityState>,
427 consensus_adapter: Arc<ConsensusAdapter>,
428 validator_metrics: Arc<ValidatorServiceMetrics>,
429 client_id_source: Option<ClientIdSource>,
430 ) -> Self {
431 let traffic_controller = state.traffic_controller.clone();
432 Self {
433 state,
434 consensus_adapter,
435 metrics: validator_metrics,
436 traffic_controller,
437 client_id_source,
438 }
439 }
440
441 pub fn new_for_tests(
442 state: Arc<AuthorityState>,
443 consensus_adapter: Arc<ConsensusAdapter>,
444 metrics: Arc<ValidatorServiceMetrics>,
445 ) -> Self {
446 Self {
447 state,
448 consensus_adapter,
449 metrics,
450 traffic_controller: None,
451 client_id_source: None,
452 }
453 }
454
455 pub fn validator_state(&self) -> &Arc<AuthorityState> {
456 &self.state
457 }
458
459 pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
461 let epoch_store = self.state.load_epoch_store_one_call_per_task();
462
463 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
465
466 let transaction = epoch_store
468 .verify_transaction_require_no_aliases(transaction)?
469 .into_tx();
470
471 self.state
473 .handle_vote_transaction(&epoch_store, transaction)?;
474
475 Ok(())
476 }
477
478 pub fn handle_transaction_for_testing_with_overload_check(
481 &self,
482 transaction: Transaction,
483 ) -> SuiResult<()> {
484 let epoch_store = self.state.load_epoch_store_one_call_per_task();
485
486 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
488
489 self.state.check_system_overload(
491 self.consensus_adapter.as_ref(),
492 transaction.data(),
493 self.state.check_system_overload_at_signing(),
494 )?;
495
496 let transaction = epoch_store
498 .verify_transaction_require_no_aliases(transaction)?
499 .into_tx();
500
501 self.state
503 .handle_vote_transaction(&epoch_store, transaction)?;
504
505 Ok(())
506 }
507
508 async fn collect_immutable_object_ids(
511 &self,
512 tx: &VerifiedTransaction,
513 state: &AuthorityState,
514 ) -> SuiResult<Vec<ObjectID>> {
515 let input_objects = tx.data().transaction_data().input_objects()?;
516
517 let object_ids: Vec<ObjectID> = input_objects
519 .iter()
520 .filter_map(|obj| match obj {
521 InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
522 _ => None,
523 })
524 .collect();
525 if object_ids.is_empty() {
526 return Ok(vec![]);
527 }
528
529 let objects = state.get_object_cache_reader().get_objects(&object_ids);
531
532 objects
534 .into_iter()
535 .zip(object_ids.iter())
536 .filter_map(|(obj, id)| {
537 let Some(o) = obj else {
538 return Some(Err::<ObjectID, SuiError>(
539 SuiErrorKind::UserInputError {
540 error: UserInputError::ObjectNotFound {
541 object_id: *id,
542 version: None,
543 },
544 }
545 .into(),
546 ));
547 };
548 if o.is_immutable() {
549 Some(Ok(*id))
550 } else {
551 None
552 }
553 })
554 .collect::<SuiResult<Vec<ObjectID>>>()
555 }
556
557 #[instrument(
558 name = "ValidatorService::handle_submit_transaction",
559 level = "error",
560 skip_all,
561 err(level = "debug")
562 )]
563 async fn handle_submit_transaction(
564 &self,
565 request: tonic::Request<RawSubmitTxRequest>,
566 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
567 let Self {
568 state,
569 consensus_adapter,
570 metrics,
571 traffic_controller: _,
572 client_id_source,
573 } = self.clone();
574
575 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
576 self.get_client_ip_addr(&request, client_id_source)
577 } else {
578 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
579 };
580
581 let inner = request.into_inner();
582 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
583
584 let next_epoch = start_epoch + 1;
585 let mut max_retries = 1;
586
587 loop {
588 let res = self
589 .handle_submit_transaction_inner(
590 &state,
591 &consensus_adapter,
592 &metrics,
593 &inner,
594 submitter_client_addr,
595 )
596 .await;
597 match res {
598 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
599 Err(err) => {
600 if max_retries > 0
601 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
602 {
603 max_retries -= 1;
604
605 debug!(
606 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
607 );
608
609 if let Ok(Ok(new_epoch)) =
610 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
611 {
612 assert_reachable!("retry submission at epoch end");
613 if new_epoch == next_epoch {
614 continue;
615 }
616
617 debug_fatal!(
618 "expected epoch {} after reconfiguration. got {}",
619 next_epoch,
620 new_epoch
621 );
622 }
623 }
624 return Err(err.into());
625 }
626 }
627 }
628 }
629
630 async fn handle_submit_transaction_inner(
631 &self,
632 state: &AuthorityState,
633 consensus_adapter: &ConsensusAdapter,
634 metrics: &ValidatorServiceMetrics,
635 request: &RawSubmitTxRequest,
636 submitter_client_addr: Option<IpAddr>,
637 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
638 let epoch_store = state.load_epoch_store_one_call_per_task();
639 if !epoch_store.protocol_config().mysticeti_fastpath() {
640 return Err(SuiErrorKind::UnsupportedFeatureError {
641 error: "Mysticeti fastpath".to_string(),
642 }
643 .into());
644 }
645
646 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
647 SuiErrorKind::GrpcMessageDeserializeError {
648 type_info: "RawSubmitTxRequest.submit_type".to_string(),
649 error: e.to_string(),
650 }
651 })?;
652
653 let is_ping_request = submit_type == SubmitTxType::Ping;
654 if is_ping_request {
655 fp_ensure!(
656 request.transactions.is_empty(),
657 SuiErrorKind::InvalidRequest(format!(
658 "Ping request cannot contain {} transactions",
659 request.transactions.len()
660 ))
661 .into()
662 );
663 } else {
664 fp_ensure!(
666 !request.transactions.is_empty(),
667 SuiErrorKind::InvalidRequest(
668 "At least one transaction needs to be submitted".to_string(),
669 )
670 .into()
671 );
672 }
673
674 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
679
680 let max_num_transactions = if is_soft_bundle_request {
681 epoch_store.protocol_config().max_soft_bundle_size()
684 } else {
685 epoch_store
687 .protocol_config()
688 .max_num_transactions_in_block()
689 };
690 fp_ensure!(
691 request.transactions.len() <= max_num_transactions as usize,
692 SuiErrorKind::InvalidRequest(format!(
693 "Too many transactions in request: {} vs {}",
694 request.transactions.len(),
695 max_num_transactions
696 ))
697 .into()
698 );
699
700 let mut tx_digests = Vec::with_capacity(request.transactions.len());
702 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
704 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
706 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
708 let mut total_size_bytes = 0;
710
711 let req_type = if is_ping_request {
712 "ping"
713 } else if request.transactions.len() == 1 {
714 "single_transaction"
715 } else if is_soft_bundle_request {
716 "soft_bundle"
717 } else {
718 "batch"
719 };
720
721 let _handle_tx_metrics_guard = metrics
722 .handle_submit_transaction_latency
723 .with_label_values(&[req_type])
724 .start_timer();
725
726 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
727 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
728 Ok(txn) => txn,
729 Err(e) => {
730 return Err(SuiErrorKind::TransactionDeserializationError {
732 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
733 }
734 .into());
735 }
736 };
737
738 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
740
741 let overload_check_res = state.check_system_overload(
742 consensus_adapter,
743 transaction.data(),
744 state.check_system_overload_at_signing(),
745 );
746 if let Err(error) = overload_check_res {
747 metrics
748 .num_rejected_tx_during_overload
749 .with_label_values(&[error.as_ref()])
750 .inc();
751 results[idx] = Some(SubmitTxResult::Rejected { error });
752 continue;
753 }
754
755 let verified_transaction = {
757 let _metrics_guard = metrics.tx_verification_latency.start_timer();
758 if epoch_store.protocol_config().address_aliases() {
759 match epoch_store.verify_transaction_with_current_aliases(transaction) {
760 Ok(tx) => tx,
761 Err(e) => {
762 metrics.signature_errors.inc();
763 return Err(e);
764 }
765 }
766 } else {
767 match epoch_store.verify_transaction_require_no_aliases(transaction) {
768 Ok(tx) => tx,
769 Err(e) => {
770 metrics.signature_errors.inc();
771 return Err(e);
772 }
773 }
774 }
775 };
776
777 let tx_digest = verified_transaction.tx().digest();
778 tx_digests.push(*tx_digest);
779
780 debug!(
781 ?tx_digest,
782 "handle_submit_transaction: verified transaction"
783 );
784
785 if let Some(effects) = state
788 .get_transaction_cache_reader()
789 .get_executed_effects(tx_digest)
790 {
791 let effects_digest = effects.digest();
792 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
793 let executed_result = SubmitTxResult::Executed {
794 effects_digest,
795 details: Some(executed_data),
796 fast_path: false,
797 };
798 results[idx] = Some(executed_result);
799 debug!(?tx_digest, "handle_submit_transaction: already executed");
800 continue;
801 }
802 }
803
804 if self
805 .state
806 .get_transaction_cache_reader()
807 .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
808 {
809 results[idx] = Some(SubmitTxResult::Rejected {
810 error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
811 });
812 debug!(
813 ?tx_digest,
814 "handle_submit_transaction: transaction already executed in previous epoch"
815 );
816 continue;
817 }
818
819 debug!(
820 ?tx_digest,
821 "handle_submit_transaction: waiting for fastpath dependency objects"
822 );
823 if !state
824 .wait_for_fastpath_dependency_objects(
825 verified_transaction.tx(),
826 epoch_store.epoch(),
827 )
828 .await?
829 {
830 debug!(
831 ?tx_digest,
832 "fastpath input objects are still unavailable after waiting"
833 );
834 }
835
836 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
837 Ok(_) => { }
838 Err(e) => {
839 if let Some(effects) = state
842 .get_transaction_cache_reader()
843 .get_executed_effects(tx_digest)
844 {
845 let effects_digest = effects.digest();
846 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
847 {
848 let executed_result = SubmitTxResult::Executed {
849 effects_digest,
850 details: Some(executed_data),
851 fast_path: false,
852 };
853 results[idx] = Some(executed_result);
854 continue;
855 }
856 }
857
858 debug!(?tx_digest, "Transaction rejected during submission: {e}");
860 metrics
861 .submission_rejected_transactions
862 .with_label_values(&[e.to_variant_name()])
863 .inc();
864 results[idx] = Some(SubmitTxResult::Rejected { error: e });
865 continue;
866 }
867 }
868
869 if epoch_store.protocol_config().address_aliases()
871 || epoch_store.protocol_config().disable_preconsensus_locking()
872 {
873 let mut claims = vec![];
874
875 if epoch_store.protocol_config().disable_preconsensus_locking() {
876 let immutable_object_ids = self
877 .collect_immutable_object_ids(verified_transaction.tx(), state)
878 .await?;
879 if !immutable_object_ids.is_empty() {
880 claims.push(TransactionClaim::ImmutableInputObjects(
881 immutable_object_ids,
882 ));
883 }
884 }
885
886 let (tx, aliases) = verified_transaction.into_inner();
887 if epoch_store.protocol_config().address_aliases() {
888 claims.push(TransactionClaim::AddressAliases(aliases));
889 }
890
891 let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
892
893 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
894 &state.name,
895 tx_with_claims,
896 ));
897 } else {
898 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
899 &state.name,
900 verified_transaction.into_tx().into(),
901 ));
902 }
903 transaction_indexes.push(idx);
904 total_size_bytes += tx_size;
905 }
906
907 if consensus_transactions.is_empty() && !is_ping_request {
908 return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
909 }
910
911 let max_transaction_bytes = if is_soft_bundle_request {
915 epoch_store
916 .protocol_config()
917 .consensus_max_transactions_in_block_bytes()
918 / 2
919 } else {
920 epoch_store
921 .protocol_config()
922 .consensus_max_transactions_in_block_bytes()
923 };
924 fp_ensure!(
925 total_size_bytes <= max_transaction_bytes as usize,
926 SuiErrorKind::UserInputError {
927 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
928 size: total_size_bytes,
929 limit: max_transaction_bytes,
930 },
931 }
932 .into()
933 );
934
935 metrics
936 .handle_submit_transaction_bytes
937 .with_label_values(&[req_type])
938 .observe(total_size_bytes as f64);
939 metrics
940 .handle_submit_transaction_batch_size
941 .with_label_values(&[req_type])
942 .observe(consensus_transactions.len() as f64);
943
944 let _latency_metric_guard = metrics
945 .handle_submit_transaction_consensus_latency
946 .with_label_values(&[req_type])
947 .start_timer();
948
949 let consensus_positions = if is_soft_bundle_request || is_ping_request {
950 assert!(
953 is_ping_request || !consensus_transactions.is_empty(),
954 "A valid soft bundle must have at least one transaction"
955 );
956 debug!(
957 "handle_submit_transaction: submitting consensus transactions ({}): {}",
958 req_type,
959 consensus_transactions
960 .iter()
961 .map(|t| t.local_display())
962 .join(", ")
963 );
964 self.handle_submit_to_consensus_for_position(
965 consensus_transactions,
966 &epoch_store,
967 submitter_client_addr,
968 )
969 .await?
970 } else {
971 let futures = consensus_transactions.into_iter().map(|t| {
972 debug!(
973 "handle_submit_transaction: submitting consensus transaction ({}): {}",
974 req_type,
975 t.local_display(),
976 );
977 self.handle_submit_to_consensus_for_position(
978 vec![t],
979 &epoch_store,
980 submitter_client_addr,
981 )
982 });
983 future::try_join_all(futures)
984 .await?
985 .into_iter()
986 .flatten()
987 .collect()
988 };
989
990 if is_ping_request {
991 assert_eq!(consensus_positions.len(), 1);
993 results.push(Some(SubmitTxResult::Submitted {
994 consensus_position: consensus_positions[0],
995 }));
996 } else {
997 for ((idx, tx_digest), consensus_position) in transaction_indexes
999 .into_iter()
1000 .zip(tx_digests)
1001 .zip(consensus_positions)
1002 {
1003 debug!(
1004 ?tx_digest,
1005 "handle_submit_transaction: submitted consensus transaction at {}",
1006 consensus_position,
1007 );
1008 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1009 }
1010 }
1011
1012 Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1013 }
1014
1015 fn try_from_submit_tx_response(
1016 results: Vec<Option<SubmitTxResult>>,
1017 ) -> Result<RawSubmitTxResponse, SuiError> {
1018 let mut raw_results = Vec::new();
1019 for (i, result) in results.into_iter().enumerate() {
1020 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1021 error: format!("Missing transaction result at {}", i),
1022 })?;
1023 let raw_result = result.try_into()?;
1024 raw_results.push(raw_result);
1025 }
1026 Ok(RawSubmitTxResponse {
1027 results: raw_results,
1028 })
1029 }
1030
1031 #[instrument(
1032 name = "ValidatorService::handle_submit_to_consensus_for_position",
1033 level = "debug",
1034 skip_all,
1035 err(level = "debug")
1036 )]
1037 async fn handle_submit_to_consensus_for_position(
1038 &self,
1039 consensus_transactions: Vec<ConsensusTransaction>,
1041 epoch_store: &Arc<AuthorityPerEpochStore>,
1042 submitter_client_addr: Option<IpAddr>,
1043 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1044 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1045
1046 {
1047 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1049 if !reconfiguration_lock.should_accept_user_certs() {
1050 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1051 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1052 }
1053
1054 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1058
1059 self.consensus_adapter.submit_batch(
1060 &consensus_transactions,
1061 Some(&reconfiguration_lock),
1062 epoch_store,
1063 Some(tx_consensus_positions),
1064 submitter_client_addr,
1065 )?;
1066 }
1067
1068 Ok(rx_consensus_positions.await.map_err(|e| {
1069 SuiErrorKind::FailedToSubmitToConsensus(format!(
1070 "Failed to get consensus position: {e}"
1071 ))
1072 })?)
1073 }
1074
1075 async fn collect_effects_data(
1076 &self,
1077 effects: &TransactionEffects,
1078 include_events: bool,
1079 include_input_objects: bool,
1080 include_output_objects: bool,
1081 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1082 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1083 let events = if include_events && effects.events_digest().is_some() {
1084 if let Some(fastpath_outputs) = &fastpath_outputs {
1085 Some(fastpath_outputs.events.clone())
1086 } else {
1087 Some(
1088 self.state
1089 .get_transaction_events(effects.transaction_digest())?,
1090 )
1091 }
1092 } else {
1093 None
1094 };
1095
1096 let input_objects = if include_input_objects {
1097 self.state.get_transaction_input_objects(effects)?
1098 } else {
1099 vec![]
1100 };
1101
1102 let output_objects = if include_output_objects {
1103 if let Some(fastpath_outputs) = &fastpath_outputs {
1104 fastpath_outputs.written.values().cloned().collect()
1105 } else {
1106 self.state.get_transaction_output_objects(effects)?
1107 }
1108 } else {
1109 vec![]
1110 };
1111
1112 Ok((events, input_objects, output_objects))
1113 }
1114}
1115
/// Result type returned by the `*_impl` request handlers: on success, the tonic response paired
/// with a [`Weight`] that `handle_traffic_resp` forwards as the `spam_weight` of the traffic
/// tally; on failure, the `tonic::Status` to send back to the caller.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1117
1118impl ValidatorService {
1119 async fn handle_submit_transaction_impl(
1120 &self,
1121 request: tonic::Request<RawSubmitTxRequest>,
1122 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1123 self.handle_submit_transaction(request).await
1124 }
1125
1126 async fn wait_for_effects_impl(
1127 &self,
1128 request: tonic::Request<RawWaitForEffectsRequest>,
1129 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1130 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1131 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1132 let response = timeout(
1133 Duration::from_secs(20),
1135 epoch_store
1136 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1137 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1138 )
1139 .await
1140 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1141 .try_into()?;
1142 Ok((tonic::Response::new(response), Weight::zero()))
1143 }
1144
1145 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1146 async fn wait_for_effects_response(
1147 &self,
1148 request: WaitForEffectsRequest,
1149 epoch_store: &Arc<AuthorityPerEpochStore>,
1150 ) -> SuiResult<WaitForEffectsResponse> {
1151 if request.ping_type.is_some() {
1152 return timeout(
1153 Duration::from_secs(10),
1154 self.ping_response(request, epoch_store),
1155 )
1156 .await
1157 .map_err(|_| SuiErrorKind::TimeoutError)?;
1158 }
1159
1160 let Some(tx_digest) = request.transaction_digest else {
1161 return Err(SuiErrorKind::InvalidRequest(
1162 "Transaction digest is required for wait for effects requests".to_string(),
1163 )
1164 .into());
1165 };
1166 let tx_digests = [tx_digest];
1167
1168 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1169 if let Some(consensus_position) = request.consensus_position {
1170 Box::pin(self.wait_for_fastpath_effects(
1171 consensus_position,
1172 &tx_digests,
1173 request.include_details,
1174 epoch_store,
1175 ))
1176 } else {
1177 Box::pin(futures::future::pending())
1178 };
1179
1180 tokio::select! {
1181 biased;
1183 mut effects = self.state
1190 .get_transaction_cache_reader()
1191 .notify_read_executed_effects(
1192 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1193 &tx_digests,
1194 ) => {
1195 tracing::Span::current().record("fast_path_effects", false);
1196 let effects = effects.pop().unwrap();
1197 let details = if request.include_details {
1198 Some(self.complete_executed_data(effects.clone(), None).await?)
1199 } else {
1200 None
1201 };
1202
1203 Ok(WaitForEffectsResponse::Executed {
1204 effects_digest: effects.digest(),
1205 details,
1206 fast_path: false,
1207 })
1208 }
1209
1210 fastpath_response = fastpath_effects_future => {
1211 tracing::Span::current().record("fast_path_effects", true);
1212 fastpath_response
1213 }
1214 }
1215 }
1216
1217 #[instrument(level = "error", skip_all, err(level = "debug"))]
1218 async fn ping_response(
1219 &self,
1220 request: WaitForEffectsRequest,
1221 epoch_store: &Arc<AuthorityPerEpochStore>,
1222 ) -> SuiResult<WaitForEffectsResponse> {
1223 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1224 return Err(SuiErrorKind::UnsupportedFeatureError {
1225 error: "Mysticeti fastpath".to_string(),
1226 }
1227 .into());
1228 };
1229
1230 let Some(consensus_position) = request.consensus_position else {
1231 return Err(SuiErrorKind::InvalidRequest(
1232 "Consensus position is required for Ping requests".to_string(),
1233 )
1234 .into());
1235 };
1236
1237 let Some(ping) = request.ping_type else {
1239 return Err(SuiErrorKind::InvalidRequest(
1240 "Ping type is required for ping requests".to_string(),
1241 )
1242 .into());
1243 };
1244
1245 let _metrics_guard = self
1246 .metrics
1247 .handle_wait_for_effects_ping_latency
1248 .with_label_values(&[ping.as_str()])
1249 .start_timer();
1250
1251 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1252
1253 let mut last_status = None;
1254 let details = if request.include_details {
1255 Some(Box::new(ExecutedData::default()))
1256 } else {
1257 None
1258 };
1259
1260 loop {
1261 let status = consensus_tx_status_cache
1262 .notify_read_transaction_status_change(consensus_position, last_status)
1263 .await;
1264 match status {
1265 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1266 ConsensusTxStatus::FastpathCertified => {
1267 if ping == PingType::Consensus {
1269 last_status = Some(status);
1270 continue;
1271 }
1272 return Ok(WaitForEffectsResponse::Executed {
1273 effects_digest: TransactionEffectsDigest::ZERO,
1274 details,
1275 fast_path: true,
1276 });
1277 }
1278 ConsensusTxStatus::Rejected => {
1279 return Ok(WaitForEffectsResponse::Rejected { error: None });
1280 }
1281 ConsensusTxStatus::Finalized => {
1282 return Ok(WaitForEffectsResponse::Executed {
1283 effects_digest: TransactionEffectsDigest::ZERO,
1284 details,
1285 fast_path: false,
1286 });
1287 }
1288 ConsensusTxStatus::Dropped => {
1289 return Ok(WaitForEffectsResponse::Rejected {
1292 error: epoch_store.get_rejection_vote_reason(consensus_position),
1293 });
1294 }
1295 },
1296 NotifyReadConsensusTxStatusResult::Expired(round) => {
1297 return Ok(WaitForEffectsResponse::Expired {
1298 epoch: epoch_store.epoch(),
1299 round: Some(round),
1300 });
1301 }
1302 }
1303 }
1304 }
1305
1306 #[instrument(level = "error", skip_all, err(level = "debug"))]
1307 async fn wait_for_fastpath_effects(
1308 &self,
1309 consensus_position: ConsensusPosition,
1310 tx_digests: &[TransactionDigest],
1311 include_details: bool,
1312 epoch_store: &Arc<AuthorityPerEpochStore>,
1313 ) -> SuiResult<WaitForEffectsResponse> {
1314 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1315 return Err(SuiErrorKind::UnsupportedFeatureError {
1316 error: "Mysticeti fastpath".to_string(),
1317 }
1318 .into());
1319 };
1320
1321 let local_epoch = epoch_store.epoch();
1322 match consensus_position.epoch.cmp(&local_epoch) {
1323 Ordering::Less => {
1324 let response = WaitForEffectsResponse::Expired {
1327 epoch: local_epoch,
1328 round: None,
1329 };
1330 return Ok(response);
1331 }
1332 Ordering::Greater => {
1333 return Err(SuiErrorKind::WrongEpoch {
1335 expected_epoch: local_epoch,
1336 actual_epoch: consensus_position.epoch,
1337 }
1338 .into());
1339 }
1340 Ordering::Equal => {
1341 }
1344 };
1345
1346 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1347
1348 let mut current_status = None;
1349 loop {
1350 tokio::select! {
1351 status_result = consensus_tx_status_cache
1352 .notify_read_transaction_status_change(consensus_position, current_status) => {
1353 match status_result {
1354 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1355 match new_status {
1356 ConsensusTxStatus::Rejected => {
1357 return Ok(WaitForEffectsResponse::Rejected {
1358 error: epoch_store.get_rejection_vote_reason(
1359 consensus_position
1360 )
1361 });
1362 }
1363 ConsensusTxStatus::FastpathCertified => {
1364 current_status = Some(new_status);
1365 continue;
1366 }
1367 ConsensusTxStatus::Finalized => {
1368 current_status = Some(new_status);
1369 continue;
1370 }
1371 ConsensusTxStatus::Dropped => {
1372 return Ok(WaitForEffectsResponse::Rejected {
1375 error: epoch_store
1376 .get_rejection_vote_reason(consensus_position),
1377 });
1378 }
1379 }
1380 }
1381 NotifyReadConsensusTxStatusResult::Expired(round) => {
1382 return Ok(WaitForEffectsResponse::Expired {
1383 epoch: epoch_store.epoch(),
1384 round: Some(round),
1385 });
1386 }
1387 }
1388 }
1389
1390 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1391 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1392 let outputs = outputs.pop().unwrap();
1393 let effects = outputs.effects.clone();
1394
1395 let details = if include_details {
1396 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1397 } else {
1398 None
1399 };
1400
1401 return Ok(WaitForEffectsResponse::Executed {
1402 effects_digest: effects.digest(),
1403 details,
1404 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1405 });
1406 }
1407 }
1408 }
1409 }
1410
1411 async fn complete_executed_data(
1412 &self,
1413 effects: TransactionEffects,
1414 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1415 ) -> SuiResult<Box<ExecutedData>> {
1416 let (events, input_objects, output_objects) = self
1417 .collect_effects_data(
1418 &effects,
1419 true,
1420 true,
1421 true,
1422 fastpath_outputs,
1423 )
1424 .await?;
1425 Ok(Box::new(ExecutedData {
1426 effects,
1427 events,
1428 input_objects,
1429 output_objects,
1430 }))
1431 }
1432
1433 async fn object_info_impl(
1434 &self,
1435 request: tonic::Request<ObjectInfoRequest>,
1436 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1437 let request = request.into_inner();
1438 let response = self.state.handle_object_info_request(request).await?;
1439 Ok((tonic::Response::new(response), Weight::one()))
1440 }
1441
1442 async fn transaction_info_impl(
1443 &self,
1444 request: tonic::Request<TransactionInfoRequest>,
1445 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1446 let request = request.into_inner();
1447 let response = self.state.handle_transaction_info_request(request).await?;
1448 Ok((tonic::Response::new(response), Weight::one()))
1449 }
1450
1451 async fn checkpoint_impl(
1452 &self,
1453 request: tonic::Request<CheckpointRequest>,
1454 ) -> WrappedServiceResponse<CheckpointResponse> {
1455 let request = request.into_inner();
1456 let response = self.state.handle_checkpoint_request(&request)?;
1457 Ok((tonic::Response::new(response), Weight::one()))
1458 }
1459
1460 async fn checkpoint_v2_impl(
1461 &self,
1462 request: tonic::Request<CheckpointRequestV2>,
1463 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1464 let request = request.into_inner();
1465 let response = self.state.handle_checkpoint_request_v2(&request)?;
1466 Ok((tonic::Response::new(response), Weight::one()))
1467 }
1468
1469 async fn get_system_state_object_impl(
1470 &self,
1471 _request: tonic::Request<SystemStateRequest>,
1472 ) -> WrappedServiceResponse<SuiSystemState> {
1473 let response = self
1474 .state
1475 .get_object_cache_reader()
1476 .get_sui_system_state_object_unsafe()?;
1477 Ok((tonic::Response::new(response), Weight::one()))
1478 }
1479
1480 async fn validator_health_impl(
1481 &self,
1482 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1483 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1484 let state = &self.state;
1485
1486 let epoch_store = state.load_epoch_store_one_call_per_task();
1488
1489 let num_inflight_execution_transactions =
1491 state.execution_scheduler().num_pending_certificates() as u64;
1492
1493 let num_inflight_consensus_transactions =
1495 self.consensus_adapter.num_inflight_transactions();
1496
1497 let last_committed_leader_round = epoch_store
1499 .consensus_tx_status_cache
1500 .as_ref()
1501 .and_then(|cache| cache.get_last_committed_leader_round())
1502 .unwrap_or(0);
1503
1504 let last_locally_built_checkpoint = epoch_store
1506 .last_built_checkpoint_summary()
1507 .ok()
1508 .flatten()
1509 .map(|(_, summary)| summary.sequence_number)
1510 .unwrap_or(0);
1511
1512 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1513 num_inflight_consensus_transactions,
1514 num_inflight_execution_transactions,
1515 last_locally_built_checkpoint,
1516 last_committed_leader_round,
1517 };
1518
1519 let raw_response = typed_response
1520 .try_into()
1521 .map_err(|e: sui_types::error::SuiError| {
1522 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1523 })?;
1524
1525 Ok((tonic::Response::new(raw_response), Weight::one()))
1526 }
1527
1528 fn get_client_ip_addr<T>(
1529 &self,
1530 request: &tonic::Request<T>,
1531 source: &ClientIdSource,
1532 ) -> Option<IpAddr> {
1533 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1534
1535 if let Some(header) = forwarded_header {
1536 let num_hops = header
1537 .to_str()
1538 .map(|h| h.split(',').count().saturating_sub(1))
1539 .unwrap_or(0);
1540
1541 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1542 }
1543
1544 match source {
1545 ClientIdSource::SocketAddr => {
1546 let socket_addr: Option<SocketAddr> = request.remote_addr();
1547
1548 if let Some(socket_addr) = socket_addr {
1554 Some(socket_addr.ip())
1555 } else {
1556 if cfg!(msim) {
1557 } else if cfg!(test) {
1559 panic!("Failed to get remote address from request");
1560 } else {
1561 self.metrics.connection_ip_not_found.inc();
1562 error!("Failed to get remote address from request");
1563 }
1564 None
1565 }
1566 }
1567 ClientIdSource::XForwardedFor(num_hops) => {
1568 let do_header_parse = |op: &MetadataValue<Ascii>| {
1569 match op.to_str() {
1570 Ok(header_val) => {
1571 let header_contents =
1572 header_val.split(',').map(str::trim).collect::<Vec<_>>();
1573 if *num_hops == 0 {
1574 error!(
1575 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1576 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1577 to this node. Skipping traffic controller request handling.",
1578 header_contents,
1579 );
1580 return None;
1581 }
1582 let contents_len = header_contents.len();
1583 if contents_len < *num_hops {
1584 error!(
1585 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1586 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1587 `client-id-source` in the node config.",
1588 header_contents, contents_len, num_hops, contents_len,
1589 );
1590 self.metrics.client_id_source_config_mismatch.inc();
1591 return None;
1592 }
1593 let Some(client_ip) = header_contents.get(contents_len - num_hops)
1594 else {
1595 error!(
1596 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1597 Expected at least {} values. Skipping traffic controller request handling.",
1598 header_contents, contents_len, num_hops, contents_len,
1599 );
1600 return None;
1601 };
1602 parse_ip(client_ip).or_else(|| {
1603 self.metrics.forwarded_header_parse_error.inc();
1604 None
1605 })
1606 }
1607 Err(e) => {
1608 self.metrics.forwarded_header_invalid.inc();
1612 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1613 None
1614 }
1615 }
1616 };
1617 if let Some(op) = request.metadata().get("x-forwarded-for") {
1618 do_header_parse(op)
1619 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1620 do_header_parse(op)
1621 } else {
1622 self.metrics.forwarded_header_not_included.inc();
1623 error!(
1624 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1625 );
1626 None
1627 }
1628 }
1629 }
1630 }
1631
1632 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1633 if let Some(traffic_controller) = &self.traffic_controller {
1634 if !traffic_controller.check(&client, &None).await {
1635 Err(tonic::Status::from_error(
1637 SuiErrorKind::TooManyRequests.into(),
1638 ))
1639 } else {
1640 Ok(())
1641 }
1642 } else {
1643 Ok(())
1644 }
1645 }
1646
1647 fn handle_traffic_resp<T>(
1648 &self,
1649 client: Option<IpAddr>,
1650 wrapped_response: WrappedServiceResponse<T>,
1651 ) -> Result<tonic::Response<T>, tonic::Status> {
1652 let (error, spam_weight, unwrapped_response) = match wrapped_response {
1653 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1654 Err(status) => (
1655 Some(SuiError::from(status.clone())),
1656 Weight::zero(),
1657 Err(status.clone()),
1658 ),
1659 };
1660
1661 if let Some(traffic_controller) = self.traffic_controller.clone() {
1662 traffic_controller.tally(TrafficTally {
1663 direct: client,
1664 through_fullnode: None,
1665 error_info: error.map(|e| {
1666 let error_type = String::from(e.clone().as_ref());
1667 let error_weight = normalize(e);
1668 (error_weight, error_type)
1669 }),
1670 spam_weight,
1671 timestamp: SystemTime::now(),
1672 })
1673 }
1674 unwrapped_response
1675 }
1676}
1677
1678fn normalize(err: SuiError) -> Weight {
1680 match err.as_inner() {
1681 SuiErrorKind::UserInputError {
1682 error: UserInputError::IncorrectUserSignature { .. },
1683 } => Weight::one(),
1684 SuiErrorKind::InvalidSignature { .. }
1685 | SuiErrorKind::SignerSignatureAbsent { .. }
1686 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1687 | SuiErrorKind::IncorrectSigner { .. }
1688 | SuiErrorKind::UnknownSigner { .. }
1689 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1690 _ => Weight::zero(),
1691 }
1692}
1693
/// Runs the handler `$self.$func_name($request)` with traffic-control bookkeeping around it.
///
/// * When `$self.client_id_source` is `None`, the handler is awaited directly and only its
///   response (without the weight) is returned.
/// * Otherwise the client address is resolved with `get_client_ip_addr`, the request is
///   admitted or rejected by `handle_traffic_req`, the handler is awaited, and its wrapped
///   response is passed through `handle_traffic_resp`.
///
/// The expansion contains `return` and `?`, so it must be used inside an async function or
/// async block whose output is `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1714
/// gRPC entry points of the validator service. Every method routes its request through
/// `handle_with_decoration!`, which wraps the matching `*_impl` handler with the traffic
/// controller checks when a client id source is configured.
#[async_trait]
impl Validator for ValidatorService {
    /// Handles a transaction submission.
    ///
    /// Unlike the other endpoints, the work runs on a separately spawned task using a clone of
    /// the service, and this method only awaits that task.
    /// NOTE(review): presumably this lets processing continue even if the client connection
    /// (and with it this future) is dropped — confirm.
    /// The `unwrap` on the awaited task is expected to panic here if the task itself panicked
    /// or was cancelled — confirm against `spawn_monitored_task!`'s return type.
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        spawn_monitored_task!(async move {
            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
        })
        .await
        .unwrap()
    }

    /// Handles a wait-for-effects request via `wait_for_effects_impl`.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request)
    }

    /// Handles an object info request via `object_info_impl`.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Handles a transaction info request via `transaction_info_impl`.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Handles a checkpoint request via `checkpoint_impl`.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Handles a v2 checkpoint request via `checkpoint_v2_impl`.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request)
    }

    /// Handles a system state request via `get_system_state_object_impl`.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }

    /// Handles a validator health request via `validator_health_impl`.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request)
    }
}