1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14 register_gauge_with_registry, register_histogram_vec_with_registry,
15 register_histogram_with_registry, register_int_counter_vec_with_registry,
16 register_int_counter_with_registry,
17};
18use std::{
19 cmp::Ordering,
20 future::Future,
21 io,
22 net::{IpAddr, SocketAddr},
23 pin::Pin,
24 sync::Arc,
25 time::{Duration, SystemTime},
26};
27use sui_network::{
28 api::{Validator, ValidatorServer},
29 tonic,
30 validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::message_envelope::Message;
33use sui_types::messages_consensus::ConsensusPosition;
34use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
35use sui_types::messages_grpc::{
36 HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
37};
38use sui_types::messages_grpc::{
39 HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
40 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
41};
42use sui_types::messages_grpc::{
43 HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
44};
45use sui_types::multiaddr::Multiaddr;
46use sui_types::object::Object;
47use sui_types::sui_system_state::SuiSystemState;
48use sui_types::traffic_control::{ClientIdSource, Weight};
49use sui_types::{
50 digests::{TransactionDigest, TransactionEffectsDigest},
51 error::{SuiErrorKind, UserInputError},
52};
53use sui_types::{
54 effects::TransactionEffects,
55 messages_grpc::{
56 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
57 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
58 },
59};
60use sui_types::{
61 effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
62};
63use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
64use sui_types::{error::*, transaction::*};
65use sui_types::{
66 fp_ensure,
67 messages_checkpoint::{
68 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
69 },
70};
71use tap::TapFallible;
72use tokio::sync::oneshot;
73use tokio::time::timeout;
74use tonic::metadata::{Ascii, MetadataValue};
75use tracing::{Instrument, debug, error, error_span, info, instrument};
76
77use crate::consensus_adapter::ConnectionMonitorStatusForTests;
78use crate::{
79 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
80 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
81 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
82};
83use crate::{
84 authority::{
85 ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
86 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
87 shared_object_version_manager::Schedulable,
88 },
89 checkpoints::CheckpointStore,
90 execution_scheduler::SchedulingSource,
91 mysticeti_adapter::LazyMysticetiClient,
92 transaction_outputs::TransactionOutputs,
93};
94use nonempty::{NonEmpty, nonempty};
95use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
96use sui_types::messages_grpc::PingType;
97use tonic::transport::server::TcpConnectInfo;
98
99#[cfg(test)]
100#[path = "unit_tests/server_tests.rs"]
101mod server_tests;
102
103#[cfg(test)]
104#[path = "unit_tests/wait_for_effects_tests.rs"]
105mod wait_for_effects_tests;
106
107#[cfg(test)]
108#[path = "unit_tests/submit_transaction_tests.rs"]
109mod submit_transaction_tests;
110
111pub struct AuthorityServerHandle {
112 server_handle: sui_network::validator::server::Server,
113}
114
115impl AuthorityServerHandle {
116 pub async fn join(self) -> Result<(), io::Error> {
117 self.server_handle.handle().wait_for_shutdown().await;
118 Ok(())
119 }
120
121 pub async fn kill(self) -> Result<(), io::Error> {
122 self.server_handle.handle().shutdown().await;
123 Ok(())
124 }
125
126 pub fn address(&self) -> &Multiaddr {
127 self.server_handle.local_addr()
128 }
129}
130
131pub struct AuthorityServer {
132 address: Multiaddr,
133 pub state: Arc<AuthorityState>,
134 consensus_adapter: Arc<ConsensusAdapter>,
135 pub metrics: Arc<ValidatorServiceMetrics>,
136}
137
138impl AuthorityServer {
139 pub fn new_for_test_with_consensus_adapter(
140 state: Arc<AuthorityState>,
141 consensus_adapter: Arc<ConsensusAdapter>,
142 ) -> Self {
143 let address = new_local_tcp_address_for_testing();
144 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
145
146 Self {
147 address,
148 state,
149 consensus_adapter,
150 metrics,
151 }
152 }
153
154 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
155 let consensus_adapter = Arc::new(ConsensusAdapter::new(
156 Arc::new(LazyMysticetiClient::new()),
157 CheckpointStore::new_for_tests(),
158 state.name,
159 Arc::new(ConnectionMonitorStatusForTests {}),
160 100_000,
161 100_000,
162 None,
163 None,
164 ConsensusAdapterMetrics::new_test(),
165 state.epoch_store_for_testing().protocol_config().clone(),
166 ));
167 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
168 }
169
170 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
171 let address = self.address.clone();
172 self.spawn_with_bind_address_for_test(address).await
173 }
174
175 pub async fn spawn_with_bind_address_for_test(
176 self,
177 address: Multiaddr,
178 ) -> Result<AuthorityServerHandle, io::Error> {
179 let tls_config = sui_tls::create_rustls_server_config(
180 self.state.config.network_key_pair().copy().private(),
181 SUI_TLS_SERVER_NAME.to_string(),
182 );
183 let config = mysten_network::config::Config::new();
184 let server = sui_network::validator::server::ServerBuilder::from_config(
185 &config,
186 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
187 )
188 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
189 self.state,
190 self.consensus_adapter,
191 self.metrics,
192 )))
193 .bind(&address, Some(tls_config))
194 .await
195 .unwrap();
196 let local_addr = server.local_addr().to_owned();
197 info!("Listening to traffic on {local_addr}");
198 let handle = AuthorityServerHandle {
199 server_handle: server,
200 };
201 Ok(handle)
202 }
203}
204
205pub struct ValidatorServiceMetrics {
206 pub signature_errors: IntCounter,
207 pub tx_verification_latency: Histogram,
208 pub cert_verification_latency: Histogram,
209 pub consensus_latency: Histogram,
210 pub handle_transaction_latency: Histogram,
211 pub submit_certificate_consensus_latency: Histogram,
212 pub handle_certificate_consensus_latency: Histogram,
213 pub handle_certificate_non_consensus_latency: Histogram,
214 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
215 pub handle_soft_bundle_certificates_count: Histogram,
216 pub handle_soft_bundle_certificates_size_bytes: Histogram,
217 pub handle_transaction_consensus_latency: Histogram,
218 pub handle_submit_transaction_consensus_latency: HistogramVec,
219 pub handle_wait_for_effects_ping_latency: HistogramVec,
220
221 handle_submit_transaction_latency: HistogramVec,
222 handle_submit_transaction_bytes: HistogramVec,
223 handle_submit_transaction_batch_size: HistogramVec,
224
225 num_rejected_tx_in_epoch_boundary: IntCounter,
226 num_rejected_cert_in_epoch_boundary: IntCounter,
227 num_rejected_tx_during_overload: IntCounterVec,
228 num_rejected_cert_during_overload: IntCounterVec,
229 submission_rejected_transactions: IntCounterVec,
230 connection_ip_not_found: IntCounter,
231 forwarded_header_parse_error: IntCounter,
232 forwarded_header_invalid: IntCounter,
233 forwarded_header_not_included: IntCounter,
234 client_id_source_config_mismatch: IntCounter,
235 x_forwarded_for_num_hops: Gauge,
236}
237
238impl ValidatorServiceMetrics {
239 pub fn new(registry: &Registry) -> Self {
240 Self {
241 signature_errors: register_int_counter_with_registry!(
242 "total_signature_errors",
243 "Number of transaction signature errors",
244 registry,
245 )
246 .unwrap(),
247 tx_verification_latency: register_histogram_with_registry!(
248 "validator_service_tx_verification_latency",
249 "Latency of verifying a transaction",
250 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
251 registry,
252 )
253 .unwrap(),
254 cert_verification_latency: register_histogram_with_registry!(
255 "validator_service_cert_verification_latency",
256 "Latency of verifying a certificate",
257 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
258 registry,
259 )
260 .unwrap(),
261 consensus_latency: register_histogram_with_registry!(
262 "validator_service_consensus_latency",
263 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
264 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
265 registry,
266 )
267 .unwrap(),
268 handle_transaction_latency: register_histogram_with_registry!(
269 "validator_service_handle_transaction_latency",
270 "Latency of handling a transaction",
271 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
272 registry,
273 )
274 .unwrap(),
275 handle_certificate_consensus_latency: register_histogram_with_registry!(
276 "validator_service_handle_certificate_consensus_latency",
277 "Latency of handling a consensus transaction certificate",
278 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
279 registry,
280 )
281 .unwrap(),
282 submit_certificate_consensus_latency: register_histogram_with_registry!(
283 "validator_service_submit_certificate_consensus_latency",
284 "Latency of submit_certificate RPC handler",
285 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
286 registry,
287 )
288 .unwrap(),
289 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
290 "validator_service_handle_certificate_non_consensus_latency",
291 "Latency of handling a non-consensus transaction certificate",
292 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
293 registry,
294 )
295 .unwrap(),
296 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
297 "validator_service_handle_soft_bundle_certificates_consensus_latency",
298 "Latency of handling a consensus soft bundle",
299 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
300 registry,
301 )
302 .unwrap(),
303 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
304 "handle_soft_bundle_certificates_count",
305 "The number of certificates included in a soft bundle",
306 mysten_metrics::COUNT_BUCKETS.to_vec(),
307 registry,
308 )
309 .unwrap(),
310 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
311 "handle_soft_bundle_certificates_size_bytes",
312 "The size of soft bundle in bytes",
313 mysten_metrics::BYTES_BUCKETS.to_vec(),
314 registry,
315 )
316 .unwrap(),
317 handle_transaction_consensus_latency: register_histogram_with_registry!(
318 "validator_service_handle_transaction_consensus_latency",
319 "Latency of handling a user transaction sent through consensus",
320 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
321 registry,
322 )
323 .unwrap(),
324 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
325 "validator_service_submit_transaction_consensus_latency",
326 "Latency of submitting a user transaction sent through consensus",
327 &["req_type"],
328 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
329 registry,
330 )
331 .unwrap(),
332 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
333 "validator_service_submit_transaction_latency",
334 "Latency of submit transaction handler",
335 &["req_type"],
336 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
337 registry,
338 )
339 .unwrap(),
340 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
341 "validator_service_handle_wait_for_effects_ping_latency",
342 "Latency of handling a ping request for wait_for_effects",
343 &["req_type"],
344 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
345 registry,
346 )
347 .unwrap(),
348 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
349 "validator_service_submit_transaction_bytes",
350 "The size of transactions in the submit transaction request",
351 &["req_type"],
352 mysten_metrics::BYTES_BUCKETS.to_vec(),
353 registry,
354 )
355 .unwrap(),
356 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
357 "validator_service_submit_transaction_batch_size",
358 "The number of transactions in the submit transaction request",
359 &["req_type"],
360 mysten_metrics::COUNT_BUCKETS.to_vec(),
361 registry,
362 )
363 .unwrap(),
364 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
365 "validator_service_num_rejected_tx_in_epoch_boundary",
366 "Number of rejected transaction during epoch transitioning",
367 registry,
368 )
369 .unwrap(),
370 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
371 "validator_service_num_rejected_cert_in_epoch_boundary",
372 "Number of rejected transaction certificate during epoch transitioning",
373 registry,
374 )
375 .unwrap(),
376 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
377 "validator_service_num_rejected_tx_during_overload",
378 "Number of rejected transaction due to system overload",
379 &["error_type"],
380 registry,
381 )
382 .unwrap(),
383 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
384 "validator_service_num_rejected_cert_during_overload",
385 "Number of rejected transaction certificate due to system overload",
386 &["error_type"],
387 registry,
388 )
389 .unwrap(),
390 submission_rejected_transactions: register_int_counter_vec_with_registry!(
391 "validator_service_submission_rejected_transactions",
392 "Number of transactions rejected during submission",
393 &["reason"],
394 registry,
395 )
396 .unwrap(),
397 connection_ip_not_found: register_int_counter_with_registry!(
398 "validator_service_connection_ip_not_found",
399 "Number of times connection IP was not extractable from request",
400 registry,
401 )
402 .unwrap(),
403 forwarded_header_parse_error: register_int_counter_with_registry!(
404 "validator_service_forwarded_header_parse_error",
405 "Number of times x-forwarded-for header could not be parsed",
406 registry,
407 )
408 .unwrap(),
409 forwarded_header_invalid: register_int_counter_with_registry!(
410 "validator_service_forwarded_header_invalid",
411 "Number of times x-forwarded-for header was invalid",
412 registry,
413 )
414 .unwrap(),
415 forwarded_header_not_included: register_int_counter_with_registry!(
416 "validator_service_forwarded_header_not_included",
417 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
418 registry,
419 )
420 .unwrap(),
421 client_id_source_config_mismatch: register_int_counter_with_registry!(
422 "validator_service_client_id_source_config_mismatch",
423 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
424 registry,
425 )
426 .unwrap(),
427 x_forwarded_for_num_hops: register_gauge_with_registry!(
428 "validator_service_x_forwarded_for_num_hops",
429 "Number of hops in x-forwarded-for header",
430 registry,
431 )
432 .unwrap(),
433 }
434 }
435
436 pub fn new_for_tests() -> Self {
437 let registry = Registry::new();
438 Self::new(®istry)
439 }
440}
441
442#[derive(Clone)]
443pub struct ValidatorService {
444 state: Arc<AuthorityState>,
445 consensus_adapter: Arc<ConsensusAdapter>,
446 metrics: Arc<ValidatorServiceMetrics>,
447 traffic_controller: Option<Arc<TrafficController>>,
448 client_id_source: Option<ClientIdSource>,
449}
450
451impl ValidatorService {
452 pub fn new(
453 state: Arc<AuthorityState>,
454 consensus_adapter: Arc<ConsensusAdapter>,
455 validator_metrics: Arc<ValidatorServiceMetrics>,
456 client_id_source: Option<ClientIdSource>,
457 ) -> Self {
458 let traffic_controller = state.traffic_controller.clone();
459 Self {
460 state,
461 consensus_adapter,
462 metrics: validator_metrics,
463 traffic_controller,
464 client_id_source,
465 }
466 }
467
468 pub fn new_for_tests(
469 state: Arc<AuthorityState>,
470 consensus_adapter: Arc<ConsensusAdapter>,
471 metrics: Arc<ValidatorServiceMetrics>,
472 ) -> Self {
473 Self {
474 state,
475 consensus_adapter,
476 metrics,
477 traffic_controller: None,
478 client_id_source: None,
479 }
480 }
481
482 pub fn validator_state(&self) -> &Arc<AuthorityState> {
483 &self.state
484 }
485
486 pub async fn execute_certificate_for_testing(
487 &self,
488 cert: CertifiedTransaction,
489 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
490 let request = make_tonic_request_for_testing(cert);
491 self.handle_certificate_v2(request).await
492 }
493
494 pub async fn handle_transaction_for_benchmarking(
495 &self,
496 transaction: Transaction,
497 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
498 let request = make_tonic_request_for_testing(transaction);
499 self.transaction(request).await
500 }
501
502 async fn handle_transaction(
505 &self,
506 request: tonic::Request<Transaction>,
507 ) -> WrappedServiceResponse<HandleTransactionResponse> {
508 let Self {
509 state,
510 consensus_adapter,
511 metrics,
512 traffic_controller: _,
513 client_id_source: _,
514 } = self.clone();
515 let transaction = request.into_inner();
516 let epoch_store = state.load_epoch_store_one_call_per_task();
517
518 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
519
520 let mut validator_pushback_error = None;
528 let overload_check_res = state.check_system_overload(
529 &*consensus_adapter,
530 transaction.data(),
531 state.check_system_overload_at_signing(),
532 );
533 if let Err(error) = overload_check_res {
534 metrics
535 .num_rejected_tx_during_overload
536 .with_label_values(&[error.as_ref()])
537 .inc();
538 match error.as_inner() {
540 SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
541 validator_pushback_error = Some(error)
542 }
543 _ => return Err(error.into()),
544 }
545 }
546
547 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
548
549 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
550 let transaction = epoch_store
551 .verify_transaction_require_no_aliases(transaction)
553 .tap_err(|_| {
554 metrics.signature_errors.inc();
555 })?
556 .into_tx();
557 drop(tx_verif_metrics_guard);
558
559 let tx_digest = transaction.digest();
560
561 let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
563
564 let info = state
565 .handle_transaction(&epoch_store, transaction.clone())
566 .instrument(span)
567 .await
568 .tap_err(|e| {
569 if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
570 metrics.num_rejected_tx_in_epoch_boundary.inc();
571 }
572 })?;
573
574 if let Some(error) = validator_pushback_error {
575 return Err(error.into());
578 }
579
580 Ok((tonic::Response::new(info), Weight::zero()))
581 }
582
583 #[instrument(
584 name = "ValidatorService::handle_submit_transaction",
585 level = "error",
586 skip_all,
587 err(level = "debug")
588 )]
589 async fn handle_submit_transaction(
590 &self,
591 request: tonic::Request<RawSubmitTxRequest>,
592 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
593 let Self {
594 state,
595 consensus_adapter,
596 metrics,
597 traffic_controller: _,
598 client_id_source,
599 } = self.clone();
600
601 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
602 self.get_client_ip_addr(&request, client_id_source)
603 } else {
604 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
605 };
606
607 let inner = request.into_inner();
608 let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
609
610 let next_epoch = start_epoch + 1;
611 let mut max_retries = 1;
612
613 loop {
614 let res = self
615 .handle_submit_transaction_inner(
616 &state,
617 &consensus_adapter,
618 &metrics,
619 &inner,
620 submitter_client_addr,
621 )
622 .await;
623 match res {
624 Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
625 Err(err) => {
626 if max_retries > 0
627 && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
628 {
629 max_retries -= 1;
630
631 debug!(
632 "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
633 );
634
635 if let Ok(Ok(new_epoch)) =
636 timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
637 {
638 assert_reachable!("retry submission at epoch end");
639 if new_epoch == next_epoch {
640 continue;
641 }
642
643 debug_fatal!(
644 "expected epoch {} after reconfiguration. got {}",
645 next_epoch,
646 new_epoch
647 );
648 }
649 }
650 return Err(err.into());
651 }
652 }
653 }
654 }
655
656 async fn handle_submit_transaction_inner(
657 &self,
658 state: &AuthorityState,
659 consensus_adapter: &ConsensusAdapter,
660 metrics: &ValidatorServiceMetrics,
661 request: &RawSubmitTxRequest,
662 submitter_client_addr: Option<IpAddr>,
663 ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
664 let epoch_store = state.load_epoch_store_one_call_per_task();
665 if !epoch_store.protocol_config().mysticeti_fastpath() {
666 return Err(SuiErrorKind::UnsupportedFeatureError {
667 error: "Mysticeti fastpath".to_string(),
668 }
669 .into());
670 }
671
672 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
673 SuiErrorKind::GrpcMessageDeserializeError {
674 type_info: "RawSubmitTxRequest.submit_type".to_string(),
675 error: e.to_string(),
676 }
677 })?;
678
679 let is_ping_request = submit_type == SubmitTxType::Ping;
680 if is_ping_request {
681 fp_ensure!(
682 request.transactions.is_empty(),
683 SuiErrorKind::InvalidRequest(format!(
684 "Ping request cannot contain {} transactions",
685 request.transactions.len()
686 ))
687 .into()
688 );
689 } else {
690 fp_ensure!(
692 !request.transactions.is_empty(),
693 SuiErrorKind::InvalidRequest(
694 "At least one transaction needs to be submitted".to_string(),
695 )
696 .into()
697 );
698 }
699
700 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
705
706 let max_num_transactions = if is_soft_bundle_request {
707 epoch_store.protocol_config().max_soft_bundle_size()
710 } else {
711 epoch_store
713 .protocol_config()
714 .max_num_transactions_in_block()
715 };
716 fp_ensure!(
717 request.transactions.len() <= max_num_transactions as usize,
718 SuiErrorKind::InvalidRequest(format!(
719 "Too many transactions in request: {} vs {}",
720 request.transactions.len(),
721 max_num_transactions
722 ))
723 .into()
724 );
725
726 let mut tx_digests = Vec::with_capacity(request.transactions.len());
728 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
730 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
732 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
734 let mut total_size_bytes = 0;
736
737 let req_type = if is_ping_request {
738 "ping"
739 } else if request.transactions.len() == 1 {
740 "single_transaction"
741 } else if is_soft_bundle_request {
742 "soft_bundle"
743 } else {
744 "batch"
745 };
746
747 let _handle_tx_metrics_guard = metrics
748 .handle_submit_transaction_latency
749 .with_label_values(&[req_type])
750 .start_timer();
751
752 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
753 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
754 Ok(txn) => txn,
755 Err(e) => {
756 return Err(SuiErrorKind::TransactionDeserializationError {
758 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
759 }
760 .into());
761 }
762 };
763
764 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
766
767 let overload_check_res = state.check_system_overload(
768 consensus_adapter,
769 transaction.data(),
770 state.check_system_overload_at_signing(),
771 );
772 if let Err(error) = overload_check_res {
773 metrics
774 .num_rejected_tx_during_overload
775 .with_label_values(&[error.as_ref()])
776 .inc();
777 results[idx] = Some(SubmitTxResult::Rejected { error });
778 continue;
779 }
780
781 let verified_transaction = {
783 let _metrics_guard = metrics.tx_verification_latency.start_timer();
784 if epoch_store.protocol_config().address_aliases() {
785 match epoch_store.verify_transaction_with_current_aliases(transaction) {
786 Ok(tx) => tx,
787 Err(e) => {
788 metrics.signature_errors.inc();
789 return Err(e);
790 }
791 }
792 } else {
793 match epoch_store.verify_transaction_require_no_aliases(transaction) {
794 Ok(tx) => tx,
795 Err(e) => {
796 metrics.signature_errors.inc();
797 return Err(e);
798 }
799 }
800 }
801 };
802
803 let tx_digest = verified_transaction.tx().digest();
804 tx_digests.push(*tx_digest);
805
806 debug!(
807 ?tx_digest,
808 "handle_submit_transaction: verified transaction"
809 );
810
811 if let Some(effects) = state
814 .get_transaction_cache_reader()
815 .get_executed_effects(tx_digest)
816 {
817 let effects_digest = effects.digest();
818 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
819 let executed_result = SubmitTxResult::Executed {
820 effects_digest,
821 details: Some(executed_data),
822 fast_path: false,
823 };
824 results[idx] = Some(executed_result);
825 debug!(?tx_digest, "handle_submit_transaction: already executed");
826 continue;
827 }
828 }
829
830 debug!(
831 ?tx_digest,
832 "handle_submit_transaction: waiting for fastpath dependency objects"
833 );
834 if !state
835 .wait_for_fastpath_dependency_objects(
836 verified_transaction.tx(),
837 epoch_store.epoch(),
838 )
839 .await?
840 {
841 debug!(
842 ?tx_digest,
843 "fastpath input objects are still unavailable after waiting"
844 );
845 }
846
847 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
848 Ok(_) => { }
849 Err(e) => {
850 if let Some(effects) = state
853 .get_transaction_cache_reader()
854 .get_executed_effects(tx_digest)
855 {
856 let effects_digest = effects.digest();
857 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
858 {
859 let executed_result = SubmitTxResult::Executed {
860 effects_digest,
861 details: Some(executed_data),
862 fast_path: false,
863 };
864 results[idx] = Some(executed_result);
865 continue;
866 }
867 }
868
869 debug!(?tx_digest, "Transaction rejected during submission: {e}");
871 metrics
872 .submission_rejected_transactions
873 .with_label_values(&[e.to_variant_name()])
874 .inc();
875 results[idx] = Some(SubmitTxResult::Rejected { error: e });
876 continue;
877 }
878 }
879
880 if epoch_store.protocol_config().address_aliases() {
881 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
882 &state.name,
883 verified_transaction.into(),
884 ));
885 } else {
886 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
887 &state.name,
888 verified_transaction.into_tx().into(),
889 ));
890 }
891 transaction_indexes.push(idx);
892 total_size_bytes += tx_size;
893 }
894
895 if consensus_transactions.is_empty() && !is_ping_request {
896 return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
897 }
898
899 let max_transaction_bytes = if is_soft_bundle_request {
903 epoch_store
904 .protocol_config()
905 .consensus_max_transactions_in_block_bytes()
906 / 2
907 } else {
908 epoch_store
909 .protocol_config()
910 .consensus_max_transactions_in_block_bytes()
911 };
912 fp_ensure!(
913 total_size_bytes <= max_transaction_bytes as usize,
914 SuiErrorKind::UserInputError {
915 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
916 size: total_size_bytes,
917 limit: max_transaction_bytes,
918 },
919 }
920 .into()
921 );
922
923 metrics
924 .handle_submit_transaction_bytes
925 .with_label_values(&[req_type])
926 .observe(total_size_bytes as f64);
927 metrics
928 .handle_submit_transaction_batch_size
929 .with_label_values(&[req_type])
930 .observe(consensus_transactions.len() as f64);
931
932 let _latency_metric_guard = metrics
933 .handle_submit_transaction_consensus_latency
934 .with_label_values(&[req_type])
935 .start_timer();
936
937 let consensus_positions = if is_soft_bundle_request || is_ping_request {
938 assert!(
941 is_ping_request || !consensus_transactions.is_empty(),
942 "A valid soft bundle must have at least one transaction"
943 );
944 debug!(
945 "handle_submit_transaction: submitting consensus transactions ({}): {}",
946 req_type,
947 consensus_transactions
948 .iter()
949 .map(|t| t.local_display())
950 .join(", ")
951 );
952 self.handle_submit_to_consensus_for_position(
953 consensus_transactions,
954 &epoch_store,
955 submitter_client_addr,
956 )
957 .await?
958 } else {
959 let futures = consensus_transactions.into_iter().map(|t| {
960 debug!(
961 "handle_submit_transaction: submitting consensus transaction ({}): {}",
962 req_type,
963 t.local_display(),
964 );
965 self.handle_submit_to_consensus_for_position(
966 vec![t],
967 &epoch_store,
968 submitter_client_addr,
969 )
970 });
971 future::try_join_all(futures)
972 .await?
973 .into_iter()
974 .flatten()
975 .collect()
976 };
977
978 if is_ping_request {
979 assert_eq!(consensus_positions.len(), 1);
981 results.push(Some(SubmitTxResult::Submitted {
982 consensus_position: consensus_positions[0],
983 }));
984 } else {
985 for ((idx, tx_digest), consensus_position) in transaction_indexes
987 .into_iter()
988 .zip(tx_digests)
989 .zip(consensus_positions)
990 {
991 debug!(
992 ?tx_digest,
993 "handle_submit_transaction: submitted consensus transaction at {}",
994 consensus_position,
995 );
996 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
997 }
998 }
999
1000 Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1001 }
1002
1003 fn try_from_submit_tx_response(
1004 results: Vec<Option<SubmitTxResult>>,
1005 ) -> Result<RawSubmitTxResponse, SuiError> {
1006 let mut raw_results = Vec::new();
1007 for (i, result) in results.into_iter().enumerate() {
1008 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1009 error: format!("Missing transaction result at {}", i),
1010 })?;
1011 let raw_result = result.try_into()?;
1012 raw_results.push(raw_result);
1013 }
1014 Ok(RawSubmitTxResponse {
1015 results: raw_results,
1016 })
1017 }
1018
1019 async fn handle_certificates(
1025 &self,
1026 certificates: NonEmpty<CertifiedTransaction>,
1027 include_events: bool,
1028 include_input_objects: bool,
1029 include_output_objects: bool,
1030 include_auxiliary_data: bool,
1031 epoch_store: &Arc<AuthorityPerEpochStore>,
1032 wait_for_effects: bool,
1033 ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
1034 fp_ensure!(
1037 !self.state.is_fullnode(epoch_store),
1038 SuiErrorKind::FullNodeCantHandleCertificate.into()
1039 );
1040
1041 let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
1042
1043 let metrics = if certificates.len() == 1 {
1044 if wait_for_effects {
1045 if is_consensus_tx {
1046 &self.metrics.handle_certificate_consensus_latency
1047 } else {
1048 &self.metrics.handle_certificate_non_consensus_latency
1049 }
1050 } else {
1051 &self.metrics.submit_certificate_consensus_latency
1052 }
1053 } else {
1054 &self
1056 .metrics
1057 .handle_soft_bundle_certificates_consensus_latency
1058 };
1059
1060 let _metrics_guard = metrics.start_timer();
1061
1062 if certificates.len() == 1 {
1066 let tx_digest = *certificates[0].digest();
1067 debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
1068
1069 if let Some(signed_effects) = self
1070 .state
1071 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
1072 {
1073 let events = if include_events && signed_effects.events_digest().is_some() {
1074 Some(
1075 self.state
1076 .get_transaction_events(signed_effects.transaction_digest())?,
1077 )
1078 } else {
1079 None
1080 };
1081
1082 return Ok((
1083 Some(vec![HandleCertificateResponseV3 {
1084 effects: signed_effects.into_inner(),
1085 events,
1086 input_objects: None,
1087 output_objects: None,
1088 auxiliary_data: None,
1089 }]),
1090 Weight::one(),
1091 ));
1092 };
1093 }
1094
1095 for certificate in &certificates {
1098 let overload_check_res = self.state.check_system_overload(
1099 &*self.consensus_adapter,
1100 certificate.data(),
1101 self.state.check_system_overload_at_execution(),
1102 );
1103 if let Err(error) = overload_check_res {
1104 self.metrics
1105 .num_rejected_cert_during_overload
1106 .with_label_values(&[error.as_ref()])
1107 .inc();
1108 return Err(error.into());
1109 }
1110 }
1111
1112 let verified_certificates = {
1113 let _timer = self.metrics.cert_verification_latency.start_timer();
1114 epoch_store
1115 .signature_verifier
1116 .multi_verify_certs(certificates.into())
1117 .await
1118 .into_iter()
1119 .collect::<Result<Vec<_>, _>>()?
1120 };
1121 let consensus_transactions =
1122 NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1123 ConsensusTransaction::new_certificate_message(
1124 &self.state.name,
1125 certificate.clone().into(),
1126 )
1127 }))
1128 .unwrap();
1129
1130 let (responses, weight) = self
1131 .handle_submit_to_consensus(
1132 consensus_transactions,
1133 include_events,
1134 include_input_objects,
1135 include_output_objects,
1136 include_auxiliary_data,
1137 epoch_store,
1138 wait_for_effects,
1139 )
1140 .await?;
1141 let responses = if let Some(responses) = responses {
1143 Some(
1144 responses
1145 .into_iter()
1146 .map(|response| {
1147 let signed_effects =
1148 self.state.sign_effects(response.effects, epoch_store)?;
1149 Ok(HandleCertificateResponseV3 {
1150 effects: signed_effects.into_inner(),
1151 events: response.events,
1152 input_objects: if response.input_objects.is_empty() {
1153 None
1154 } else {
1155 Some(response.input_objects)
1156 },
1157 output_objects: if response.output_objects.is_empty() {
1158 None
1159 } else {
1160 Some(response.output_objects)
1161 },
1162 auxiliary_data: None,
1163 })
1164 })
1165 .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1166 )
1167 } else {
1168 None
1169 };
1170
1171 Ok((responses, weight))
1172 }
1173
1174 #[instrument(
1175 name = "ValidatorService::handle_submit_to_consensus_for_position",
1176 level = "debug",
1177 skip_all,
1178 err(level = "debug")
1179 )]
1180 async fn handle_submit_to_consensus_for_position(
1181 &self,
1182 consensus_transactions: Vec<ConsensusTransaction>,
1184 epoch_store: &Arc<AuthorityPerEpochStore>,
1185 submitter_client_addr: Option<IpAddr>,
1186 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1187 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1188
1189 {
1190 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1192 if !reconfiguration_lock.should_accept_user_certs() {
1193 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1194 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1195 }
1196
1197 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1201
1202 self.consensus_adapter.submit_batch(
1203 &consensus_transactions,
1204 Some(&reconfiguration_lock),
1205 epoch_store,
1206 Some(tx_consensus_positions),
1207 submitter_client_addr,
1208 )?;
1209 }
1210
1211 Ok(rx_consensus_positions.await.map_err(|e| {
1212 SuiErrorKind::FailedToSubmitToConsensus(format!(
1213 "Failed to get consensus position: {e}"
1214 ))
1215 })?)
1216 }
1217
1218 async fn handle_submit_to_consensus(
1219 &self,
1220 consensus_transactions: NonEmpty<ConsensusTransaction>,
1221 include_events: bool,
1222 include_input_objects: bool,
1223 include_output_objects: bool,
1224 _include_auxiliary_data: bool,
1225 epoch_store: &Arc<AuthorityPerEpochStore>,
1226 wait_for_effects: bool,
1227 ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1228 let consensus_transactions: Vec<_> = consensus_transactions.into();
1229 {
1230 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1232 if !reconfiguration_lock.should_accept_user_certs() {
1233 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1234 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1235 }
1236
1237 if !epoch_store.all_external_consensus_messages_processed(
1243 consensus_transactions.iter().map(|tx| tx.key()),
1244 )? {
1245 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1246 self.consensus_adapter.submit_batch(
1247 &consensus_transactions,
1248 Some(&reconfiguration_lock),
1249 epoch_store,
1250 None,
1251 None, )?;
1253 }
1256 }
1257
1258 if !wait_for_effects {
1259 let fast_path_certificates = consensus_transactions
1262 .iter()
1263 .filter_map(|tx| {
1264 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1265 (!certificate.is_consensus_tx())
1266 .then_some((
1268 VerifiedExecutableTransaction::new_from_certificate(
1269 VerifiedCertificate::new_unchecked(*(certificate.clone())),
1270 ),
1271 ExecutionEnv::new()
1272 .with_scheduling_source(SchedulingSource::NonFastPath),
1273 ))
1274 } else {
1275 None
1276 }
1277 })
1278 .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1279 .collect::<Vec<_>>();
1280 if !fast_path_certificates.is_empty() {
1281 self.state
1282 .execution_scheduler()
1283 .enqueue(fast_path_certificates, epoch_store);
1284 }
1285 return Ok((None, Weight::zero()));
1286 }
1287
1288 let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1291 |tx| async move {
1292 let effects = match &tx.kind {
1293 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1294 let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1296 self.state
1297 .wait_for_certificate_execution(&certificate, epoch_store)
1298 .await?
1299 }
1300 ConsensusTransactionKind::UserTransaction(tx) => {
1301 self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1302 }
1303 ConsensusTransactionKind::UserTransactionV2(tx) => {
1304 self.state.await_transaction_effects(*tx.tx().digest(), epoch_store).await?
1305 }
1306 _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction, UserTransaction, or UserTransactionV2"),
1307 };
1308 let events = if include_events && effects.events_digest().is_some() {
1309 Some(self.state.get_transaction_events(effects.transaction_digest())?)
1310 } else {
1311 None
1312 };
1313
1314 let input_objects = if include_input_objects {
1315 self.state.get_transaction_input_objects(&effects)?
1316 } else {
1317 vec![]
1318 };
1319
1320 let output_objects = if include_output_objects {
1321 self.state.get_transaction_output_objects(&effects)?
1322 } else {
1323 vec![]
1324 };
1325
1326 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1327 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1328 }
1329
1330 Ok::<_, SuiError>(ExecutedData {
1331 effects,
1332 events,
1333 input_objects,
1334 output_objects,
1335 })
1336 },
1337 ))
1338 .await?;
1339
1340 Ok((Some(responses), Weight::zero()))
1341 }
1342
1343 async fn collect_effects_data(
1344 &self,
1345 effects: &TransactionEffects,
1346 include_events: bool,
1347 include_input_objects: bool,
1348 include_output_objects: bool,
1349 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1350 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1351 let events = if include_events && effects.events_digest().is_some() {
1352 if let Some(fastpath_outputs) = &fastpath_outputs {
1353 Some(fastpath_outputs.events.clone())
1354 } else {
1355 Some(
1356 self.state
1357 .get_transaction_events(effects.transaction_digest())?,
1358 )
1359 }
1360 } else {
1361 None
1362 };
1363
1364 let input_objects = if include_input_objects {
1365 self.state.get_transaction_input_objects(effects)?
1366 } else {
1367 vec![]
1368 };
1369
1370 let output_objects = if include_output_objects {
1371 if let Some(fastpath_outputs) = &fastpath_outputs {
1372 fastpath_outputs.written.values().cloned().collect()
1373 } else {
1374 self.state.get_transaction_output_objects(effects)?
1375 }
1376 } else {
1377 vec![]
1378 };
1379
1380 Ok((events, input_objects, output_objects))
1381 }
1382}
1383
/// Result type of the `*_impl` service handlers: on success the tonic response paired
/// with a `Weight`, on failure a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1385
1386impl ValidatorService {
/// Delegates to `handle_transaction` and returns its result unchanged.
async fn transaction_impl(
    &self,
    request: tonic::Request<Transaction>,
) -> WrappedServiceResponse<HandleTransactionResponse> {
    self.handle_transaction(request).await
}
1393
/// Delegates to `handle_submit_transaction` and returns its result unchanged.
async fn handle_submit_transaction_impl(
    &self,
    request: tonic::Request<RawSubmitTxRequest>,
) -> WrappedServiceResponse<RawSubmitTxResponse> {
    self.handle_submit_transaction(request).await
}
1400
1401 async fn submit_certificate_impl(
1402 &self,
1403 request: tonic::Request<CertifiedTransaction>,
1404 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1405 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1406 let certificate = request.into_inner();
1407 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1408
1409 let span =
1410 error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1411 self.handle_certificates(
1412 nonempty![certificate],
1413 true,
1414 false,
1415 false,
1416 false,
1417 &epoch_store,
1418 false,
1419 )
1420 .instrument(span)
1421 .await
1422 .map(|(executed, spam_weight)| {
1423 (
1424 tonic::Response::new(SubmitCertificateResponse {
1425 executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1426 }),
1427 spam_weight,
1428 )
1429 })
1430 }
1431
1432 async fn handle_certificate_v2_impl(
1433 &self,
1434 request: tonic::Request<CertifiedTransaction>,
1435 ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1436 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1437 let certificate = request.into_inner();
1438 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1439
1440 let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1441 self.handle_certificates(
1442 nonempty![certificate],
1443 true,
1444 false,
1445 false,
1446 false,
1447 &epoch_store,
1448 true,
1449 )
1450 .instrument(span)
1451 .await
1452 .map(|(resp, spam_weight)| {
1453 (
1454 tonic::Response::new(
1455 resp.expect(
1456 "handle_certificate should not return none with wait_for_effects=true",
1457 )
1458 .remove(0)
1459 .into(),
1460 ),
1461 spam_weight,
1462 )
1463 })
1464 }
1465
1466 async fn handle_certificate_v3_impl(
1467 &self,
1468 request: tonic::Request<HandleCertificateRequestV3>,
1469 ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1470 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1471 let request = request.into_inner();
1472 request
1473 .certificate
1474 .validity_check(&epoch_store.tx_validity_check_context())?;
1475
1476 let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1477 self.handle_certificates(
1478 nonempty![request.certificate],
1479 request.include_events,
1480 request.include_input_objects,
1481 request.include_output_objects,
1482 request.include_auxiliary_data,
1483 &epoch_store,
1484 true,
1485 )
1486 .instrument(span)
1487 .await
1488 .map(|(resp, spam_weight)| {
1489 (
1490 tonic::Response::new(
1491 resp.expect(
1492 "handle_certificate should not return none with wait_for_effects=true",
1493 )
1494 .remove(0),
1495 ),
1496 spam_weight,
1497 )
1498 })
1499 }
1500
1501 async fn wait_for_effects_impl(
1502 &self,
1503 request: tonic::Request<RawWaitForEffectsRequest>,
1504 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1505 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1506 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1507 let response = timeout(
1508 Duration::from_secs(20),
1510 epoch_store
1511 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1512 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1513 )
1514 .await
1515 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1516 .try_into()?;
1517 Ok((tonic::Response::new(response), Weight::zero()))
1518 }
1519
1520 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1521 async fn wait_for_effects_response(
1522 &self,
1523 request: WaitForEffectsRequest,
1524 epoch_store: &Arc<AuthorityPerEpochStore>,
1525 ) -> SuiResult<WaitForEffectsResponse> {
1526 if request.ping_type.is_some() {
1527 return timeout(
1528 Duration::from_secs(10),
1529 self.ping_response(request, epoch_store),
1530 )
1531 .await
1532 .map_err(|_| SuiErrorKind::TimeoutError)?;
1533 }
1534
1535 let Some(tx_digest) = request.transaction_digest else {
1536 return Err(SuiErrorKind::InvalidRequest(
1537 "Transaction digest is required for wait for effects requests".to_string(),
1538 )
1539 .into());
1540 };
1541 let tx_digests = [tx_digest];
1542
1543 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1544 if let Some(consensus_position) = request.consensus_position {
1545 Box::pin(self.wait_for_fastpath_effects(
1546 consensus_position,
1547 &tx_digests,
1548 request.include_details,
1549 epoch_store,
1550 ))
1551 } else {
1552 Box::pin(futures::future::pending())
1553 };
1554
1555 tokio::select! {
1556 biased;
1558 mut effects = self.state
1565 .get_transaction_cache_reader()
1566 .notify_read_executed_effects(
1567 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1568 &tx_digests,
1569 ) => {
1570 tracing::Span::current().record("fast_path_effects", false);
1571 let effects = effects.pop().unwrap();
1572 let details = if request.include_details {
1573 Some(self.complete_executed_data(effects.clone(), None).await?)
1574 } else {
1575 None
1576 };
1577
1578 Ok(WaitForEffectsResponse::Executed {
1579 effects_digest: effects.digest(),
1580 details,
1581 fast_path: false,
1582 })
1583 }
1584
1585 fastpath_response = fastpath_effects_future => {
1586 tracing::Span::current().record("fast_path_effects", true);
1587 fastpath_response
1588 }
1589 }
1590 }
1591
1592 #[instrument(level = "error", skip_all, err(level = "debug"))]
1593 async fn ping_response(
1594 &self,
1595 request: WaitForEffectsRequest,
1596 epoch_store: &Arc<AuthorityPerEpochStore>,
1597 ) -> SuiResult<WaitForEffectsResponse> {
1598 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1599 return Err(SuiErrorKind::UnsupportedFeatureError {
1600 error: "Mysticeti fastpath".to_string(),
1601 }
1602 .into());
1603 };
1604
1605 let Some(consensus_position) = request.consensus_position else {
1606 return Err(SuiErrorKind::InvalidRequest(
1607 "Consensus position is required for Ping requests".to_string(),
1608 )
1609 .into());
1610 };
1611
1612 let Some(ping) = request.ping_type else {
1614 return Err(SuiErrorKind::InvalidRequest(
1615 "Ping type is required for ping requests".to_string(),
1616 )
1617 .into());
1618 };
1619
1620 let _metrics_guard = self
1621 .metrics
1622 .handle_wait_for_effects_ping_latency
1623 .with_label_values(&[ping.as_str()])
1624 .start_timer();
1625
1626 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1627
1628 let mut last_status = None;
1629 let details = if request.include_details {
1630 Some(Box::new(ExecutedData::default()))
1631 } else {
1632 None
1633 };
1634
1635 loop {
1636 let status = consensus_tx_status_cache
1637 .notify_read_transaction_status_change(consensus_position, last_status)
1638 .await;
1639 match status {
1640 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1641 ConsensusTxStatus::FastpathCertified => {
1642 if ping == PingType::Consensus {
1644 last_status = Some(status);
1645 continue;
1646 }
1647 return Ok(WaitForEffectsResponse::Executed {
1648 effects_digest: TransactionEffectsDigest::ZERO,
1649 details,
1650 fast_path: true,
1651 });
1652 }
1653 ConsensusTxStatus::Rejected => {
1654 return Ok(WaitForEffectsResponse::Rejected { error: None });
1655 }
1656 ConsensusTxStatus::Finalized => {
1657 return Ok(WaitForEffectsResponse::Executed {
1658 effects_digest: TransactionEffectsDigest::ZERO,
1659 details,
1660 fast_path: false,
1661 });
1662 }
1663 },
1664 NotifyReadConsensusTxStatusResult::Expired(round) => {
1665 return Ok(WaitForEffectsResponse::Expired {
1666 epoch: epoch_store.epoch(),
1667 round: Some(round),
1668 });
1669 }
1670 }
1671 }
1672 }
1673
1674 #[instrument(level = "error", skip_all, err(level = "debug"))]
1675 async fn wait_for_fastpath_effects(
1676 &self,
1677 consensus_position: ConsensusPosition,
1678 tx_digests: &[TransactionDigest],
1679 include_details: bool,
1680 epoch_store: &Arc<AuthorityPerEpochStore>,
1681 ) -> SuiResult<WaitForEffectsResponse> {
1682 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1683 return Err(SuiErrorKind::UnsupportedFeatureError {
1684 error: "Mysticeti fastpath".to_string(),
1685 }
1686 .into());
1687 };
1688
1689 let local_epoch = epoch_store.epoch();
1690 match consensus_position.epoch.cmp(&local_epoch) {
1691 Ordering::Less => {
1692 let response = WaitForEffectsResponse::Expired {
1695 epoch: local_epoch,
1696 round: None,
1697 };
1698 return Ok(response);
1699 }
1700 Ordering::Greater => {
1701 return Err(SuiErrorKind::WrongEpoch {
1703 expected_epoch: local_epoch,
1704 actual_epoch: consensus_position.epoch,
1705 }
1706 .into());
1707 }
1708 Ordering::Equal => {
1709 }
1712 };
1713
1714 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1715
1716 let mut current_status = None;
1717 loop {
1718 tokio::select! {
1719 status_result = consensus_tx_status_cache
1720 .notify_read_transaction_status_change(consensus_position, current_status) => {
1721 match status_result {
1722 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1723 match new_status {
1724 ConsensusTxStatus::Rejected => {
1725 return Ok(WaitForEffectsResponse::Rejected {
1726 error: epoch_store.get_rejection_vote_reason(
1727 consensus_position
1728 )
1729 });
1730 }
1731 ConsensusTxStatus::FastpathCertified => {
1732 current_status = Some(new_status);
1733 continue;
1734 }
1735 ConsensusTxStatus::Finalized => {
1736 current_status = Some(new_status);
1737 continue;
1738 }
1739 }
1740 }
1741 NotifyReadConsensusTxStatusResult::Expired(round) => {
1742 return Ok(WaitForEffectsResponse::Expired {
1743 epoch: epoch_store.epoch(),
1744 round: Some(round),
1745 });
1746 }
1747 }
1748 }
1749
1750 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1751 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1752 let outputs = outputs.pop().unwrap();
1753 let effects = outputs.effects.clone();
1754
1755 let details = if include_details {
1756 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1757 } else {
1758 None
1759 };
1760
1761 return Ok(WaitForEffectsResponse::Executed {
1762 effects_digest: effects.digest(),
1763 details,
1764 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1765 });
1766 }
1767 }
1768 }
1769 }
1770
1771 async fn complete_executed_data(
1772 &self,
1773 effects: TransactionEffects,
1774 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1775 ) -> SuiResult<Box<ExecutedData>> {
1776 let (events, input_objects, output_objects) = self
1777 .collect_effects_data(
1778 &effects,
1779 true,
1780 true,
1781 true,
1782 fastpath_outputs,
1783 )
1784 .await?;
1785 Ok(Box::new(ExecutedData {
1786 effects,
1787 events,
1788 input_objects,
1789 output_objects,
1790 }))
1791 }
1792
1793 async fn soft_bundle_validity_check(
1794 &self,
1795 certificates: &NonEmpty<CertifiedTransaction>,
1796 epoch_store: &Arc<AuthorityPerEpochStore>,
1797 total_size_bytes: u64,
1798 ) -> Result<(), tonic::Status> {
1799 let protocol_config = epoch_store.protocol_config();
1800 let node_config = &self.state.config;
1801
1802 fp_ensure!(
1808 protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1809 SuiErrorKind::UnsupportedFeatureError {
1810 error: "Soft Bundle".to_string()
1811 }
1812 .into()
1813 );
1814
1815 fp_ensure!(
1822 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1823 SuiErrorKind::UserInputError {
1824 error: UserInputError::TooManyTransactionsInBatch {
1825 size: certificates.len(),
1826 limit: protocol_config.max_soft_bundle_size()
1827 }
1828 }
1829 .into()
1830 );
1831
1832 let soft_bundle_max_size_bytes =
1836 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1837 fp_ensure!(
1838 total_size_bytes <= soft_bundle_max_size_bytes,
1839 SuiErrorKind::UserInputError {
1840 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1841 size: total_size_bytes as usize,
1842 limit: soft_bundle_max_size_bytes,
1843 },
1844 }
1845 .into()
1846 );
1847
1848 let mut gas_price = None;
1849 for certificate in certificates {
1850 let tx_digest = *certificate.digest();
1851 fp_ensure!(
1852 certificate.is_consensus_tx(),
1853 SuiErrorKind::UserInputError {
1854 error: UserInputError::NoSharedObjectError { digest: tx_digest }
1855 }
1856 .into()
1857 );
1858 fp_ensure!(
1859 !self.state.is_tx_already_executed(&tx_digest),
1860 SuiErrorKind::UserInputError {
1861 error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1862 }
1863 .into()
1864 );
1865 if let Some(gas) = gas_price {
1866 fp_ensure!(
1867 gas == certificate.gas_price(),
1868 SuiErrorKind::UserInputError {
1869 error: UserInputError::GasPriceMismatchError {
1870 digest: tx_digest,
1871 expected: gas,
1872 actual: certificate.gas_price()
1873 }
1874 }
1875 .into()
1876 );
1877 } else {
1878 gas_price = Some(certificate.gas_price());
1879 }
1880 }
1881
1882 fp_ensure!(
1887 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1888 SuiErrorKind::UserInputError {
1889 error: UserInputError::CertificateAlreadyProcessed
1890 }
1891 .into()
1892 );
1893
1894 Ok(())
1895 }
1896
1897 async fn handle_soft_bundle_certificates_v3_impl(
1898 &self,
1899 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1900 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1901 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1902 let client_addr = if self.client_id_source.is_none() {
1903 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1904 } else {
1905 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1906 };
1907 let request = request.into_inner();
1908
1909 let certificates = NonEmpty::from_vec(request.certificates)
1910 .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1911 let mut total_size_bytes = 0;
1912 for certificate in &certificates {
1913 total_size_bytes +=
1915 certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1916 }
1917
1918 self.metrics
1919 .handle_soft_bundle_certificates_count
1920 .observe(certificates.len() as f64);
1921
1922 self.metrics
1923 .handle_soft_bundle_certificates_size_bytes
1924 .observe(total_size_bytes as f64);
1925
1926 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1928 .await?;
1929
1930 info!(
1931 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1932 certificates.len(),
1933 client_addr
1934 .map(|x| x.to_string())
1935 .unwrap_or_else(|| "unknown".to_string()),
1936 certificates
1937 .iter()
1938 .map(|x| x.digest().to_string())
1939 .collect::<Vec<_>>()
1940 .join(", "),
1941 total_size_bytes
1942 );
1943
1944 let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1945 self.handle_certificates(
1946 certificates,
1947 request.include_events,
1948 request.include_input_objects,
1949 request.include_output_objects,
1950 request.include_auxiliary_data,
1951 &epoch_store,
1952 request.wait_for_effects,
1953 )
1954 .instrument(span)
1955 .await
1956 .map(|(resp, spam_weight)| {
1957 (
1958 tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1959 responses: resp.unwrap_or_default(),
1960 }),
1961 spam_weight,
1962 )
1963 })
1964 }
1965
1966 async fn object_info_impl(
1967 &self,
1968 request: tonic::Request<ObjectInfoRequest>,
1969 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1970 let request = request.into_inner();
1971 let response = self.state.handle_object_info_request(request).await?;
1972 Ok((tonic::Response::new(response), Weight::one()))
1973 }
1974
1975 async fn transaction_info_impl(
1976 &self,
1977 request: tonic::Request<TransactionInfoRequest>,
1978 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1979 let request = request.into_inner();
1980 let response = self.state.handle_transaction_info_request(request).await?;
1981 Ok((tonic::Response::new(response), Weight::one()))
1982 }
1983
1984 async fn checkpoint_impl(
1985 &self,
1986 request: tonic::Request<CheckpointRequest>,
1987 ) -> WrappedServiceResponse<CheckpointResponse> {
1988 let request = request.into_inner();
1989 let response = self.state.handle_checkpoint_request(&request)?;
1990 Ok((tonic::Response::new(response), Weight::one()))
1991 }
1992
1993 async fn checkpoint_v2_impl(
1994 &self,
1995 request: tonic::Request<CheckpointRequestV2>,
1996 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1997 let request = request.into_inner();
1998 let response = self.state.handle_checkpoint_request_v2(&request)?;
1999 Ok((tonic::Response::new(response), Weight::one()))
2000 }
2001
2002 async fn get_system_state_object_impl(
2003 &self,
2004 _request: tonic::Request<SystemStateRequest>,
2005 ) -> WrappedServiceResponse<SuiSystemState> {
2006 let response = self
2007 .state
2008 .get_object_cache_reader()
2009 .get_sui_system_state_object_unsafe()?;
2010 Ok((tonic::Response::new(response), Weight::one()))
2011 }
2012
2013 async fn validator_health_impl(
2014 &self,
2015 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2016 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
2017 let state = &self.state;
2018
2019 let epoch_store = state.load_epoch_store_one_call_per_task();
2021
2022 let num_inflight_execution_transactions =
2024 state.execution_scheduler().num_pending_certificates() as u64;
2025
2026 let num_inflight_consensus_transactions =
2028 self.consensus_adapter.num_inflight_transactions();
2029
2030 let last_committed_leader_round = epoch_store
2032 .consensus_tx_status_cache
2033 .as_ref()
2034 .and_then(|cache| cache.get_last_committed_leader_round())
2035 .unwrap_or(0);
2036
2037 let last_locally_built_checkpoint = epoch_store
2039 .last_built_checkpoint_summary()
2040 .ok()
2041 .flatten()
2042 .map(|(_, summary)| summary.sequence_number)
2043 .unwrap_or(0);
2044
2045 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
2046 num_inflight_consensus_transactions,
2047 num_inflight_execution_transactions,
2048 last_locally_built_checkpoint,
2049 last_committed_leader_round,
2050 };
2051
2052 let raw_response = typed_response
2053 .try_into()
2054 .map_err(|e: sui_types::error::SuiError| {
2055 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
2056 })?;
2057
2058 Ok((tonic::Response::new(raw_response), Weight::one()))
2059 }
2060
2061 fn get_client_ip_addr<T>(
2062 &self,
2063 request: &tonic::Request<T>,
2064 source: &ClientIdSource,
2065 ) -> Option<IpAddr> {
2066 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
2067
2068 if let Some(header) = forwarded_header {
2069 let num_hops = header
2070 .to_str()
2071 .map(|h| h.split(',').count().saturating_sub(1))
2072 .unwrap_or(0);
2073
2074 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
2075 }
2076
2077 match source {
2078 ClientIdSource::SocketAddr => {
2079 let socket_addr: Option<SocketAddr> = request.remote_addr();
2080
2081 if let Some(socket_addr) = socket_addr {
2087 Some(socket_addr.ip())
2088 } else {
2089 if cfg!(msim) {
2090 } else if cfg!(test) {
2092 panic!("Failed to get remote address from request");
2093 } else {
2094 self.metrics.connection_ip_not_found.inc();
2095 error!("Failed to get remote address from request");
2096 }
2097 None
2098 }
2099 }
2100 ClientIdSource::XForwardedFor(num_hops) => {
2101 let do_header_parse = |op: &MetadataValue<Ascii>| {
2102 match op.to_str() {
2103 Ok(header_val) => {
2104 let header_contents =
2105 header_val.split(',').map(str::trim).collect::<Vec<_>>();
2106 if *num_hops == 0 {
2107 error!(
2108 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2109 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2110 to this node. Skipping traffic controller request handling.",
2111 header_contents,
2112 );
2113 return None;
2114 }
2115 let contents_len = header_contents.len();
2116 if contents_len < *num_hops {
2117 error!(
2118 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2119 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2120 `client-id-source` in the node config.",
2121 header_contents, contents_len, num_hops, contents_len,
2122 );
2123 self.metrics.client_id_source_config_mismatch.inc();
2124 return None;
2125 }
2126 let Some(client_ip) = header_contents.get(contents_len - num_hops)
2127 else {
2128 error!(
2129 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2130 Expected at least {} values. Skipping traffic controller request handling.",
2131 header_contents, contents_len, num_hops, contents_len,
2132 );
2133 return None;
2134 };
2135 parse_ip(client_ip).or_else(|| {
2136 self.metrics.forwarded_header_parse_error.inc();
2137 None
2138 })
2139 }
2140 Err(e) => {
2141 self.metrics.forwarded_header_invalid.inc();
2145 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2146 None
2147 }
2148 }
2149 };
2150 if let Some(op) = request.metadata().get("x-forwarded-for") {
2151 do_header_parse(op)
2152 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2153 do_header_parse(op)
2154 } else {
2155 self.metrics.forwarded_header_not_included.inc();
2156 error!(
2157 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2158 );
2159 None
2160 }
2161 }
2162 }
2163 }
2164
2165 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2166 if let Some(traffic_controller) = &self.traffic_controller {
2167 if !traffic_controller.check(&client, &None).await {
2168 Err(tonic::Status::from_error(
2170 SuiErrorKind::TooManyRequests.into(),
2171 ))
2172 } else {
2173 Ok(())
2174 }
2175 } else {
2176 Ok(())
2177 }
2178 }
2179
2180 fn handle_traffic_resp<T>(
2181 &self,
2182 client: Option<IpAddr>,
2183 wrapped_response: WrappedServiceResponse<T>,
2184 ) -> Result<tonic::Response<T>, tonic::Status> {
2185 let (error, spam_weight, unwrapped_response) = match wrapped_response {
2186 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2187 Err(status) => (
2188 Some(SuiError::from(status.clone())),
2189 Weight::zero(),
2190 Err(status.clone()),
2191 ),
2192 };
2193
2194 if let Some(traffic_controller) = self.traffic_controller.clone() {
2195 traffic_controller.tally(TrafficTally {
2196 direct: client,
2197 through_fullnode: None,
2198 error_info: error.map(|e| {
2199 let error_type = String::from(e.clone().as_ref());
2200 let error_weight = normalize(e);
2201 (error_weight, error_type)
2202 }),
2203 spam_weight,
2204 timestamp: SystemTime::now(),
2205 })
2206 }
2207 unwrapped_response
2208 }
2209}
2210
2211fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2212 let mut request = tonic::Request::new(message);
2215 let tcp_connect_info = TcpConnectInfo {
2216 local_addr: None,
2217 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2218 };
2219 request.extensions_mut().insert(tcp_connect_info);
2220 request
2221}
2222
2223fn normalize(err: SuiError) -> Weight {
2225 match err.as_inner() {
2226 SuiErrorKind::UserInputError {
2227 error: UserInputError::IncorrectUserSignature { .. },
2228 } => Weight::one(),
2229 SuiErrorKind::InvalidSignature { .. }
2230 | SuiErrorKind::SignerSignatureAbsent { .. }
2231 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2232 | SuiErrorKind::IncorrectSigner { .. }
2233 | SuiErrorKind::UnknownSigner { .. }
2234 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2235 _ => Weight::zero(),
2236 }
2237}
2238
#[macro_export]
/// Runs the handler method `$func_name` on `$self` with `$request`, wrapped in
/// traffic-control handling.
///
/// The expansion must sit in tail position of an async body whose output is
/// `Result<tonic::Response<_>, tonic::Status>`, because it uses `return`, `?` and `.await`:
///
/// * When `$self.client_id_source` is `None`, the handler is called directly and its
///   response is returned with the accompanying weight discarded.
/// * Otherwise the client address is resolved with `get_client_ip_addr`, the request is
///   gated by `handle_traffic_req` (an error there is returned before the handler runs),
///   the handler is executed, and its wrapped result is passed to `handle_traffic_resp`.
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // No client id source configured: bypass the traffic-control calls entirely.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // `Option<IpAddr>` is `Copy`, so `client` stays usable for the response tally.
        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
2259
2260#[async_trait]
2261impl Validator for ValidatorService {
2262 async fn submit_transaction(
2263 &self,
2264 request: tonic::Request<RawSubmitTxRequest>,
2265 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2266 let validator_service = self.clone();
2267
2268 spawn_monitored_task!(async move {
2271 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2274 })
2275 .await
2276 .unwrap()
2277 }
2278
2279 async fn transaction(
2280 &self,
2281 request: tonic::Request<Transaction>,
2282 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2283 let validator_service = self.clone();
2284
2285 spawn_monitored_task!(async move {
2288 handle_with_decoration!(validator_service, transaction_impl, request)
2291 })
2292 .await
2293 .unwrap()
2294 }
2295
2296 async fn submit_certificate(
2297 &self,
2298 request: tonic::Request<CertifiedTransaction>,
2299 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2300 let validator_service = self.clone();
2301
2302 spawn_monitored_task!(async move {
2305 handle_with_decoration!(validator_service, submit_certificate_impl, request)
2308 })
2309 .await
2310 .unwrap()
2311 }
2312
2313 async fn handle_certificate_v2(
2314 &self,
2315 request: tonic::Request<CertifiedTransaction>,
2316 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2317 handle_with_decoration!(self, handle_certificate_v2_impl, request)
2318 }
2319
2320 async fn handle_certificate_v3(
2321 &self,
2322 request: tonic::Request<HandleCertificateRequestV3>,
2323 ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2324 handle_with_decoration!(self, handle_certificate_v3_impl, request)
2325 }
2326
2327 async fn wait_for_effects(
2328 &self,
2329 request: tonic::Request<RawWaitForEffectsRequest>,
2330 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2331 handle_with_decoration!(self, wait_for_effects_impl, request)
2332 }
2333
2334 async fn handle_soft_bundle_certificates_v3(
2335 &self,
2336 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2337 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2338 handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2339 }
2340
2341 async fn object_info(
2342 &self,
2343 request: tonic::Request<ObjectInfoRequest>,
2344 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2345 handle_with_decoration!(self, object_info_impl, request)
2346 }
2347
2348 async fn transaction_info(
2349 &self,
2350 request: tonic::Request<TransactionInfoRequest>,
2351 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2352 handle_with_decoration!(self, transaction_info_impl, request)
2353 }
2354
2355 async fn checkpoint(
2356 &self,
2357 request: tonic::Request<CheckpointRequest>,
2358 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2359 handle_with_decoration!(self, checkpoint_impl, request)
2360 }
2361
2362 async fn checkpoint_v2(
2363 &self,
2364 request: tonic::Request<CheckpointRequestV2>,
2365 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2366 handle_with_decoration!(self, checkpoint_v2_impl, request)
2367 }
2368
2369 async fn get_system_state_object(
2370 &self,
2371 request: tonic::Request<SystemStateRequest>,
2372 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2373 handle_with_decoration!(self, get_system_state_object_impl, request)
2374 }
2375
2376 async fn validator_health(
2377 &self,
2378 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2379 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2380 {
2381 handle_with_decoration!(self, validator_health_impl, request)
2382 }
2383}