1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14 register_gauge_with_registry, register_histogram_vec_with_registry,
15 register_histogram_with_registry, register_int_counter_vec_with_registry,
16 register_int_counter_with_registry,
17};
18use std::{
19 cmp::Ordering,
20 future::Future,
21 io,
22 net::{IpAddr, SocketAddr},
23 pin::Pin,
24 sync::Arc,
25 time::{Duration, SystemTime},
26};
27use sui_network::{
28 api::{Validator, ValidatorServer},
29 tonic,
30 validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::message_envelope::Message;
33use sui_types::messages_consensus::ConsensusPosition;
34use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
35use sui_types::messages_grpc::{
36 HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
37};
38use sui_types::messages_grpc::{
39 HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
40 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
41};
42use sui_types::messages_grpc::{
43 HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
44};
45use sui_types::multiaddr::Multiaddr;
46use sui_types::object::Object;
47use sui_types::sui_system_state::SuiSystemState;
48use sui_types::traffic_control::{ClientIdSource, Weight};
49use sui_types::{
50 digests::{TransactionDigest, TransactionEffectsDigest},
51 error::{SuiErrorKind, UserInputError},
52};
53use sui_types::{
54 effects::TransactionEffects,
55 messages_grpc::{
56 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
57 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
58 },
59};
60use sui_types::{
61 effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
62};
63use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
64use sui_types::{error::*, transaction::*};
65use sui_types::{
66 fp_ensure,
67 messages_checkpoint::{
68 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
69 },
70};
71use tap::TapFallible;
72use tokio::sync::oneshot;
73use tokio::time::timeout;
74use tonic::metadata::{Ascii, MetadataValue};
75use tracing::{Instrument, debug, error, error_span, info, instrument};
76
77use crate::consensus_adapter::ConnectionMonitorStatusForTests;
78use crate::{
79 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
80 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
81 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
82};
83use crate::{
84 authority::{
85 ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
86 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
87 shared_object_version_manager::Schedulable,
88 },
89 checkpoints::CheckpointStore,
90 execution_scheduler::SchedulingSource,
91 mysticeti_adapter::LazyMysticetiClient,
92 transaction_outputs::TransactionOutputs,
93};
94use nonempty::{NonEmpty, nonempty};
95use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
96use sui_types::messages_grpc::PingType;
97use tonic::transport::server::TcpConnectInfo;
98
99#[cfg(test)]
100#[path = "unit_tests/server_tests.rs"]
101mod server_tests;
102
103#[cfg(test)]
104#[path = "unit_tests/wait_for_effects_tests.rs"]
105mod wait_for_effects_tests;
106
107#[cfg(test)]
108#[path = "unit_tests/submit_transaction_tests.rs"]
109mod submit_transaction_tests;
110
/// Handle to a spawned validator network server.
///
/// Wraps the underlying `sui_network` server so the owner can wait for it to
/// shut down, request a shutdown, or look up the address it is bound to.
pub struct AuthorityServerHandle {
    /// The running server instance controlled by this handle.
    server_handle: sui_network::validator::server::Server,
}
114
115impl AuthorityServerHandle {
116 pub async fn join(self) -> Result<(), io::Error> {
117 self.server_handle.handle().wait_for_shutdown().await;
118 Ok(())
119 }
120
121 pub async fn kill(self) -> Result<(), io::Error> {
122 self.server_handle.handle().shutdown().await;
123 Ok(())
124 }
125
126 pub fn address(&self) -> &Multiaddr {
127 self.server_handle.local_addr()
128 }
129}
130
/// Bundles everything needed to spawn a validator service: the address to
/// listen on, the authority state, the consensus adapter and the service metrics.
pub struct AuthorityServer {
    /// Address the server binds to when spawned via `spawn_for_test`.
    address: Multiaddr,
    /// Shared authority state handed to the `ValidatorService`.
    pub state: Arc<AuthorityState>,
    /// Adapter handed to the `ValidatorService` for consensus submission.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService`.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
137
138impl AuthorityServer {
139 pub fn new_for_test_with_consensus_adapter(
140 state: Arc<AuthorityState>,
141 consensus_adapter: Arc<ConsensusAdapter>,
142 ) -> Self {
143 let address = new_local_tcp_address_for_testing();
144 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
145
146 Self {
147 address,
148 state,
149 consensus_adapter,
150 metrics,
151 }
152 }
153
154 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
155 let consensus_adapter = Arc::new(ConsensusAdapter::new(
156 Arc::new(LazyMysticetiClient::new()),
157 CheckpointStore::new_for_tests(),
158 state.name,
159 Arc::new(ConnectionMonitorStatusForTests {}),
160 100_000,
161 100_000,
162 None,
163 None,
164 ConsensusAdapterMetrics::new_test(),
165 state.epoch_store_for_testing().protocol_config().clone(),
166 ));
167 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
168 }
169
170 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
171 let address = self.address.clone();
172 self.spawn_with_bind_address_for_test(address).await
173 }
174
175 pub async fn spawn_with_bind_address_for_test(
176 self,
177 address: Multiaddr,
178 ) -> Result<AuthorityServerHandle, io::Error> {
179 let tls_config = sui_tls::create_rustls_server_config(
180 self.state.config.network_key_pair().copy().private(),
181 SUI_TLS_SERVER_NAME.to_string(),
182 );
183 let config = mysten_network::config::Config::new();
184 let server = sui_network::validator::server::ServerBuilder::from_config(
185 &config,
186 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
187 )
188 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
189 self.state,
190 self.consensus_adapter,
191 self.metrics,
192 )))
193 .bind(&address, Some(tls_config))
194 .await
195 .unwrap();
196 let local_addr = server.local_addr().to_owned();
197 info!("Listening to traffic on {local_addr}");
198 let handle = AuthorityServerHandle {
199 server_handle: server,
200 };
201 Ok(handle)
202 }
203}
204
/// Prometheus metrics recorded by the validator service.
///
/// All metrics are created and registered in `ValidatorServiceMetrics::new`.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back the
    /// local acknowledgement (execution and finalization are not included).
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction through consensus, labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    // Submit-transaction handler metrics, all labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    handle_submit_transaction_bytes: HistogramVec,
    handle_submit_transaction_batch_size: HistogramVec,

    // Rejection counters.
    num_rejected_tx_in_epoch_boundary: IntCounter,
    num_rejected_cert_in_epoch_boundary: IntCounter,
    num_rejected_tx_during_overload: IntCounterVec,
    num_rejected_cert_during_overload: IntCounterVec,
    submission_rejected_transactions: IntCounterVec,
    // Client identification (connection IP / x-forwarded-for header) diagnostics.
    connection_ip_not_found: IntCounter,
    forwarded_header_parse_error: IntCounter,
    forwarded_header_invalid: IntCounter,
    forwarded_header_not_included: IntCounter,
    client_id_source_config_mismatch: IntCounter,
    x_forwarded_for_num_hops: Gauge,
}
237
238impl ValidatorServiceMetrics {
239 pub fn new(registry: &Registry) -> Self {
240 Self {
241 signature_errors: register_int_counter_with_registry!(
242 "total_signature_errors",
243 "Number of transaction signature errors",
244 registry,
245 )
246 .unwrap(),
247 tx_verification_latency: register_histogram_with_registry!(
248 "validator_service_tx_verification_latency",
249 "Latency of verifying a transaction",
250 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
251 registry,
252 )
253 .unwrap(),
254 cert_verification_latency: register_histogram_with_registry!(
255 "validator_service_cert_verification_latency",
256 "Latency of verifying a certificate",
257 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
258 registry,
259 )
260 .unwrap(),
261 consensus_latency: register_histogram_with_registry!(
262 "validator_service_consensus_latency",
263 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
264 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
265 registry,
266 )
267 .unwrap(),
268 handle_transaction_latency: register_histogram_with_registry!(
269 "validator_service_handle_transaction_latency",
270 "Latency of handling a transaction",
271 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
272 registry,
273 )
274 .unwrap(),
275 handle_certificate_consensus_latency: register_histogram_with_registry!(
276 "validator_service_handle_certificate_consensus_latency",
277 "Latency of handling a consensus transaction certificate",
278 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
279 registry,
280 )
281 .unwrap(),
282 submit_certificate_consensus_latency: register_histogram_with_registry!(
283 "validator_service_submit_certificate_consensus_latency",
284 "Latency of submit_certificate RPC handler",
285 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
286 registry,
287 )
288 .unwrap(),
289 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
290 "validator_service_handle_certificate_non_consensus_latency",
291 "Latency of handling a non-consensus transaction certificate",
292 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
293 registry,
294 )
295 .unwrap(),
296 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
297 "validator_service_handle_soft_bundle_certificates_consensus_latency",
298 "Latency of handling a consensus soft bundle",
299 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
300 registry,
301 )
302 .unwrap(),
303 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
304 "handle_soft_bundle_certificates_count",
305 "The number of certificates included in a soft bundle",
306 mysten_metrics::COUNT_BUCKETS.to_vec(),
307 registry,
308 )
309 .unwrap(),
310 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
311 "handle_soft_bundle_certificates_size_bytes",
312 "The size of soft bundle in bytes",
313 mysten_metrics::BYTES_BUCKETS.to_vec(),
314 registry,
315 )
316 .unwrap(),
317 handle_transaction_consensus_latency: register_histogram_with_registry!(
318 "validator_service_handle_transaction_consensus_latency",
319 "Latency of handling a user transaction sent through consensus",
320 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
321 registry,
322 )
323 .unwrap(),
324 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
325 "validator_service_submit_transaction_consensus_latency",
326 "Latency of submitting a user transaction sent through consensus",
327 &["req_type"],
328 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
329 registry,
330 )
331 .unwrap(),
332 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
333 "validator_service_submit_transaction_latency",
334 "Latency of submit transaction handler",
335 &["req_type"],
336 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
337 registry,
338 )
339 .unwrap(),
340 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
341 "validator_service_handle_wait_for_effects_ping_latency",
342 "Latency of handling a ping request for wait_for_effects",
343 &["req_type"],
344 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
345 registry,
346 )
347 .unwrap(),
348 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
349 "validator_service_submit_transaction_bytes",
350 "The size of transactions in the submit transaction request",
351 &["req_type"],
352 mysten_metrics::BYTES_BUCKETS.to_vec(),
353 registry,
354 )
355 .unwrap(),
356 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
357 "validator_service_submit_transaction_batch_size",
358 "The number of transactions in the submit transaction request",
359 &["req_type"],
360 mysten_metrics::COUNT_BUCKETS.to_vec(),
361 registry,
362 )
363 .unwrap(),
364 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
365 "validator_service_num_rejected_tx_in_epoch_boundary",
366 "Number of rejected transaction during epoch transitioning",
367 registry,
368 )
369 .unwrap(),
370 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
371 "validator_service_num_rejected_cert_in_epoch_boundary",
372 "Number of rejected transaction certificate during epoch transitioning",
373 registry,
374 )
375 .unwrap(),
376 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
377 "validator_service_num_rejected_tx_during_overload",
378 "Number of rejected transaction due to system overload",
379 &["error_type"],
380 registry,
381 )
382 .unwrap(),
383 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
384 "validator_service_num_rejected_cert_during_overload",
385 "Number of rejected transaction certificate due to system overload",
386 &["error_type"],
387 registry,
388 )
389 .unwrap(),
390 submission_rejected_transactions: register_int_counter_vec_with_registry!(
391 "validator_service_submission_rejected_transactions",
392 "Number of transactions rejected during submission",
393 &["reason"],
394 registry,
395 )
396 .unwrap(),
397 connection_ip_not_found: register_int_counter_with_registry!(
398 "validator_service_connection_ip_not_found",
399 "Number of times connection IP was not extractable from request",
400 registry,
401 )
402 .unwrap(),
403 forwarded_header_parse_error: register_int_counter_with_registry!(
404 "validator_service_forwarded_header_parse_error",
405 "Number of times x-forwarded-for header could not be parsed",
406 registry,
407 )
408 .unwrap(),
409 forwarded_header_invalid: register_int_counter_with_registry!(
410 "validator_service_forwarded_header_invalid",
411 "Number of times x-forwarded-for header was invalid",
412 registry,
413 )
414 .unwrap(),
415 forwarded_header_not_included: register_int_counter_with_registry!(
416 "validator_service_forwarded_header_not_included",
417 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
418 registry,
419 )
420 .unwrap(),
421 client_id_source_config_mismatch: register_int_counter_with_registry!(
422 "validator_service_client_id_source_config_mismatch",
423 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
424 registry,
425 )
426 .unwrap(),
427 x_forwarded_for_num_hops: register_gauge_with_registry!(
428 "validator_service_x_forwarded_for_num_hops",
429 "Number of hops in x-forwarded-for header",
430 registry,
431 )
432 .unwrap(),
433 }
434 }
435
436 pub fn new_for_tests() -> Self {
437 let registry = Registry::new();
438 Self::new(®istry)
439 }
440}
441
/// The validator's request-handling service.
///
/// Cloning is cheap: the state, consensus adapter, metrics and traffic
/// controller are all held behind `Arc`.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state used to validate and process requests.
    state: Arc<AuthorityState>,
    /// Adapter used for overload checks and consensus submission.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the request handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from `state.traffic_controller`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client IP is determined; when `None`, `handle_submit_transaction`
    /// falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
}
450
451impl ValidatorService {
452 pub fn new(
453 state: Arc<AuthorityState>,
454 consensus_adapter: Arc<ConsensusAdapter>,
455 validator_metrics: Arc<ValidatorServiceMetrics>,
456 client_id_source: Option<ClientIdSource>,
457 ) -> Self {
458 let traffic_controller = state.traffic_controller.clone();
459 Self {
460 state,
461 consensus_adapter,
462 metrics: validator_metrics,
463 traffic_controller,
464 client_id_source,
465 }
466 }
467
468 pub fn new_for_tests(
469 state: Arc<AuthorityState>,
470 consensus_adapter: Arc<ConsensusAdapter>,
471 metrics: Arc<ValidatorServiceMetrics>,
472 ) -> Self {
473 Self {
474 state,
475 consensus_adapter,
476 metrics,
477 traffic_controller: None,
478 client_id_source: None,
479 }
480 }
481
482 pub fn validator_state(&self) -> &Arc<AuthorityState> {
483 &self.state
484 }
485
486 pub async fn execute_certificate_for_testing(
487 &self,
488 cert: CertifiedTransaction,
489 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
490 let request = make_tonic_request_for_testing(cert);
491 self.handle_certificate_v2(request).await
492 }
493
494 pub async fn handle_transaction_for_benchmarking(
495 &self,
496 transaction: Transaction,
497 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
498 let request = make_tonic_request_for_testing(transaction);
499 self.transaction(request).await
500 }
501
    /// Handles a single user transaction request.
    ///
    /// The transaction goes through a validity check, a system overload check,
    /// signature verification (aliases are not allowed on this path), and is
    /// then passed to `AuthorityState::handle_transaction`.
    ///
    /// On success the response is returned with `Weight::zero()`.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the checks or the state handler fail. A
    /// `ValidatorOverloadedRetryAfter` overload error is deferred: the
    /// transaction is still processed, and the error is returned afterwards.
    async fn handle_transaction(
        &self,
        request: tonic::Request<Transaction>,
    ) -> WrappedServiceResponse<HandleTransactionResponse> {
        let Self {
            state,
            consensus_adapter,
            metrics,
            traffic_controller: _,
            client_id_source: _,
        } = self.clone();
        let transaction = request.into_inner();
        let epoch_store = state.load_epoch_store_one_call_per_task();

        transaction.validity_check(&epoch_store.tx_validity_check_context())?;

        // Set when the overload check asks the client to retry later; see the
        // end of the function for where it is surfaced.
        let mut validator_pushback_error = None;
        let overload_check_res = state.check_system_overload(
            &*consensus_adapter,
            transaction.data(),
            state.check_system_overload_at_signing(),
        );
        if let Err(error) = overload_check_res {
            metrics
                .num_rejected_tx_during_overload
                .with_label_values(&[error.as_ref()])
                .inc();
            match error.as_inner() {
                // Remember the pushback but keep processing the transaction.
                SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
                    validator_pushback_error = Some(error)
                }
                // Any other overload error rejects the request immediately.
                _ => return Err(error.into()),
            }
        }

        // Started after the overload check, so requests rejected above are not
        // included in the handle-transaction latency.
        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();

        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
        let transaction = epoch_store
            .verify_transaction_require_no_aliases(transaction)
            .tap_err(|_| {
                metrics.signature_errors.inc();
            })?
            .into_tx();
        // Stop the verification timer here so it covers verification only.
        drop(tx_verif_metrics_guard);

        let tx_digest = transaction.digest();

        let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);

        let info = state
            .handle_transaction(&epoch_store, transaction.clone())
            .instrument(span)
            .await
            .tap_err(|e| {
                if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
                    metrics.num_rejected_tx_in_epoch_boundary.inc();
                }
            })?;

        // The transaction was processed, but the client is still told about the
        // overload pushback recorded earlier.
        if let Some(error) = validator_pushback_error {
            return Err(error.into());
        }

        Ok((tonic::Response::new(info), Weight::zero()))
    }
582
583 #[instrument(
584 name = "ValidatorService::handle_submit_transaction",
585 level = "error",
586 skip_all,
587 err(level = "debug")
588 )]
589 async fn handle_submit_transaction(
590 &self,
591 request: tonic::Request<RawSubmitTxRequest>,
592 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
593 let Self {
594 state,
595 consensus_adapter,
596 metrics,
597 traffic_controller: _,
598 client_id_source,
599 } = self.clone();
600
601 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
602 self.get_client_ip_addr(&request, client_id_source)
603 } else {
604 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
605 };
606
607 let inner = request.into_inner();
608 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
609
610 let next_epoch = start_epoch + 1;
611 let mut max_retries = 1;
612
613 loop {
614 let res = self
615 .handle_submit_transaction_inner(
616 &state,
617 &consensus_adapter,
618 &metrics,
619 &inner,
620 submitter_client_addr,
621 )
622 .await;
623 match res {
624 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
625 Err(err) => {
626 if max_retries > 0
627 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
628 {
629 max_retries -= 1;
630
631 debug!(
632 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
633 );
634
635 if let Ok(Ok(new_epoch)) =
636 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
637 {
638 assert_reachable!("retry submission at epoch end");
639 if new_epoch == next_epoch {
640 continue;
641 }
642
643 debug_fatal!(
644 "expected epoch {} after reconfiguration. got {}",
645 next_epoch,
646 new_epoch
647 );
648 }
649 }
650 return Err(err.into());
651 }
652 }
653 }
654 }
655
656 async fn handle_submit_transaction_inner(
657 &self,
658 state: &AuthorityState,
659 consensus_adapter: &ConsensusAdapter,
660 metrics: &ValidatorServiceMetrics,
661 request: &RawSubmitTxRequest,
662 submitter_client_addr: Option<IpAddr>,
663 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
664 let epoch_store = state.load_epoch_store_one_call_per_task();
665 if !epoch_store.protocol_config().mysticeti_fastpath() {
666 return Err(SuiErrorKind::UnsupportedFeatureError {
667 error: "Mysticeti fastpath".to_string(),
668 }
669 .into());
670 }
671
672 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
673 SuiErrorKind::GrpcMessageDeserializeError {
674 type_info: "RawSubmitTxRequest.submit_type".to_string(),
675 error: e.to_string(),
676 }
677 })?;
678
679 let is_ping_request = submit_type == SubmitTxType::Ping;
680 if is_ping_request {
681 fp_ensure!(
682 request.transactions.is_empty(),
683 SuiErrorKind::InvalidRequest(format!(
684 "Ping request cannot contain {} transactions",
685 request.transactions.len()
686 ))
687 .into()
688 );
689 } else {
690 fp_ensure!(
692 !request.transactions.is_empty(),
693 SuiErrorKind::InvalidRequest(
694 "At least one transaction needs to be submitted".to_string(),
695 )
696 .into()
697 );
698 }
699
700 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
705
706 let max_num_transactions = if is_soft_bundle_request {
707 epoch_store.protocol_config().max_soft_bundle_size()
710 } else {
711 epoch_store
713 .protocol_config()
714 .max_num_transactions_in_block()
715 };
716 fp_ensure!(
717 request.transactions.len() <= max_num_transactions as usize,
718 SuiErrorKind::InvalidRequest(format!(
719 "Too many transactions in request: {} vs {}",
720 request.transactions.len(),
721 max_num_transactions
722 ))
723 .into()
724 );
725
726 let mut tx_digests = Vec::with_capacity(request.transactions.len());
728 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
730 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
732 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
734 let mut total_size_bytes = 0;
736
737 let req_type = if is_ping_request {
738 "ping"
739 } else if request.transactions.len() == 1 {
740 "single_transaction"
741 } else if is_soft_bundle_request {
742 "soft_bundle"
743 } else {
744 "batch"
745 };
746
747 let _handle_tx_metrics_guard = metrics
748 .handle_submit_transaction_latency
749 .with_label_values(&[req_type])
750 .start_timer();
751
752 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
753 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
754 Ok(txn) => txn,
755 Err(e) => {
756 return Err(SuiErrorKind::TransactionDeserializationError {
758 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
759 }
760 .into());
761 }
762 };
763
764 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
766
767 let overload_check_res = state.check_system_overload(
768 consensus_adapter,
769 transaction.data(),
770 state.check_system_overload_at_signing(),
771 );
772 if let Err(error) = overload_check_res {
773 metrics
774 .num_rejected_tx_during_overload
775 .with_label_values(&[error.as_ref()])
776 .inc();
777 results[idx] = Some(SubmitTxResult::Rejected { error });
778 continue;
779 }
780
781 let verified_transaction = {
783 let _metrics_guard = metrics.tx_verification_latency.start_timer();
784 if epoch_store.protocol_config().address_aliases() {
785 match epoch_store.verify_transaction_with_current_aliases(transaction) {
786 Ok(tx) => tx,
787 Err(e) => {
788 metrics.signature_errors.inc();
789 return Err(e);
790 }
791 }
792 } else {
793 match epoch_store.verify_transaction_require_no_aliases(transaction) {
794 Ok(tx) => tx,
795 Err(e) => {
796 metrics.signature_errors.inc();
797 return Err(e);
798 }
799 }
800 }
801 };
802
803 let tx_digest = verified_transaction.tx().digest();
804 tx_digests.push(*tx_digest);
805
806 debug!(
807 ?tx_digest,
808 "handle_submit_transaction: verified transaction"
809 );
810
811 if let Some(effects) = state
814 .get_transaction_cache_reader()
815 .get_executed_effects(tx_digest)
816 {
817 let effects_digest = effects.digest();
818 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
819 let executed_result = SubmitTxResult::Executed {
820 effects_digest,
821 details: Some(executed_data),
822 fast_path: false,
823 };
824 results[idx] = Some(executed_result);
825 debug!(?tx_digest, "handle_submit_transaction: already executed");
826 continue;
827 }
828 }
829
830 if self
831 .state
832 .get_transaction_cache_reader()
833 .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
834 {
835 results[idx] = Some(SubmitTxResult::Rejected {
836 error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
837 });
838 debug!(
839 ?tx_digest,
840 "handle_submit_transaction: transaction already executed in previous epoch"
841 );
842 continue;
843 }
844
845 debug!(
846 ?tx_digest,
847 "handle_submit_transaction: waiting for fastpath dependency objects"
848 );
849 if !state
850 .wait_for_fastpath_dependency_objects(
851 verified_transaction.tx(),
852 epoch_store.epoch(),
853 )
854 .await?
855 {
856 debug!(
857 ?tx_digest,
858 "fastpath input objects are still unavailable after waiting"
859 );
860 }
861
862 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
863 Ok(_) => { }
864 Err(e) => {
865 if let Some(effects) = state
868 .get_transaction_cache_reader()
869 .get_executed_effects(tx_digest)
870 {
871 let effects_digest = effects.digest();
872 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
873 {
874 let executed_result = SubmitTxResult::Executed {
875 effects_digest,
876 details: Some(executed_data),
877 fast_path: false,
878 };
879 results[idx] = Some(executed_result);
880 continue;
881 }
882 }
883
884 debug!(?tx_digest, "Transaction rejected during submission: {e}");
886 metrics
887 .submission_rejected_transactions
888 .with_label_values(&[e.to_variant_name()])
889 .inc();
890 results[idx] = Some(SubmitTxResult::Rejected { error: e });
891 continue;
892 }
893 }
894
895 if epoch_store.protocol_config().address_aliases() {
896 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
897 &state.name,
898 verified_transaction.into(),
899 ));
900 } else {
901 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
902 &state.name,
903 verified_transaction.into_tx().into(),
904 ));
905 }
906 transaction_indexes.push(idx);
907 total_size_bytes += tx_size;
908 }
909
910 if consensus_transactions.is_empty() && !is_ping_request {
911 return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
912 }
913
914 let max_transaction_bytes = if is_soft_bundle_request {
918 epoch_store
919 .protocol_config()
920 .consensus_max_transactions_in_block_bytes()
921 / 2
922 } else {
923 epoch_store
924 .protocol_config()
925 .consensus_max_transactions_in_block_bytes()
926 };
927 fp_ensure!(
928 total_size_bytes <= max_transaction_bytes as usize,
929 SuiErrorKind::UserInputError {
930 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
931 size: total_size_bytes,
932 limit: max_transaction_bytes,
933 },
934 }
935 .into()
936 );
937
938 metrics
939 .handle_submit_transaction_bytes
940 .with_label_values(&[req_type])
941 .observe(total_size_bytes as f64);
942 metrics
943 .handle_submit_transaction_batch_size
944 .with_label_values(&[req_type])
945 .observe(consensus_transactions.len() as f64);
946
947 let _latency_metric_guard = metrics
948 .handle_submit_transaction_consensus_latency
949 .with_label_values(&[req_type])
950 .start_timer();
951
952 let consensus_positions = if is_soft_bundle_request || is_ping_request {
953 assert!(
956 is_ping_request || !consensus_transactions.is_empty(),
957 "A valid soft bundle must have at least one transaction"
958 );
959 debug!(
960 "handle_submit_transaction: submitting consensus transactions ({}): {}",
961 req_type,
962 consensus_transactions
963 .iter()
964 .map(|t| t.local_display())
965 .join(", ")
966 );
967 self.handle_submit_to_consensus_for_position(
968 consensus_transactions,
969 &epoch_store,
970 submitter_client_addr,
971 )
972 .await?
973 } else {
974 let futures = consensus_transactions.into_iter().map(|t| {
975 debug!(
976 "handle_submit_transaction: submitting consensus transaction ({}): {}",
977 req_type,
978 t.local_display(),
979 );
980 self.handle_submit_to_consensus_for_position(
981 vec![t],
982 &epoch_store,
983 submitter_client_addr,
984 )
985 });
986 future::try_join_all(futures)
987 .await?
988 .into_iter()
989 .flatten()
990 .collect()
991 };
992
993 if is_ping_request {
994 assert_eq!(consensus_positions.len(), 1);
996 results.push(Some(SubmitTxResult::Submitted {
997 consensus_position: consensus_positions[0],
998 }));
999 } else {
1000 for ((idx, tx_digest), consensus_position) in transaction_indexes
1002 .into_iter()
1003 .zip(tx_digests)
1004 .zip(consensus_positions)
1005 {
1006 debug!(
1007 ?tx_digest,
1008 "handle_submit_transaction: submitted consensus transaction at {}",
1009 consensus_position,
1010 );
1011 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1012 }
1013 }
1014
1015 Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1016 }
1017
1018 fn try_from_submit_tx_response(
1019 results: Vec<Option<SubmitTxResult>>,
1020 ) -> Result<RawSubmitTxResponse, SuiError> {
1021 let mut raw_results = Vec::new();
1022 for (i, result) in results.into_iter().enumerate() {
1023 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1024 error: format!("Missing transaction result at {}", i),
1025 })?;
1026 let raw_result = result.try_into()?;
1027 raw_results.push(raw_result);
1028 }
1029 Ok(RawSubmitTxResponse {
1030 results: raw_results,
1031 })
1032 }
1033
1034 async fn handle_certificates(
1040 &self,
1041 certificates: NonEmpty<CertifiedTransaction>,
1042 include_events: bool,
1043 include_input_objects: bool,
1044 include_output_objects: bool,
1045 include_auxiliary_data: bool,
1046 epoch_store: &Arc<AuthorityPerEpochStore>,
1047 wait_for_effects: bool,
1048 ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
1049 fp_ensure!(
1052 !self.state.is_fullnode(epoch_store),
1053 SuiErrorKind::FullNodeCantHandleCertificate.into()
1054 );
1055
1056 let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
1057
1058 let metrics = if certificates.len() == 1 {
1059 if wait_for_effects {
1060 if is_consensus_tx {
1061 &self.metrics.handle_certificate_consensus_latency
1062 } else {
1063 &self.metrics.handle_certificate_non_consensus_latency
1064 }
1065 } else {
1066 &self.metrics.submit_certificate_consensus_latency
1067 }
1068 } else {
1069 &self
1071 .metrics
1072 .handle_soft_bundle_certificates_consensus_latency
1073 };
1074
1075 let _metrics_guard = metrics.start_timer();
1076
1077 if certificates.len() == 1 {
1081 let tx_digest = *certificates[0].digest();
1082 debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
1083
1084 if let Some(signed_effects) = self
1085 .state
1086 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
1087 {
1088 let events = if include_events && signed_effects.events_digest().is_some() {
1089 Some(
1090 self.state
1091 .get_transaction_events(signed_effects.transaction_digest())?,
1092 )
1093 } else {
1094 None
1095 };
1096
1097 return Ok((
1098 Some(vec![HandleCertificateResponseV3 {
1099 effects: signed_effects.into_inner(),
1100 events,
1101 input_objects: None,
1102 output_objects: None,
1103 auxiliary_data: None,
1104 }]),
1105 Weight::one(),
1106 ));
1107 };
1108 }
1109
1110 for certificate in &certificates {
1113 let overload_check_res = self.state.check_system_overload(
1114 &*self.consensus_adapter,
1115 certificate.data(),
1116 self.state.check_system_overload_at_execution(),
1117 );
1118 if let Err(error) = overload_check_res {
1119 self.metrics
1120 .num_rejected_cert_during_overload
1121 .with_label_values(&[error.as_ref()])
1122 .inc();
1123 return Err(error.into());
1124 }
1125 }
1126
1127 let verified_certificates = {
1128 let _timer = self.metrics.cert_verification_latency.start_timer();
1129 epoch_store
1130 .signature_verifier
1131 .multi_verify_certs(certificates.into())
1132 .await
1133 .into_iter()
1134 .collect::<Result<Vec<_>, _>>()?
1135 };
1136 let consensus_transactions =
1137 NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1138 ConsensusTransaction::new_certificate_message(
1139 &self.state.name,
1140 certificate.clone().into(),
1141 )
1142 }))
1143 .unwrap();
1144
1145 let (responses, weight) = self
1146 .handle_submit_to_consensus(
1147 consensus_transactions,
1148 include_events,
1149 include_input_objects,
1150 include_output_objects,
1151 include_auxiliary_data,
1152 epoch_store,
1153 wait_for_effects,
1154 )
1155 .await?;
1156 let responses = if let Some(responses) = responses {
1158 Some(
1159 responses
1160 .into_iter()
1161 .map(|response| {
1162 let signed_effects =
1163 self.state.sign_effects(response.effects, epoch_store)?;
1164 Ok(HandleCertificateResponseV3 {
1165 effects: signed_effects.into_inner(),
1166 events: response.events,
1167 input_objects: if response.input_objects.is_empty() {
1168 None
1169 } else {
1170 Some(response.input_objects)
1171 },
1172 output_objects: if response.output_objects.is_empty() {
1173 None
1174 } else {
1175 Some(response.output_objects)
1176 },
1177 auxiliary_data: None,
1178 })
1179 })
1180 .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1181 )
1182 } else {
1183 None
1184 };
1185
1186 Ok((responses, weight))
1187 }
1188
1189 #[instrument(
1190 name = "ValidatorService::handle_submit_to_consensus_for_position",
1191 level = "debug",
1192 skip_all,
1193 err(level = "debug")
1194 )]
1195 async fn handle_submit_to_consensus_for_position(
1196 &self,
1197 consensus_transactions: Vec<ConsensusTransaction>,
1199 epoch_store: &Arc<AuthorityPerEpochStore>,
1200 submitter_client_addr: Option<IpAddr>,
1201 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1202 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1203
1204 {
1205 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1207 if !reconfiguration_lock.should_accept_user_certs() {
1208 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1209 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1210 }
1211
1212 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1216
1217 self.consensus_adapter.submit_batch(
1218 &consensus_transactions,
1219 Some(&reconfiguration_lock),
1220 epoch_store,
1221 Some(tx_consensus_positions),
1222 submitter_client_addr,
1223 )?;
1224 }
1225
1226 Ok(rx_consensus_positions.await.map_err(|e| {
1227 SuiErrorKind::FailedToSubmitToConsensus(format!(
1228 "Failed to get consensus position: {e}"
1229 ))
1230 })?)
1231 }
1232
1233 async fn handle_submit_to_consensus(
1234 &self,
1235 consensus_transactions: NonEmpty<ConsensusTransaction>,
1236 include_events: bool,
1237 include_input_objects: bool,
1238 include_output_objects: bool,
1239 _include_auxiliary_data: bool,
1240 epoch_store: &Arc<AuthorityPerEpochStore>,
1241 wait_for_effects: bool,
1242 ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1243 let consensus_transactions: Vec<_> = consensus_transactions.into();
1244 {
1245 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1247 if !reconfiguration_lock.should_accept_user_certs() {
1248 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1249 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1250 }
1251
1252 if !epoch_store.all_external_consensus_messages_processed(
1258 consensus_transactions.iter().map(|tx| tx.key()),
1259 )? {
1260 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1261 self.consensus_adapter.submit_batch(
1262 &consensus_transactions,
1263 Some(&reconfiguration_lock),
1264 epoch_store,
1265 None,
1266 None, )?;
1268 }
1271 }
1272
1273 if !wait_for_effects {
1274 let fast_path_certificates = consensus_transactions
1277 .iter()
1278 .filter_map(|tx| {
1279 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1280 (!certificate.is_consensus_tx())
1281 .then_some((
1283 VerifiedExecutableTransaction::new_from_certificate(
1284 VerifiedCertificate::new_unchecked(*(certificate.clone())),
1285 ),
1286 ExecutionEnv::new()
1287 .with_scheduling_source(SchedulingSource::NonFastPath),
1288 ))
1289 } else {
1290 None
1291 }
1292 })
1293 .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1294 .collect::<Vec<_>>();
1295 if !fast_path_certificates.is_empty() {
1296 self.state
1297 .execution_scheduler()
1298 .enqueue(fast_path_certificates, epoch_store);
1299 }
1300 return Ok((None, Weight::zero()));
1301 }
1302
1303 let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1306 |tx| async move {
1307 let effects = match &tx.kind {
1308 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1309 let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1311 self.state
1312 .wait_for_certificate_execution(&certificate, epoch_store)
1313 .await?
1314 }
1315 ConsensusTransactionKind::UserTransaction(tx) => {
1316 self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1317 }
1318 ConsensusTransactionKind::UserTransactionV2(tx) => {
1319 self.state.await_transaction_effects(*tx.tx().digest(), epoch_store).await?
1320 }
1321 _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction, UserTransaction, or UserTransactionV2"),
1322 };
1323 let events = if include_events && effects.events_digest().is_some() {
1324 Some(self.state.get_transaction_events(effects.transaction_digest())?)
1325 } else {
1326 None
1327 };
1328
1329 let input_objects = if include_input_objects {
1330 self.state.get_transaction_input_objects(&effects)?
1331 } else {
1332 vec![]
1333 };
1334
1335 let output_objects = if include_output_objects {
1336 self.state.get_transaction_output_objects(&effects)?
1337 } else {
1338 vec![]
1339 };
1340
1341 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1342 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1343 }
1344
1345 Ok::<_, SuiError>(ExecutedData {
1346 effects,
1347 events,
1348 input_objects,
1349 output_objects,
1350 })
1351 },
1352 ))
1353 .await?;
1354
1355 Ok((Some(responses), Weight::zero()))
1356 }
1357
1358 async fn collect_effects_data(
1359 &self,
1360 effects: &TransactionEffects,
1361 include_events: bool,
1362 include_input_objects: bool,
1363 include_output_objects: bool,
1364 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1365 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1366 let events = if include_events && effects.events_digest().is_some() {
1367 if let Some(fastpath_outputs) = &fastpath_outputs {
1368 Some(fastpath_outputs.events.clone())
1369 } else {
1370 Some(
1371 self.state
1372 .get_transaction_events(effects.transaction_digest())?,
1373 )
1374 }
1375 } else {
1376 None
1377 };
1378
1379 let input_objects = if include_input_objects {
1380 self.state.get_transaction_input_objects(effects)?
1381 } else {
1382 vec![]
1383 };
1384
1385 let output_objects = if include_output_objects {
1386 if let Some(fastpath_outputs) = &fastpath_outputs {
1387 fastpath_outputs.written.values().cloned().collect()
1388 } else {
1389 self.state.get_transaction_output_objects(effects)?
1390 }
1391 } else {
1392 vec![]
1393 };
1394
1395 Ok((events, input_objects, output_objects))
1396 }
1397}
1398
/// Result type of the `*_impl` handlers: a successful tonic response paired with a
/// `Weight` (passed on as `spam_weight` by `handle_traffic_resp`), or a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1400
1401impl ValidatorService {
1402 async fn transaction_impl(
1403 &self,
1404 request: tonic::Request<Transaction>,
1405 ) -> WrappedServiceResponse<HandleTransactionResponse> {
1406 self.handle_transaction(request).await
1407 }
1408
1409 async fn handle_submit_transaction_impl(
1410 &self,
1411 request: tonic::Request<RawSubmitTxRequest>,
1412 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1413 self.handle_submit_transaction(request).await
1414 }
1415
1416 async fn submit_certificate_impl(
1417 &self,
1418 request: tonic::Request<CertifiedTransaction>,
1419 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1420 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1421 let certificate = request.into_inner();
1422 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1423
1424 let span =
1425 error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1426 self.handle_certificates(
1427 nonempty![certificate],
1428 true,
1429 false,
1430 false,
1431 false,
1432 &epoch_store,
1433 false,
1434 )
1435 .instrument(span)
1436 .await
1437 .map(|(executed, spam_weight)| {
1438 (
1439 tonic::Response::new(SubmitCertificateResponse {
1440 executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1441 }),
1442 spam_weight,
1443 )
1444 })
1445 }
1446
1447 async fn handle_certificate_v2_impl(
1448 &self,
1449 request: tonic::Request<CertifiedTransaction>,
1450 ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1451 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1452 let certificate = request.into_inner();
1453 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1454
1455 let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1456 self.handle_certificates(
1457 nonempty![certificate],
1458 true,
1459 false,
1460 false,
1461 false,
1462 &epoch_store,
1463 true,
1464 )
1465 .instrument(span)
1466 .await
1467 .map(|(resp, spam_weight)| {
1468 (
1469 tonic::Response::new(
1470 resp.expect(
1471 "handle_certificate should not return none with wait_for_effects=true",
1472 )
1473 .remove(0)
1474 .into(),
1475 ),
1476 spam_weight,
1477 )
1478 })
1479 }
1480
1481 async fn handle_certificate_v3_impl(
1482 &self,
1483 request: tonic::Request<HandleCertificateRequestV3>,
1484 ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1485 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1486 let request = request.into_inner();
1487 request
1488 .certificate
1489 .validity_check(&epoch_store.tx_validity_check_context())?;
1490
1491 let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1492 self.handle_certificates(
1493 nonempty![request.certificate],
1494 request.include_events,
1495 request.include_input_objects,
1496 request.include_output_objects,
1497 request.include_auxiliary_data,
1498 &epoch_store,
1499 true,
1500 )
1501 .instrument(span)
1502 .await
1503 .map(|(resp, spam_weight)| {
1504 (
1505 tonic::Response::new(
1506 resp.expect(
1507 "handle_certificate should not return none with wait_for_effects=true",
1508 )
1509 .remove(0),
1510 ),
1511 spam_weight,
1512 )
1513 })
1514 }
1515
1516 async fn wait_for_effects_impl(
1517 &self,
1518 request: tonic::Request<RawWaitForEffectsRequest>,
1519 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1520 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1521 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1522 let response = timeout(
1523 Duration::from_secs(20),
1525 epoch_store
1526 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1527 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1528 )
1529 .await
1530 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1531 .try_into()?;
1532 Ok((tonic::Response::new(response), Weight::zero()))
1533 }
1534
1535 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1536 async fn wait_for_effects_response(
1537 &self,
1538 request: WaitForEffectsRequest,
1539 epoch_store: &Arc<AuthorityPerEpochStore>,
1540 ) -> SuiResult<WaitForEffectsResponse> {
1541 if request.ping_type.is_some() {
1542 return timeout(
1543 Duration::from_secs(10),
1544 self.ping_response(request, epoch_store),
1545 )
1546 .await
1547 .map_err(|_| SuiErrorKind::TimeoutError)?;
1548 }
1549
1550 let Some(tx_digest) = request.transaction_digest else {
1551 return Err(SuiErrorKind::InvalidRequest(
1552 "Transaction digest is required for wait for effects requests".to_string(),
1553 )
1554 .into());
1555 };
1556 let tx_digests = [tx_digest];
1557
1558 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1559 if let Some(consensus_position) = request.consensus_position {
1560 Box::pin(self.wait_for_fastpath_effects(
1561 consensus_position,
1562 &tx_digests,
1563 request.include_details,
1564 epoch_store,
1565 ))
1566 } else {
1567 Box::pin(futures::future::pending())
1568 };
1569
1570 tokio::select! {
1571 biased;
1573 mut effects = self.state
1580 .get_transaction_cache_reader()
1581 .notify_read_executed_effects(
1582 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1583 &tx_digests,
1584 ) => {
1585 tracing::Span::current().record("fast_path_effects", false);
1586 let effects = effects.pop().unwrap();
1587 let details = if request.include_details {
1588 Some(self.complete_executed_data(effects.clone(), None).await?)
1589 } else {
1590 None
1591 };
1592
1593 Ok(WaitForEffectsResponse::Executed {
1594 effects_digest: effects.digest(),
1595 details,
1596 fast_path: false,
1597 })
1598 }
1599
1600 fastpath_response = fastpath_effects_future => {
1601 tracing::Span::current().record("fast_path_effects", true);
1602 fastpath_response
1603 }
1604 }
1605 }
1606
1607 #[instrument(level = "error", skip_all, err(level = "debug"))]
1608 async fn ping_response(
1609 &self,
1610 request: WaitForEffectsRequest,
1611 epoch_store: &Arc<AuthorityPerEpochStore>,
1612 ) -> SuiResult<WaitForEffectsResponse> {
1613 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1614 return Err(SuiErrorKind::UnsupportedFeatureError {
1615 error: "Mysticeti fastpath".to_string(),
1616 }
1617 .into());
1618 };
1619
1620 let Some(consensus_position) = request.consensus_position else {
1621 return Err(SuiErrorKind::InvalidRequest(
1622 "Consensus position is required for Ping requests".to_string(),
1623 )
1624 .into());
1625 };
1626
1627 let Some(ping) = request.ping_type else {
1629 return Err(SuiErrorKind::InvalidRequest(
1630 "Ping type is required for ping requests".to_string(),
1631 )
1632 .into());
1633 };
1634
1635 let _metrics_guard = self
1636 .metrics
1637 .handle_wait_for_effects_ping_latency
1638 .with_label_values(&[ping.as_str()])
1639 .start_timer();
1640
1641 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1642
1643 let mut last_status = None;
1644 let details = if request.include_details {
1645 Some(Box::new(ExecutedData::default()))
1646 } else {
1647 None
1648 };
1649
1650 loop {
1651 let status = consensus_tx_status_cache
1652 .notify_read_transaction_status_change(consensus_position, last_status)
1653 .await;
1654 match status {
1655 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1656 ConsensusTxStatus::FastpathCertified => {
1657 if ping == PingType::Consensus {
1659 last_status = Some(status);
1660 continue;
1661 }
1662 return Ok(WaitForEffectsResponse::Executed {
1663 effects_digest: TransactionEffectsDigest::ZERO,
1664 details,
1665 fast_path: true,
1666 });
1667 }
1668 ConsensusTxStatus::Rejected => {
1669 return Ok(WaitForEffectsResponse::Rejected { error: None });
1670 }
1671 ConsensusTxStatus::Finalized => {
1672 return Ok(WaitForEffectsResponse::Executed {
1673 effects_digest: TransactionEffectsDigest::ZERO,
1674 details,
1675 fast_path: false,
1676 });
1677 }
1678 },
1679 NotifyReadConsensusTxStatusResult::Expired(round) => {
1680 return Ok(WaitForEffectsResponse::Expired {
1681 epoch: epoch_store.epoch(),
1682 round: Some(round),
1683 });
1684 }
1685 }
1686 }
1687 }
1688
1689 #[instrument(level = "error", skip_all, err(level = "debug"))]
1690 async fn wait_for_fastpath_effects(
1691 &self,
1692 consensus_position: ConsensusPosition,
1693 tx_digests: &[TransactionDigest],
1694 include_details: bool,
1695 epoch_store: &Arc<AuthorityPerEpochStore>,
1696 ) -> SuiResult<WaitForEffectsResponse> {
1697 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1698 return Err(SuiErrorKind::UnsupportedFeatureError {
1699 error: "Mysticeti fastpath".to_string(),
1700 }
1701 .into());
1702 };
1703
1704 let local_epoch = epoch_store.epoch();
1705 match consensus_position.epoch.cmp(&local_epoch) {
1706 Ordering::Less => {
1707 let response = WaitForEffectsResponse::Expired {
1710 epoch: local_epoch,
1711 round: None,
1712 };
1713 return Ok(response);
1714 }
1715 Ordering::Greater => {
1716 return Err(SuiErrorKind::WrongEpoch {
1718 expected_epoch: local_epoch,
1719 actual_epoch: consensus_position.epoch,
1720 }
1721 .into());
1722 }
1723 Ordering::Equal => {
1724 }
1727 };
1728
1729 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1730
1731 let mut current_status = None;
1732 loop {
1733 tokio::select! {
1734 status_result = consensus_tx_status_cache
1735 .notify_read_transaction_status_change(consensus_position, current_status) => {
1736 match status_result {
1737 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1738 match new_status {
1739 ConsensusTxStatus::Rejected => {
1740 return Ok(WaitForEffectsResponse::Rejected {
1741 error: epoch_store.get_rejection_vote_reason(
1742 consensus_position
1743 )
1744 });
1745 }
1746 ConsensusTxStatus::FastpathCertified => {
1747 current_status = Some(new_status);
1748 continue;
1749 }
1750 ConsensusTxStatus::Finalized => {
1751 current_status = Some(new_status);
1752 continue;
1753 }
1754 }
1755 }
1756 NotifyReadConsensusTxStatusResult::Expired(round) => {
1757 return Ok(WaitForEffectsResponse::Expired {
1758 epoch: epoch_store.epoch(),
1759 round: Some(round),
1760 });
1761 }
1762 }
1763 }
1764
1765 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1766 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1767 let outputs = outputs.pop().unwrap();
1768 let effects = outputs.effects.clone();
1769
1770 let details = if include_details {
1771 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1772 } else {
1773 None
1774 };
1775
1776 return Ok(WaitForEffectsResponse::Executed {
1777 effects_digest: effects.digest(),
1778 details,
1779 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1780 });
1781 }
1782 }
1783 }
1784 }
1785
1786 async fn complete_executed_data(
1787 &self,
1788 effects: TransactionEffects,
1789 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1790 ) -> SuiResult<Box<ExecutedData>> {
1791 let (events, input_objects, output_objects) = self
1792 .collect_effects_data(
1793 &effects,
1794 true,
1795 true,
1796 true,
1797 fastpath_outputs,
1798 )
1799 .await?;
1800 Ok(Box::new(ExecutedData {
1801 effects,
1802 events,
1803 input_objects,
1804 output_objects,
1805 }))
1806 }
1807
1808 async fn soft_bundle_validity_check(
1809 &self,
1810 certificates: &NonEmpty<CertifiedTransaction>,
1811 epoch_store: &Arc<AuthorityPerEpochStore>,
1812 total_size_bytes: u64,
1813 ) -> Result<(), tonic::Status> {
1814 let protocol_config = epoch_store.protocol_config();
1815 let node_config = &self.state.config;
1816
1817 fp_ensure!(
1823 protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1824 SuiErrorKind::UnsupportedFeatureError {
1825 error: "Soft Bundle".to_string()
1826 }
1827 .into()
1828 );
1829
1830 fp_ensure!(
1837 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1838 SuiErrorKind::UserInputError {
1839 error: UserInputError::TooManyTransactionsInBatch {
1840 size: certificates.len(),
1841 limit: protocol_config.max_soft_bundle_size()
1842 }
1843 }
1844 .into()
1845 );
1846
1847 let soft_bundle_max_size_bytes =
1851 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1852 fp_ensure!(
1853 total_size_bytes <= soft_bundle_max_size_bytes,
1854 SuiErrorKind::UserInputError {
1855 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1856 size: total_size_bytes as usize,
1857 limit: soft_bundle_max_size_bytes,
1858 },
1859 }
1860 .into()
1861 );
1862
1863 let mut gas_price = None;
1864 for certificate in certificates {
1865 let tx_digest = *certificate.digest();
1866 fp_ensure!(
1867 certificate.is_consensus_tx(),
1868 SuiErrorKind::UserInputError {
1869 error: UserInputError::NoSharedObjectError { digest: tx_digest }
1870 }
1871 .into()
1872 );
1873 fp_ensure!(
1874 !self.state.is_tx_already_executed(&tx_digest),
1875 SuiErrorKind::UserInputError {
1876 error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1877 }
1878 .into()
1879 );
1880 if let Some(gas) = gas_price {
1881 fp_ensure!(
1882 gas == certificate.gas_price(),
1883 SuiErrorKind::UserInputError {
1884 error: UserInputError::GasPriceMismatchError {
1885 digest: tx_digest,
1886 expected: gas,
1887 actual: certificate.gas_price()
1888 }
1889 }
1890 .into()
1891 );
1892 } else {
1893 gas_price = Some(certificate.gas_price());
1894 }
1895 }
1896
1897 fp_ensure!(
1902 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1903 SuiErrorKind::UserInputError {
1904 error: UserInputError::CertificateAlreadyProcessed
1905 }
1906 .into()
1907 );
1908
1909 Ok(())
1910 }
1911
1912 async fn handle_soft_bundle_certificates_v3_impl(
1913 &self,
1914 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1915 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1916 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1917 let client_addr = if self.client_id_source.is_none() {
1918 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1919 } else {
1920 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1921 };
1922 let request = request.into_inner();
1923
1924 let certificates = NonEmpty::from_vec(request.certificates)
1925 .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1926 let mut total_size_bytes = 0;
1927 for certificate in &certificates {
1928 total_size_bytes +=
1930 certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1931 }
1932
1933 self.metrics
1934 .handle_soft_bundle_certificates_count
1935 .observe(certificates.len() as f64);
1936
1937 self.metrics
1938 .handle_soft_bundle_certificates_size_bytes
1939 .observe(total_size_bytes as f64);
1940
1941 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1943 .await?;
1944
1945 info!(
1946 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1947 certificates.len(),
1948 client_addr
1949 .map(|x| x.to_string())
1950 .unwrap_or_else(|| "unknown".to_string()),
1951 certificates
1952 .iter()
1953 .map(|x| x.digest().to_string())
1954 .collect::<Vec<_>>()
1955 .join(", "),
1956 total_size_bytes
1957 );
1958
1959 let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1960 self.handle_certificates(
1961 certificates,
1962 request.include_events,
1963 request.include_input_objects,
1964 request.include_output_objects,
1965 request.include_auxiliary_data,
1966 &epoch_store,
1967 request.wait_for_effects,
1968 )
1969 .instrument(span)
1970 .await
1971 .map(|(resp, spam_weight)| {
1972 (
1973 tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1974 responses: resp.unwrap_or_default(),
1975 }),
1976 spam_weight,
1977 )
1978 })
1979 }
1980
1981 async fn object_info_impl(
1982 &self,
1983 request: tonic::Request<ObjectInfoRequest>,
1984 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1985 let request = request.into_inner();
1986 let response = self.state.handle_object_info_request(request).await?;
1987 Ok((tonic::Response::new(response), Weight::one()))
1988 }
1989
1990 async fn transaction_info_impl(
1991 &self,
1992 request: tonic::Request<TransactionInfoRequest>,
1993 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1994 let request = request.into_inner();
1995 let response = self.state.handle_transaction_info_request(request).await?;
1996 Ok((tonic::Response::new(response), Weight::one()))
1997 }
1998
1999 async fn checkpoint_impl(
2000 &self,
2001 request: tonic::Request<CheckpointRequest>,
2002 ) -> WrappedServiceResponse<CheckpointResponse> {
2003 let request = request.into_inner();
2004 let response = self.state.handle_checkpoint_request(&request)?;
2005 Ok((tonic::Response::new(response), Weight::one()))
2006 }
2007
2008 async fn checkpoint_v2_impl(
2009 &self,
2010 request: tonic::Request<CheckpointRequestV2>,
2011 ) -> WrappedServiceResponse<CheckpointResponseV2> {
2012 let request = request.into_inner();
2013 let response = self.state.handle_checkpoint_request_v2(&request)?;
2014 Ok((tonic::Response::new(response), Weight::one()))
2015 }
2016
2017 async fn get_system_state_object_impl(
2018 &self,
2019 _request: tonic::Request<SystemStateRequest>,
2020 ) -> WrappedServiceResponse<SuiSystemState> {
2021 let response = self
2022 .state
2023 .get_object_cache_reader()
2024 .get_sui_system_state_object_unsafe()?;
2025 Ok((tonic::Response::new(response), Weight::one()))
2026 }
2027
2028 async fn validator_health_impl(
2029 &self,
2030 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2031 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
2032 let state = &self.state;
2033
2034 let epoch_store = state.load_epoch_store_one_call_per_task();
2036
2037 let num_inflight_execution_transactions =
2039 state.execution_scheduler().num_pending_certificates() as u64;
2040
2041 let num_inflight_consensus_transactions =
2043 self.consensus_adapter.num_inflight_transactions();
2044
2045 let last_committed_leader_round = epoch_store
2047 .consensus_tx_status_cache
2048 .as_ref()
2049 .and_then(|cache| cache.get_last_committed_leader_round())
2050 .unwrap_or(0);
2051
2052 let last_locally_built_checkpoint = epoch_store
2054 .last_built_checkpoint_summary()
2055 .ok()
2056 .flatten()
2057 .map(|(_, summary)| summary.sequence_number)
2058 .unwrap_or(0);
2059
2060 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
2061 num_inflight_consensus_transactions,
2062 num_inflight_execution_transactions,
2063 last_locally_built_checkpoint,
2064 last_committed_leader_round,
2065 };
2066
2067 let raw_response = typed_response
2068 .try_into()
2069 .map_err(|e: sui_types::error::SuiError| {
2070 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
2071 })?;
2072
2073 Ok((tonic::Response::new(raw_response), Weight::one()))
2074 }
2075
2076 fn get_client_ip_addr<T>(
2077 &self,
2078 request: &tonic::Request<T>,
2079 source: &ClientIdSource,
2080 ) -> Option<IpAddr> {
2081 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
2082
2083 if let Some(header) = forwarded_header {
2084 let num_hops = header
2085 .to_str()
2086 .map(|h| h.split(',').count().saturating_sub(1))
2087 .unwrap_or(0);
2088
2089 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
2090 }
2091
2092 match source {
2093 ClientIdSource::SocketAddr => {
2094 let socket_addr: Option<SocketAddr> = request.remote_addr();
2095
2096 if let Some(socket_addr) = socket_addr {
2102 Some(socket_addr.ip())
2103 } else {
2104 if cfg!(msim) {
2105 } else if cfg!(test) {
2107 panic!("Failed to get remote address from request");
2108 } else {
2109 self.metrics.connection_ip_not_found.inc();
2110 error!("Failed to get remote address from request");
2111 }
2112 None
2113 }
2114 }
2115 ClientIdSource::XForwardedFor(num_hops) => {
2116 let do_header_parse = |op: &MetadataValue<Ascii>| {
2117 match op.to_str() {
2118 Ok(header_val) => {
2119 let header_contents =
2120 header_val.split(',').map(str::trim).collect::<Vec<_>>();
2121 if *num_hops == 0 {
2122 error!(
2123 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2124 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2125 to this node. Skipping traffic controller request handling.",
2126 header_contents,
2127 );
2128 return None;
2129 }
2130 let contents_len = header_contents.len();
2131 if contents_len < *num_hops {
2132 error!(
2133 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2134 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2135 `client-id-source` in the node config.",
2136 header_contents, contents_len, num_hops, contents_len,
2137 );
2138 self.metrics.client_id_source_config_mismatch.inc();
2139 return None;
2140 }
2141 let Some(client_ip) = header_contents.get(contents_len - num_hops)
2142 else {
2143 error!(
2144 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2145 Expected at least {} values. Skipping traffic controller request handling.",
2146 header_contents, contents_len, num_hops, contents_len,
2147 );
2148 return None;
2149 };
2150 parse_ip(client_ip).or_else(|| {
2151 self.metrics.forwarded_header_parse_error.inc();
2152 None
2153 })
2154 }
2155 Err(e) => {
2156 self.metrics.forwarded_header_invalid.inc();
2160 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2161 None
2162 }
2163 }
2164 };
2165 if let Some(op) = request.metadata().get("x-forwarded-for") {
2166 do_header_parse(op)
2167 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2168 do_header_parse(op)
2169 } else {
2170 self.metrics.forwarded_header_not_included.inc();
2171 error!(
2172 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2173 );
2174 None
2175 }
2176 }
2177 }
2178 }
2179
2180 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2181 if let Some(traffic_controller) = &self.traffic_controller {
2182 if !traffic_controller.check(&client, &None).await {
2183 Err(tonic::Status::from_error(
2185 SuiErrorKind::TooManyRequests.into(),
2186 ))
2187 } else {
2188 Ok(())
2189 }
2190 } else {
2191 Ok(())
2192 }
2193 }
2194
2195 fn handle_traffic_resp<T>(
2196 &self,
2197 client: Option<IpAddr>,
2198 wrapped_response: WrappedServiceResponse<T>,
2199 ) -> Result<tonic::Response<T>, tonic::Status> {
2200 let (error, spam_weight, unwrapped_response) = match wrapped_response {
2201 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2202 Err(status) => (
2203 Some(SuiError::from(status.clone())),
2204 Weight::zero(),
2205 Err(status.clone()),
2206 ),
2207 };
2208
2209 if let Some(traffic_controller) = self.traffic_controller.clone() {
2210 traffic_controller.tally(TrafficTally {
2211 direct: client,
2212 through_fullnode: None,
2213 error_info: error.map(|e| {
2214 let error_type = String::from(e.clone().as_ref());
2215 let error_weight = normalize(e);
2216 (error_weight, error_type)
2217 }),
2218 spam_weight,
2219 timestamp: SystemTime::now(),
2220 })
2221 }
2222 unwrapped_response
2223 }
2224}
2225
2226fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2227 let mut request = tonic::Request::new(message);
2230 let tcp_connect_info = TcpConnectInfo {
2231 local_addr: None,
2232 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2233 };
2234 request.extensions_mut().insert(tcp_connect_info);
2235 request
2236}
2237
2238fn normalize(err: SuiError) -> Weight {
2240 match err.as_inner() {
2241 SuiErrorKind::UserInputError {
2242 error: UserInputError::IncorrectUserSignature { .. },
2243 } => Weight::one(),
2244 SuiErrorKind::InvalidSignature { .. }
2245 | SuiErrorKind::SignerSignatureAbsent { .. }
2246 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2247 | SuiErrorKind::IncorrectSigner { .. }
2248 | SuiErrorKind::UnknownSigner { .. }
2249 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2250 _ => Weight::zero(),
2251 }
2252}
2253
/// Wraps a call to `$self.$func_name($request)` with traffic-control
/// bookkeeping.
///
/// Expansion, in order:
/// 1. If `$self.client_id_source` is `None`, the handler is called directly
///    and its `(result, weight)` success value is reduced to `result`; the
///    enclosing function/async block returns immediately.
/// 2. Otherwise the client address is resolved with `get_client_ip_addr`.
/// 3. `handle_traffic_req` is awaited; an error is propagated with `?`
///    before the handler runs.
/// 4. The handler is awaited and its wrapped result is passed to
///    `handle_traffic_resp`, whose value is the value of the expansion.
///
/// The expansion contains `return`, `?` and `.await`, so it must be used as
/// the tail of an async function or async block whose output type matches
/// what `handle_traffic_resp` returns.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // Bind the configured source once instead of testing `is_none()` and
        // unwrapping later; the `else` arm is the undecorated fast path.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // Reject up front if the traffic controller refuses this client.
        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
2274
2275#[async_trait]
2276impl Validator for ValidatorService {
2277 async fn submit_transaction(
2278 &self,
2279 request: tonic::Request<RawSubmitTxRequest>,
2280 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2281 let validator_service = self.clone();
2282
2283 spawn_monitored_task!(async move {
2286 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2289 })
2290 .await
2291 .unwrap()
2292 }
2293
2294 async fn transaction(
2295 &self,
2296 request: tonic::Request<Transaction>,
2297 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2298 let validator_service = self.clone();
2299
2300 spawn_monitored_task!(async move {
2303 handle_with_decoration!(validator_service, transaction_impl, request)
2306 })
2307 .await
2308 .unwrap()
2309 }
2310
2311 async fn submit_certificate(
2312 &self,
2313 request: tonic::Request<CertifiedTransaction>,
2314 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2315 let validator_service = self.clone();
2316
2317 spawn_monitored_task!(async move {
2320 handle_with_decoration!(validator_service, submit_certificate_impl, request)
2323 })
2324 .await
2325 .unwrap()
2326 }
2327
2328 async fn handle_certificate_v2(
2329 &self,
2330 request: tonic::Request<CertifiedTransaction>,
2331 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2332 handle_with_decoration!(self, handle_certificate_v2_impl, request)
2333 }
2334
2335 async fn handle_certificate_v3(
2336 &self,
2337 request: tonic::Request<HandleCertificateRequestV3>,
2338 ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2339 handle_with_decoration!(self, handle_certificate_v3_impl, request)
2340 }
2341
2342 async fn wait_for_effects(
2343 &self,
2344 request: tonic::Request<RawWaitForEffectsRequest>,
2345 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2346 handle_with_decoration!(self, wait_for_effects_impl, request)
2347 }
2348
2349 async fn handle_soft_bundle_certificates_v3(
2350 &self,
2351 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2352 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2353 handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2354 }
2355
2356 async fn object_info(
2357 &self,
2358 request: tonic::Request<ObjectInfoRequest>,
2359 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2360 handle_with_decoration!(self, object_info_impl, request)
2361 }
2362
2363 async fn transaction_info(
2364 &self,
2365 request: tonic::Request<TransactionInfoRequest>,
2366 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2367 handle_with_decoration!(self, transaction_info_impl, request)
2368 }
2369
2370 async fn checkpoint(
2371 &self,
2372 request: tonic::Request<CheckpointRequest>,
2373 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2374 handle_with_decoration!(self, checkpoint_impl, request)
2375 }
2376
2377 async fn checkpoint_v2(
2378 &self,
2379 request: tonic::Request<CheckpointRequestV2>,
2380 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2381 handle_with_decoration!(self, checkpoint_v2_impl, request)
2382 }
2383
2384 async fn get_system_state_object(
2385 &self,
2386 request: tonic::Request<SystemStateRequest>,
2387 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2388 handle_with_decoration!(self, get_system_state_object_impl, request)
2389 }
2390
2391 async fn validator_health(
2392 &self,
2393 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2394 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2395 {
2396 handle_with_decoration!(self, validator_health_impl, request)
2397 }
2398}