1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_metrics::spawn_monitored_task;
11use prometheus::{
12 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
13 register_gauge_with_registry, register_histogram_vec_with_registry,
14 register_histogram_with_registry, register_int_counter_vec_with_registry,
15 register_int_counter_with_registry,
16};
17use std::{
18 cmp::Ordering,
19 future::Future,
20 io,
21 net::{IpAddr, SocketAddr},
22 pin::Pin,
23 sync::Arc,
24 time::{Duration, SystemTime},
25};
26use sui_network::{
27 api::{Validator, ValidatorServer},
28 tonic,
29 validator::server::SUI_TLS_SERVER_NAME,
30};
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
34use sui_types::messages_grpc::{
35 HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
36};
37use sui_types::messages_grpc::{
38 HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
39 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::messages_grpc::{
42 HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
43};
44use sui_types::multiaddr::Multiaddr;
45use sui_types::object::Object;
46use sui_types::sui_system_state::SuiSystemState;
47use sui_types::traffic_control::{ClientIdSource, Weight};
48use sui_types::{
49 digests::{TransactionDigest, TransactionEffectsDigest},
50 error::{SuiErrorKind, UserInputError},
51};
52use sui_types::{
53 effects::TransactionEffects,
54 messages_grpc::{
55 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
56 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
57 },
58};
59use sui_types::{
60 effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
61};
62use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
63use sui_types::{error::*, transaction::*};
64use sui_types::{
65 fp_ensure,
66 messages_checkpoint::{
67 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
68 },
69};
70use tap::TapFallible;
71use tokio::sync::oneshot;
72use tokio::time::timeout;
73use tonic::metadata::{Ascii, MetadataValue};
74use tracing::{Instrument, debug, error, error_span, info, instrument};
75
76use crate::consensus_adapter::ConnectionMonitorStatusForTests;
77use crate::{
78 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
79 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
80 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
81};
82use crate::{
83 authority::{
84 ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
85 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
86 shared_object_version_manager::Schedulable,
87 },
88 checkpoints::CheckpointStore,
89 execution_scheduler::SchedulingSource,
90 mysticeti_adapter::LazyMysticetiClient,
91 transaction_outputs::TransactionOutputs,
92};
93use nonempty::{NonEmpty, nonempty};
94use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
95use sui_types::messages_grpc::PingType;
96use tonic::transport::server::TcpConnectInfo;
97
98#[cfg(test)]
99#[path = "unit_tests/server_tests.rs"]
100mod server_tests;
101
102#[cfg(test)]
103#[path = "unit_tests/wait_for_effects_tests.rs"]
104mod wait_for_effects_tests;
105
106#[cfg(test)]
107#[path = "unit_tests/submit_transaction_tests.rs"]
108mod submit_transaction_tests;
109
110pub struct AuthorityServerHandle {
111 server_handle: sui_network::validator::server::Server,
112}
113
114impl AuthorityServerHandle {
115 pub async fn join(self) -> Result<(), io::Error> {
116 self.server_handle.handle().wait_for_shutdown().await;
117 Ok(())
118 }
119
120 pub async fn kill(self) -> Result<(), io::Error> {
121 self.server_handle.handle().shutdown().await;
122 Ok(())
123 }
124
125 pub fn address(&self) -> &Multiaddr {
126 self.server_handle.local_addr()
127 }
128}
129
130pub struct AuthorityServer {
131 address: Multiaddr,
132 pub state: Arc<AuthorityState>,
133 consensus_adapter: Arc<ConsensusAdapter>,
134 pub metrics: Arc<ValidatorServiceMetrics>,
135}
136
137impl AuthorityServer {
138 pub fn new_for_test_with_consensus_adapter(
139 state: Arc<AuthorityState>,
140 consensus_adapter: Arc<ConsensusAdapter>,
141 ) -> Self {
142 let address = new_local_tcp_address_for_testing();
143 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
144
145 Self {
146 address,
147 state,
148 consensus_adapter,
149 metrics,
150 }
151 }
152
153 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
154 let consensus_adapter = Arc::new(ConsensusAdapter::new(
155 Arc::new(LazyMysticetiClient::new()),
156 CheckpointStore::new_for_tests(),
157 state.name,
158 Arc::new(ConnectionMonitorStatusForTests {}),
159 100_000,
160 100_000,
161 None,
162 None,
163 ConsensusAdapterMetrics::new_test(),
164 state.epoch_store_for_testing().protocol_config().clone(),
165 ));
166 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
167 }
168
169 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
170 let address = self.address.clone();
171 self.spawn_with_bind_address_for_test(address).await
172 }
173
174 pub async fn spawn_with_bind_address_for_test(
175 self,
176 address: Multiaddr,
177 ) -> Result<AuthorityServerHandle, io::Error> {
178 let tls_config = sui_tls::create_rustls_server_config(
179 self.state.config.network_key_pair().copy().private(),
180 SUI_TLS_SERVER_NAME.to_string(),
181 );
182 let config = mysten_network::config::Config::new();
183 let server = sui_network::validator::server::ServerBuilder::from_config(
184 &config,
185 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
186 )
187 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
188 self.state,
189 self.consensus_adapter,
190 self.metrics,
191 )))
192 .bind(&address, Some(tls_config))
193 .await
194 .unwrap();
195 let local_addr = server.local_addr().to_owned();
196 info!("Listening to traffic on {local_addr}");
197 let handle = AuthorityServerHandle {
198 server_handle: server,
199 };
200 Ok(handle)
201 }
202}
203
204pub struct ValidatorServiceMetrics {
205 pub signature_errors: IntCounter,
206 pub tx_verification_latency: Histogram,
207 pub cert_verification_latency: Histogram,
208 pub consensus_latency: Histogram,
209 pub handle_transaction_latency: Histogram,
210 pub submit_certificate_consensus_latency: Histogram,
211 pub handle_certificate_consensus_latency: Histogram,
212 pub handle_certificate_non_consensus_latency: Histogram,
213 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
214 pub handle_soft_bundle_certificates_count: Histogram,
215 pub handle_soft_bundle_certificates_size_bytes: Histogram,
216 pub handle_transaction_consensus_latency: Histogram,
217 pub handle_submit_transaction_consensus_latency: HistogramVec,
218 pub handle_wait_for_effects_ping_latency: HistogramVec,
219
220 handle_submit_transaction_latency: HistogramVec,
221 handle_submit_transaction_bytes: HistogramVec,
222 handle_submit_transaction_batch_size: HistogramVec,
223
224 num_rejected_tx_in_epoch_boundary: IntCounter,
225 num_rejected_cert_in_epoch_boundary: IntCounter,
226 num_rejected_tx_during_overload: IntCounterVec,
227 num_rejected_cert_during_overload: IntCounterVec,
228 connection_ip_not_found: IntCounter,
229 forwarded_header_parse_error: IntCounter,
230 forwarded_header_invalid: IntCounter,
231 forwarded_header_not_included: IntCounter,
232 client_id_source_config_mismatch: IntCounter,
233 x_forwarded_for_num_hops: Gauge,
234}
235
236impl ValidatorServiceMetrics {
237 pub fn new(registry: &Registry) -> Self {
238 Self {
239 signature_errors: register_int_counter_with_registry!(
240 "total_signature_errors",
241 "Number of transaction signature errors",
242 registry,
243 )
244 .unwrap(),
245 tx_verification_latency: register_histogram_with_registry!(
246 "validator_service_tx_verification_latency",
247 "Latency of verifying a transaction",
248 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249 registry,
250 )
251 .unwrap(),
252 cert_verification_latency: register_histogram_with_registry!(
253 "validator_service_cert_verification_latency",
254 "Latency of verifying a certificate",
255 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
256 registry,
257 )
258 .unwrap(),
259 consensus_latency: register_histogram_with_registry!(
260 "validator_service_consensus_latency",
261 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
262 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
263 registry,
264 )
265 .unwrap(),
266 handle_transaction_latency: register_histogram_with_registry!(
267 "validator_service_handle_transaction_latency",
268 "Latency of handling a transaction",
269 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
270 registry,
271 )
272 .unwrap(),
273 handle_certificate_consensus_latency: register_histogram_with_registry!(
274 "validator_service_handle_certificate_consensus_latency",
275 "Latency of handling a consensus transaction certificate",
276 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
277 registry,
278 )
279 .unwrap(),
280 submit_certificate_consensus_latency: register_histogram_with_registry!(
281 "validator_service_submit_certificate_consensus_latency",
282 "Latency of submit_certificate RPC handler",
283 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
284 registry,
285 )
286 .unwrap(),
287 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
288 "validator_service_handle_certificate_non_consensus_latency",
289 "Latency of handling a non-consensus transaction certificate",
290 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
291 registry,
292 )
293 .unwrap(),
294 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
295 "validator_service_handle_soft_bundle_certificates_consensus_latency",
296 "Latency of handling a consensus soft bundle",
297 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
298 registry,
299 )
300 .unwrap(),
301 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
302 "handle_soft_bundle_certificates_count",
303 "The number of certificates included in a soft bundle",
304 mysten_metrics::COUNT_BUCKETS.to_vec(),
305 registry,
306 )
307 .unwrap(),
308 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
309 "handle_soft_bundle_certificates_size_bytes",
310 "The size of soft bundle in bytes",
311 mysten_metrics::BYTES_BUCKETS.to_vec(),
312 registry,
313 )
314 .unwrap(),
315 handle_transaction_consensus_latency: register_histogram_with_registry!(
316 "validator_service_handle_transaction_consensus_latency",
317 "Latency of handling a user transaction sent through consensus",
318 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
319 registry,
320 )
321 .unwrap(),
322 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
323 "validator_service_submit_transaction_consensus_latency",
324 "Latency of submitting a user transaction sent through consensus",
325 &["req_type"],
326 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
327 registry,
328 )
329 .unwrap(),
330 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
331 "validator_service_submit_transaction_latency",
332 "Latency of submit transaction handler",
333 &["req_type"],
334 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
335 registry,
336 )
337 .unwrap(),
338 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
339 "validator_service_handle_wait_for_effects_ping_latency",
340 "Latency of handling a ping request for wait_for_effects",
341 &["req_type"],
342 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
343 registry,
344 )
345 .unwrap(),
346 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
347 "validator_service_submit_transaction_bytes",
348 "The size of transactions in the submit transaction request",
349 &["req_type"],
350 mysten_metrics::BYTES_BUCKETS.to_vec(),
351 registry,
352 )
353 .unwrap(),
354 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
355 "validator_service_submit_transaction_batch_size",
356 "The number of transactions in the submit transaction request",
357 &["req_type"],
358 mysten_metrics::COUNT_BUCKETS.to_vec(),
359 registry,
360 )
361 .unwrap(),
362 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
363 "validator_service_num_rejected_tx_in_epoch_boundary",
364 "Number of rejected transaction during epoch transitioning",
365 registry,
366 )
367 .unwrap(),
368 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
369 "validator_service_num_rejected_cert_in_epoch_boundary",
370 "Number of rejected transaction certificate during epoch transitioning",
371 registry,
372 )
373 .unwrap(),
374 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
375 "validator_service_num_rejected_tx_during_overload",
376 "Number of rejected transaction due to system overload",
377 &["error_type"],
378 registry,
379 )
380 .unwrap(),
381 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
382 "validator_service_num_rejected_cert_during_overload",
383 "Number of rejected transaction certificate due to system overload",
384 &["error_type"],
385 registry,
386 )
387 .unwrap(),
388 connection_ip_not_found: register_int_counter_with_registry!(
389 "validator_service_connection_ip_not_found",
390 "Number of times connection IP was not extractable from request",
391 registry,
392 )
393 .unwrap(),
394 forwarded_header_parse_error: register_int_counter_with_registry!(
395 "validator_service_forwarded_header_parse_error",
396 "Number of times x-forwarded-for header could not be parsed",
397 registry,
398 )
399 .unwrap(),
400 forwarded_header_invalid: register_int_counter_with_registry!(
401 "validator_service_forwarded_header_invalid",
402 "Number of times x-forwarded-for header was invalid",
403 registry,
404 )
405 .unwrap(),
406 forwarded_header_not_included: register_int_counter_with_registry!(
407 "validator_service_forwarded_header_not_included",
408 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
409 registry,
410 )
411 .unwrap(),
412 client_id_source_config_mismatch: register_int_counter_with_registry!(
413 "validator_service_client_id_source_config_mismatch",
414 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
415 registry,
416 )
417 .unwrap(),
418 x_forwarded_for_num_hops: register_gauge_with_registry!(
419 "validator_service_x_forwarded_for_num_hops",
420 "Number of hops in x-forwarded-for header",
421 registry,
422 )
423 .unwrap(),
424 }
425 }
426
427 pub fn new_for_tests() -> Self {
428 let registry = Registry::new();
429 Self::new(®istry)
430 }
431}
432
433#[derive(Clone)]
434pub struct ValidatorService {
435 state: Arc<AuthorityState>,
436 consensus_adapter: Arc<ConsensusAdapter>,
437 metrics: Arc<ValidatorServiceMetrics>,
438 traffic_controller: Option<Arc<TrafficController>>,
439 client_id_source: Option<ClientIdSource>,
440}
441
442impl ValidatorService {
443 pub fn new(
444 state: Arc<AuthorityState>,
445 consensus_adapter: Arc<ConsensusAdapter>,
446 validator_metrics: Arc<ValidatorServiceMetrics>,
447 client_id_source: Option<ClientIdSource>,
448 ) -> Self {
449 let traffic_controller = state.traffic_controller.clone();
450 Self {
451 state,
452 consensus_adapter,
453 metrics: validator_metrics,
454 traffic_controller,
455 client_id_source,
456 }
457 }
458
459 pub fn new_for_tests(
460 state: Arc<AuthorityState>,
461 consensus_adapter: Arc<ConsensusAdapter>,
462 metrics: Arc<ValidatorServiceMetrics>,
463 ) -> Self {
464 Self {
465 state,
466 consensus_adapter,
467 metrics,
468 traffic_controller: None,
469 client_id_source: None,
470 }
471 }
472
473 pub fn validator_state(&self) -> &Arc<AuthorityState> {
474 &self.state
475 }
476
477 pub async fn execute_certificate_for_testing(
478 &self,
479 cert: CertifiedTransaction,
480 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
481 let request = make_tonic_request_for_testing(cert);
482 self.handle_certificate_v2(request).await
483 }
484
485 pub async fn handle_transaction_for_benchmarking(
486 &self,
487 transaction: Transaction,
488 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
489 let request = make_tonic_request_for_testing(transaction);
490 self.transaction(request).await
491 }
492
493 async fn handle_transaction(
496 &self,
497 request: tonic::Request<Transaction>,
498 ) -> WrappedServiceResponse<HandleTransactionResponse> {
499 let Self {
500 state,
501 consensus_adapter,
502 metrics,
503 traffic_controller: _,
504 client_id_source: _,
505 } = self.clone();
506 let transaction = request.into_inner();
507 let epoch_store = state.load_epoch_store_one_call_per_task();
508
509 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
510
511 let mut validator_pushback_error = None;
519 let overload_check_res = state.check_system_overload(
520 &*consensus_adapter,
521 transaction.data(),
522 state.check_system_overload_at_signing(),
523 );
524 if let Err(error) = overload_check_res {
525 metrics
526 .num_rejected_tx_during_overload
527 .with_label_values(&[error.as_ref()])
528 .inc();
529 match error.as_inner() {
531 SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
532 validator_pushback_error = Some(error)
533 }
534 _ => return Err(error.into()),
535 }
536 }
537
538 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
539
540 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
541 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
542 metrics.signature_errors.inc();
543 })?;
544 drop(tx_verif_metrics_guard);
545
546 let tx_digest = transaction.digest();
547
548 let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
550
551 let info = state
552 .handle_transaction(&epoch_store, transaction.clone())
553 .instrument(span)
554 .await
555 .tap_err(|e| {
556 if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
557 metrics.num_rejected_tx_in_epoch_boundary.inc();
558 }
559 })?;
560
561 if let Some(error) = validator_pushback_error {
562 return Err(error.into());
565 }
566
567 Ok((tonic::Response::new(info), Weight::zero()))
568 }
569
570 #[instrument(
571 name = "ValidatorService::handle_submit_transaction",
572 level = "error",
573 skip_all,
574 err(level = "debug")
575 )]
576 async fn handle_submit_transaction(
577 &self,
578 request: tonic::Request<RawSubmitTxRequest>,
579 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
580 let Self {
581 state,
582 consensus_adapter,
583 metrics,
584 traffic_controller: _,
585 client_id_source,
586 } = self.clone();
587
588 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
589 self.get_client_ip_addr(&request, client_id_source)
590 } else {
591 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
592 };
593
594 let epoch_store = state.load_epoch_store_one_call_per_task();
595 if !epoch_store.protocol_config().mysticeti_fastpath() {
596 return Err(SuiErrorKind::UnsupportedFeatureError {
597 error: "Mysticeti fastpath".to_string(),
598 }
599 .into());
600 }
601
602 let request = request.into_inner();
603 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
604 SuiErrorKind::GrpcMessageDeserializeError {
605 type_info: "RawSubmitTxRequest.submit_type".to_string(),
606 error: e.to_string(),
607 }
608 })?;
609
610 let is_ping_request = submit_type == SubmitTxType::Ping;
611 if is_ping_request {
612 fp_ensure!(
613 request.transactions.is_empty(),
614 SuiErrorKind::InvalidRequest(format!(
615 "Ping request cannot contain {} transactions",
616 request.transactions.len()
617 ))
618 .into()
619 );
620 } else {
621 fp_ensure!(
623 !request.transactions.is_empty(),
624 SuiErrorKind::InvalidRequest(
625 "At least one transaction needs to be submitted".to_string(),
626 )
627 .into()
628 );
629 }
630
631 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
636
637 let max_num_transactions = if is_soft_bundle_request {
638 epoch_store.protocol_config().max_soft_bundle_size()
641 } else {
642 epoch_store
644 .protocol_config()
645 .max_num_transactions_in_block()
646 };
647 fp_ensure!(
648 request.transactions.len() <= max_num_transactions as usize,
649 SuiErrorKind::InvalidRequest(format!(
650 "Too many transactions in request: {} vs {}",
651 request.transactions.len(),
652 max_num_transactions
653 ))
654 .into()
655 );
656
657 let mut tx_digests = Vec::with_capacity(request.transactions.len());
659 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
661 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
663 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
665 let mut total_size_bytes = 0;
667
668 let req_type = if is_ping_request {
669 "ping"
670 } else if request.transactions.len() == 1 {
671 "single_transaction"
672 } else if is_soft_bundle_request {
673 "soft_bundle"
674 } else {
675 "batch"
676 };
677
678 let _handle_tx_metrics_guard = metrics
679 .handle_submit_transaction_latency
680 .with_label_values(&[req_type])
681 .start_timer();
682
683 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
684 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
685 Ok(txn) => txn,
686 Err(e) => {
687 return Err(SuiErrorKind::TransactionDeserializationError {
689 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
690 }
691 .into());
692 }
693 };
694
695 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
697
698 let overload_check_res = self.state.check_system_overload(
699 &*consensus_adapter,
700 transaction.data(),
701 state.check_system_overload_at_signing(),
702 );
703 if let Err(error) = overload_check_res {
704 metrics
705 .num_rejected_tx_during_overload
706 .with_label_values(&[error.as_ref()])
707 .inc();
708 results[idx] = Some(SubmitTxResult::Rejected { error });
709 continue;
710 }
711
712 let verified_transaction = {
714 let _metrics_guard = metrics.tx_verification_latency.start_timer();
715 match epoch_store.verify_transaction(transaction) {
716 Ok(txn) => txn,
717 Err(e) => {
718 metrics.signature_errors.inc();
719 return Err(e.into());
720 }
721 }
722 };
723
724 let tx_digest = verified_transaction.digest();
725 tx_digests.push(*tx_digest);
726
727 debug!(
728 ?tx_digest,
729 "handle_submit_transaction: verified transaction"
730 );
731
732 if let Some(effects) = self
735 .state
736 .get_transaction_cache_reader()
737 .get_executed_effects(tx_digest)
738 {
739 let effects_digest = effects.digest();
740 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
741 let executed_result = SubmitTxResult::Executed {
742 effects_digest,
743 details: Some(executed_data),
744 fast_path: false,
745 };
746 results[idx] = Some(executed_result);
747 debug!(?tx_digest, "handle_submit_transaction: already executed");
748 continue;
749 }
750 }
751
752 debug!(
753 ?tx_digest,
754 "handle_submit_transaction: waiting for fastpath dependency objects"
755 );
756 if !state
757 .wait_for_fastpath_dependency_objects(&verified_transaction, epoch_store.epoch())
758 .await?
759 {
760 debug!(
761 ?tx_digest,
762 "fastpath input objects are still unavailable after waiting"
763 );
764 }
765
766 match state.handle_vote_transaction(&epoch_store, verified_transaction.clone()) {
767 Ok(_) => { }
768 Err(e) => {
769 if let Some(effects) = self
772 .state
773 .get_transaction_cache_reader()
774 .get_executed_effects(tx_digest)
775 {
776 let effects_digest = effects.digest();
777 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
778 {
779 let executed_result = SubmitTxResult::Executed {
780 effects_digest,
781 details: Some(executed_data),
782 fast_path: false,
783 };
784 results[idx] = Some(executed_result);
785 continue;
786 }
787 }
788 results[idx] = Some(SubmitTxResult::Rejected { error: e });
790 continue;
791 }
792 }
793
794 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
795 &self.state.name,
796 verified_transaction.into(),
797 ));
798 transaction_indexes.push(idx);
799 total_size_bytes += tx_size;
800 }
801
802 if consensus_transactions.is_empty() && !is_ping_request {
803 return Ok((
804 tonic::Response::new(Self::try_from_submit_tx_response(results)?),
805 Weight::zero(),
806 ));
807 }
808
809 let max_transaction_bytes = if is_soft_bundle_request {
813 epoch_store
814 .protocol_config()
815 .consensus_max_transactions_in_block_bytes()
816 / 2
817 } else {
818 epoch_store
819 .protocol_config()
820 .consensus_max_transactions_in_block_bytes()
821 };
822 fp_ensure!(
823 total_size_bytes <= max_transaction_bytes as usize,
824 SuiErrorKind::UserInputError {
825 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
826 size: total_size_bytes,
827 limit: max_transaction_bytes,
828 },
829 }
830 .into()
831 );
832
833 metrics
834 .handle_submit_transaction_bytes
835 .with_label_values(&[req_type])
836 .observe(total_size_bytes as f64);
837 metrics
838 .handle_submit_transaction_batch_size
839 .with_label_values(&[req_type])
840 .observe(consensus_transactions.len() as f64);
841
842 let _latency_metric_guard = metrics
843 .handle_submit_transaction_consensus_latency
844 .with_label_values(&[req_type])
845 .start_timer();
846
847 let consensus_positions = if is_soft_bundle_request || is_ping_request {
848 assert!(
851 is_ping_request || !consensus_transactions.is_empty(),
852 "A valid soft bundle must have at least one transaction"
853 );
854 debug!(
855 "handle_submit_transaction: submitting consensus transactions ({}): {}",
856 req_type,
857 consensus_transactions
858 .iter()
859 .map(|t| t.local_display())
860 .join(", ")
861 );
862 self.handle_submit_to_consensus_for_position(
863 consensus_transactions,
864 &epoch_store,
865 submitter_client_addr,
866 )
867 .await?
868 } else {
869 let futures = consensus_transactions.into_iter().map(|t| {
870 debug!(
871 "handle_submit_transaction: submitting consensus transaction ({}): {}",
872 req_type,
873 t.local_display(),
874 );
875 self.handle_submit_to_consensus_for_position(
876 vec![t],
877 &epoch_store,
878 submitter_client_addr,
879 )
880 });
881 future::try_join_all(futures)
882 .await?
883 .into_iter()
884 .flatten()
885 .collect()
886 };
887
888 if is_ping_request {
889 assert_eq!(consensus_positions.len(), 1);
891 results.push(Some(SubmitTxResult::Submitted {
892 consensus_position: consensus_positions[0],
893 }));
894 } else {
895 for ((idx, tx_digest), consensus_position) in transaction_indexes
897 .into_iter()
898 .zip(tx_digests)
899 .zip(consensus_positions)
900 {
901 debug!(
902 ?tx_digest,
903 "handle_submit_transaction: submitted consensus transaction at {}",
904 consensus_position,
905 );
906 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
907 }
908 }
909
910 Ok((
911 tonic::Response::new(Self::try_from_submit_tx_response(results)?),
912 Weight::zero(),
913 ))
914 }
915
916 fn try_from_submit_tx_response(
917 results: Vec<Option<SubmitTxResult>>,
918 ) -> Result<RawSubmitTxResponse, SuiError> {
919 let mut raw_results = Vec::new();
920 for (i, result) in results.into_iter().enumerate() {
921 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
922 error: format!("Missing transaction result at {}", i),
923 })?;
924 let raw_result = result.try_into()?;
925 raw_results.push(raw_result);
926 }
927 Ok(RawSubmitTxResponse {
928 results: raw_results,
929 })
930 }
931
932 async fn handle_certificates(
938 &self,
939 certificates: NonEmpty<CertifiedTransaction>,
940 include_events: bool,
941 include_input_objects: bool,
942 include_output_objects: bool,
943 include_auxiliary_data: bool,
944 epoch_store: &Arc<AuthorityPerEpochStore>,
945 wait_for_effects: bool,
946 ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
947 fp_ensure!(
950 !self.state.is_fullnode(epoch_store),
951 SuiErrorKind::FullNodeCantHandleCertificate.into()
952 );
953
954 let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
955
956 let metrics = if certificates.len() == 1 {
957 if wait_for_effects {
958 if is_consensus_tx {
959 &self.metrics.handle_certificate_consensus_latency
960 } else {
961 &self.metrics.handle_certificate_non_consensus_latency
962 }
963 } else {
964 &self.metrics.submit_certificate_consensus_latency
965 }
966 } else {
967 &self
969 .metrics
970 .handle_soft_bundle_certificates_consensus_latency
971 };
972
973 let _metrics_guard = metrics.start_timer();
974
975 if certificates.len() == 1 {
979 let tx_digest = *certificates[0].digest();
980 debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
981
982 if let Some(signed_effects) = self
983 .state
984 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
985 {
986 let events = if include_events && signed_effects.events_digest().is_some() {
987 Some(
988 self.state
989 .get_transaction_events(signed_effects.transaction_digest())?,
990 )
991 } else {
992 None
993 };
994
995 return Ok((
996 Some(vec![HandleCertificateResponseV3 {
997 effects: signed_effects.into_inner(),
998 events,
999 input_objects: None,
1000 output_objects: None,
1001 auxiliary_data: None,
1002 }]),
1003 Weight::one(),
1004 ));
1005 };
1006 }
1007
1008 for certificate in &certificates {
1011 let overload_check_res = self.state.check_system_overload(
1012 &*self.consensus_adapter,
1013 certificate.data(),
1014 self.state.check_system_overload_at_execution(),
1015 );
1016 if let Err(error) = overload_check_res {
1017 self.metrics
1018 .num_rejected_cert_during_overload
1019 .with_label_values(&[error.as_ref()])
1020 .inc();
1021 return Err(error.into());
1022 }
1023 }
1024
1025 let verified_certificates = {
1026 let _timer = self.metrics.cert_verification_latency.start_timer();
1027 epoch_store
1028 .signature_verifier
1029 .multi_verify_certs(certificates.into())
1030 .await
1031 .into_iter()
1032 .collect::<Result<Vec<_>, _>>()?
1033 };
1034 let consensus_transactions =
1035 NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1036 ConsensusTransaction::new_certificate_message(
1037 &self.state.name,
1038 certificate.clone().into(),
1039 )
1040 }))
1041 .unwrap();
1042
1043 let (responses, weight) = self
1044 .handle_submit_to_consensus(
1045 consensus_transactions,
1046 include_events,
1047 include_input_objects,
1048 include_output_objects,
1049 include_auxiliary_data,
1050 epoch_store,
1051 wait_for_effects,
1052 )
1053 .await?;
1054 let responses = if let Some(responses) = responses {
1056 Some(
1057 responses
1058 .into_iter()
1059 .map(|response| {
1060 let signed_effects =
1061 self.state.sign_effects(response.effects, epoch_store)?;
1062 Ok(HandleCertificateResponseV3 {
1063 effects: signed_effects.into_inner(),
1064 events: response.events,
1065 input_objects: if response.input_objects.is_empty() {
1066 None
1067 } else {
1068 Some(response.input_objects)
1069 },
1070 output_objects: if response.output_objects.is_empty() {
1071 None
1072 } else {
1073 Some(response.output_objects)
1074 },
1075 auxiliary_data: None,
1076 })
1077 })
1078 .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1079 )
1080 } else {
1081 None
1082 };
1083
1084 Ok((responses, weight))
1085 }
1086
1087 #[instrument(
1088 name = "ValidatorService::handle_submit_to_consensus_for_position",
1089 level = "debug",
1090 skip_all,
1091 err(level = "debug")
1092 )]
1093 async fn handle_submit_to_consensus_for_position(
1094 &self,
1095 consensus_transactions: Vec<ConsensusTransaction>,
1097 epoch_store: &Arc<AuthorityPerEpochStore>,
1098 submitter_client_addr: Option<IpAddr>,
1099 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1100 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1101
1102 {
1103 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1105 if !reconfiguration_lock.should_accept_user_certs() {
1106 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1107 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1108 }
1109
1110 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1114
1115 self.consensus_adapter.submit_batch(
1116 &consensus_transactions,
1117 Some(&reconfiguration_lock),
1118 epoch_store,
1119 Some(tx_consensus_positions),
1120 submitter_client_addr,
1121 )?;
1122 }
1123
1124 Ok(rx_consensus_positions.await.map_err(|e| {
1125 SuiErrorKind::FailedToSubmitToConsensus(format!(
1126 "Failed to get consensus position: {e}"
1127 ))
1128 })?)
1129 }
1130
1131 async fn handle_submit_to_consensus(
1132 &self,
1133 consensus_transactions: NonEmpty<ConsensusTransaction>,
1134 include_events: bool,
1135 include_input_objects: bool,
1136 include_output_objects: bool,
1137 _include_auxiliary_data: bool,
1138 epoch_store: &Arc<AuthorityPerEpochStore>,
1139 wait_for_effects: bool,
1140 ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1141 let consensus_transactions: Vec<_> = consensus_transactions.into();
1142 {
1143 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1145 if !reconfiguration_lock.should_accept_user_certs() {
1146 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1147 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1148 }
1149
1150 if !epoch_store.all_external_consensus_messages_processed(
1156 consensus_transactions.iter().map(|tx| tx.key()),
1157 )? {
1158 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1159 self.consensus_adapter.submit_batch(
1160 &consensus_transactions,
1161 Some(&reconfiguration_lock),
1162 epoch_store,
1163 None,
1164 None, )?;
1166 }
1169 }
1170
1171 if !wait_for_effects {
1172 let fast_path_certificates = consensus_transactions
1175 .iter()
1176 .filter_map(|tx| {
1177 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1178 (!certificate.is_consensus_tx())
1179 .then_some((
1181 VerifiedExecutableTransaction::new_from_certificate(
1182 VerifiedCertificate::new_unchecked(*(certificate.clone())),
1183 ),
1184 ExecutionEnv::new()
1185 .with_scheduling_source(SchedulingSource::NonFastPath),
1186 ))
1187 } else {
1188 None
1189 }
1190 })
1191 .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1192 .collect::<Vec<_>>();
1193 if !fast_path_certificates.is_empty() {
1194 self.state
1195 .execution_scheduler()
1196 .enqueue(fast_path_certificates, epoch_store);
1197 }
1198 return Ok((None, Weight::zero()));
1199 }
1200
1201 let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1204 |tx| async move {
1205 let effects = match &tx.kind {
1206 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1207 let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1209 self.state
1210 .wait_for_certificate_execution(&certificate, epoch_store)
1211 .await?
1212 }
1213 ConsensusTransactionKind::UserTransaction(tx) => {
1214 self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1215 }
1216 _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction or UserTransaction"),
1217 };
1218 let events = if include_events && effects.events_digest().is_some() {
1219 Some(self.state.get_transaction_events(effects.transaction_digest())?)
1220 } else {
1221 None
1222 };
1223
1224 let input_objects = if include_input_objects {
1225 self.state.get_transaction_input_objects(&effects)?
1226 } else {
1227 vec![]
1228 };
1229
1230 let output_objects = if include_output_objects {
1231 self.state.get_transaction_output_objects(&effects)?
1232 } else {
1233 vec![]
1234 };
1235
1236 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1237 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1238 }
1239
1240 Ok::<_, SuiError>(ExecutedData {
1241 effects,
1242 events,
1243 input_objects,
1244 output_objects,
1245 })
1246 },
1247 ))
1248 .await?;
1249
1250 Ok((Some(responses), Weight::zero()))
1251 }
1252
1253 async fn collect_effects_data(
1254 &self,
1255 effects: &TransactionEffects,
1256 include_events: bool,
1257 include_input_objects: bool,
1258 include_output_objects: bool,
1259 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1260 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1261 let events = if include_events && effects.events_digest().is_some() {
1262 if let Some(fastpath_outputs) = &fastpath_outputs {
1263 Some(fastpath_outputs.events.clone())
1264 } else {
1265 Some(
1266 self.state
1267 .get_transaction_events(effects.transaction_digest())?,
1268 )
1269 }
1270 } else {
1271 None
1272 };
1273
1274 let input_objects = if include_input_objects {
1275 self.state.get_transaction_input_objects(effects)?
1276 } else {
1277 vec![]
1278 };
1279
1280 let output_objects = if include_output_objects {
1281 if let Some(fastpath_outputs) = &fastpath_outputs {
1282 fastpath_outputs.written.values().cloned().collect()
1283 } else {
1284 self.state.get_transaction_output_objects(effects)?
1285 }
1286 } else {
1287 vec![]
1288 };
1289
1290 Ok((events, input_objects, output_objects))
1291 }
1292}
1293
1294type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1295
1296impl ValidatorService {
1297 async fn transaction_impl(
1298 &self,
1299 request: tonic::Request<Transaction>,
1300 ) -> WrappedServiceResponse<HandleTransactionResponse> {
1301 self.handle_transaction(request).await
1302 }
1303
1304 async fn handle_submit_transaction_impl(
1305 &self,
1306 request: tonic::Request<RawSubmitTxRequest>,
1307 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1308 self.handle_submit_transaction(request).await
1309 }
1310
1311 async fn submit_certificate_impl(
1312 &self,
1313 request: tonic::Request<CertifiedTransaction>,
1314 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1315 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1316 let certificate = request.into_inner();
1317 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1318
1319 let span =
1320 error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1321 self.handle_certificates(
1322 nonempty![certificate],
1323 true,
1324 false,
1325 false,
1326 false,
1327 &epoch_store,
1328 false,
1329 )
1330 .instrument(span)
1331 .await
1332 .map(|(executed, spam_weight)| {
1333 (
1334 tonic::Response::new(SubmitCertificateResponse {
1335 executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1336 }),
1337 spam_weight,
1338 )
1339 })
1340 }
1341
1342 async fn handle_certificate_v2_impl(
1343 &self,
1344 request: tonic::Request<CertifiedTransaction>,
1345 ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1346 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1347 let certificate = request.into_inner();
1348 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1349
1350 let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1351 self.handle_certificates(
1352 nonempty![certificate],
1353 true,
1354 false,
1355 false,
1356 false,
1357 &epoch_store,
1358 true,
1359 )
1360 .instrument(span)
1361 .await
1362 .map(|(resp, spam_weight)| {
1363 (
1364 tonic::Response::new(
1365 resp.expect(
1366 "handle_certificate should not return none with wait_for_effects=true",
1367 )
1368 .remove(0)
1369 .into(),
1370 ),
1371 spam_weight,
1372 )
1373 })
1374 }
1375
1376 async fn handle_certificate_v3_impl(
1377 &self,
1378 request: tonic::Request<HandleCertificateRequestV3>,
1379 ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1380 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1381 let request = request.into_inner();
1382 request
1383 .certificate
1384 .validity_check(&epoch_store.tx_validity_check_context())?;
1385
1386 let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1387 self.handle_certificates(
1388 nonempty![request.certificate],
1389 request.include_events,
1390 request.include_input_objects,
1391 request.include_output_objects,
1392 request.include_auxiliary_data,
1393 &epoch_store,
1394 true,
1395 )
1396 .instrument(span)
1397 .await
1398 .map(|(resp, spam_weight)| {
1399 (
1400 tonic::Response::new(
1401 resp.expect(
1402 "handle_certificate should not return none with wait_for_effects=true",
1403 )
1404 .remove(0),
1405 ),
1406 spam_weight,
1407 )
1408 })
1409 }
1410
1411 async fn wait_for_effects_impl(
1412 &self,
1413 request: tonic::Request<RawWaitForEffectsRequest>,
1414 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1415 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1416 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1417 let response = timeout(
1418 Duration::from_secs(20),
1420 epoch_store
1421 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1422 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1423 )
1424 .await
1425 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1426 .try_into()?;
1427 Ok((tonic::Response::new(response), Weight::zero()))
1428 }
1429
1430 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, err(level = "debug"), fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1431 async fn wait_for_effects_response(
1432 &self,
1433 request: WaitForEffectsRequest,
1434 epoch_store: &Arc<AuthorityPerEpochStore>,
1435 ) -> SuiResult<WaitForEffectsResponse> {
1436 if request.ping_type.is_some() {
1437 return timeout(
1438 Duration::from_secs(10),
1439 self.ping_response(request, epoch_store),
1440 )
1441 .await
1442 .map_err(|_| SuiErrorKind::TimeoutError)?;
1443 }
1444
1445 let Some(tx_digest) = request.transaction_digest else {
1446 return Err(SuiErrorKind::InvalidRequest(
1447 "Transaction digest is required for wait for effects requests".to_string(),
1448 )
1449 .into());
1450 };
1451 let tx_digests = [tx_digest];
1452
1453 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1454 if let Some(consensus_position) = request.consensus_position {
1455 Box::pin(self.wait_for_fastpath_effects(
1456 consensus_position,
1457 &tx_digests,
1458 request.include_details,
1459 epoch_store,
1460 ))
1461 } else {
1462 Box::pin(futures::future::pending())
1463 };
1464
1465 tokio::select! {
1466 biased;
1468 mut effects = self.state
1475 .get_transaction_cache_reader()
1476 .notify_read_executed_effects(
1477 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1478 &tx_digests,
1479 ) => {
1480 tracing::Span::current().record("fast_path_effects", false);
1481 let effects = effects.pop().unwrap();
1482 let details = if request.include_details {
1483 Some(self.complete_executed_data(effects.clone(), None).await?)
1484 } else {
1485 None
1486 };
1487
1488 Ok(WaitForEffectsResponse::Executed {
1489 effects_digest: effects.digest(),
1490 details,
1491 fast_path: false,
1492 })
1493 }
1494
1495 fastpath_response = fastpath_effects_future => {
1496 tracing::Span::current().record("fast_path_effects", true);
1497 fastpath_response
1498 }
1499 }
1500 }
1501
1502 #[instrument(level = "error", skip_all, err(level = "debug"))]
1503 async fn ping_response(
1504 &self,
1505 request: WaitForEffectsRequest,
1506 epoch_store: &Arc<AuthorityPerEpochStore>,
1507 ) -> SuiResult<WaitForEffectsResponse> {
1508 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1509 return Err(SuiErrorKind::UnsupportedFeatureError {
1510 error: "Mysticeti fastpath".to_string(),
1511 }
1512 .into());
1513 };
1514
1515 let Some(consensus_position) = request.consensus_position else {
1516 return Err(SuiErrorKind::InvalidRequest(
1517 "Consensus position is required for Ping requests".to_string(),
1518 )
1519 .into());
1520 };
1521
1522 let Some(ping) = request.ping_type else {
1524 return Err(SuiErrorKind::InvalidRequest(
1525 "Ping type is required for ping requests".to_string(),
1526 )
1527 .into());
1528 };
1529
1530 let _metrics_guard = self
1531 .metrics
1532 .handle_wait_for_effects_ping_latency
1533 .with_label_values(&[ping.as_str()])
1534 .start_timer();
1535
1536 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1537
1538 let mut last_status = None;
1539 let details = if request.include_details {
1540 Some(Box::new(ExecutedData::default()))
1541 } else {
1542 None
1543 };
1544
1545 loop {
1546 let status = consensus_tx_status_cache
1547 .notify_read_transaction_status_change(consensus_position, last_status)
1548 .await;
1549 match status {
1550 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1551 ConsensusTxStatus::FastpathCertified => {
1552 if ping == PingType::Consensus {
1554 last_status = Some(status);
1555 continue;
1556 }
1557 return Ok(WaitForEffectsResponse::Executed {
1558 effects_digest: TransactionEffectsDigest::ZERO,
1559 details,
1560 fast_path: true,
1561 });
1562 }
1563 ConsensusTxStatus::Rejected => {
1564 return Ok(WaitForEffectsResponse::Rejected { error: None });
1565 }
1566 ConsensusTxStatus::Finalized => {
1567 return Ok(WaitForEffectsResponse::Executed {
1568 effects_digest: TransactionEffectsDigest::ZERO,
1569 details,
1570 fast_path: false,
1571 });
1572 }
1573 },
1574 NotifyReadConsensusTxStatusResult::Expired(round) => {
1575 return Ok(WaitForEffectsResponse::Expired {
1576 epoch: epoch_store.epoch(),
1577 round: Some(round),
1578 });
1579 }
1580 }
1581 }
1582 }
1583
1584 #[instrument(level = "error", skip_all, err(level = "debug"))]
1585 async fn wait_for_fastpath_effects(
1586 &self,
1587 consensus_position: ConsensusPosition,
1588 tx_digests: &[TransactionDigest],
1589 include_details: bool,
1590 epoch_store: &Arc<AuthorityPerEpochStore>,
1591 ) -> SuiResult<WaitForEffectsResponse> {
1592 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1593 return Err(SuiErrorKind::UnsupportedFeatureError {
1594 error: "Mysticeti fastpath".to_string(),
1595 }
1596 .into());
1597 };
1598
1599 let local_epoch = epoch_store.epoch();
1600 match consensus_position.epoch.cmp(&local_epoch) {
1601 Ordering::Less => {
1602 let response = WaitForEffectsResponse::Expired {
1605 epoch: local_epoch,
1606 round: None,
1607 };
1608 return Ok(response);
1609 }
1610 Ordering::Greater => {
1611 return Err(SuiErrorKind::WrongEpoch {
1613 expected_epoch: local_epoch,
1614 actual_epoch: consensus_position.epoch,
1615 }
1616 .into());
1617 }
1618 Ordering::Equal => {
1619 }
1622 };
1623
1624 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1625
1626 let mut current_status = None;
1627 loop {
1628 tokio::select! {
1629 status_result = consensus_tx_status_cache
1630 .notify_read_transaction_status_change(consensus_position, current_status) => {
1631 match status_result {
1632 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1633 match new_status {
1634 ConsensusTxStatus::Rejected => {
1635 return Ok(WaitForEffectsResponse::Rejected {
1636 error: epoch_store.get_rejection_vote_reason(
1637 consensus_position
1638 )
1639 });
1640 }
1641 ConsensusTxStatus::FastpathCertified => {
1642 current_status = Some(new_status);
1643 continue;
1644 }
1645 ConsensusTxStatus::Finalized => {
1646 current_status = Some(new_status);
1647 continue;
1648 }
1649 }
1650 }
1651 NotifyReadConsensusTxStatusResult::Expired(round) => {
1652 return Ok(WaitForEffectsResponse::Expired {
1653 epoch: epoch_store.epoch(),
1654 round: Some(round),
1655 });
1656 }
1657 }
1658 }
1659
1660 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1661 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1662 let outputs = outputs.pop().unwrap();
1663 let effects = outputs.effects.clone();
1664
1665 let details = if include_details {
1666 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1667 } else {
1668 None
1669 };
1670
1671 return Ok(WaitForEffectsResponse::Executed {
1672 effects_digest: effects.digest(),
1673 details,
1674 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1675 });
1676 }
1677 }
1678 }
1679 }
1680
1681 async fn complete_executed_data(
1682 &self,
1683 effects: TransactionEffects,
1684 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1685 ) -> SuiResult<Box<ExecutedData>> {
1686 let (events, input_objects, output_objects) = self
1687 .collect_effects_data(
1688 &effects,
1689 true,
1690 true,
1691 true,
1692 fastpath_outputs,
1693 )
1694 .await?;
1695 Ok(Box::new(ExecutedData {
1696 effects,
1697 events,
1698 input_objects,
1699 output_objects,
1700 }))
1701 }
1702
1703 async fn soft_bundle_validity_check(
1704 &self,
1705 certificates: &NonEmpty<CertifiedTransaction>,
1706 epoch_store: &Arc<AuthorityPerEpochStore>,
1707 total_size_bytes: u64,
1708 ) -> Result<(), tonic::Status> {
1709 let protocol_config = epoch_store.protocol_config();
1710 let node_config = &self.state.config;
1711
1712 fp_ensure!(
1718 protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1719 SuiErrorKind::UnsupportedFeatureError {
1720 error: "Soft Bundle".to_string()
1721 }
1722 .into()
1723 );
1724
1725 fp_ensure!(
1732 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1733 SuiErrorKind::UserInputError {
1734 error: UserInputError::TooManyTransactionsInBatch {
1735 size: certificates.len(),
1736 limit: protocol_config.max_soft_bundle_size()
1737 }
1738 }
1739 .into()
1740 );
1741
1742 let soft_bundle_max_size_bytes =
1746 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1747 fp_ensure!(
1748 total_size_bytes <= soft_bundle_max_size_bytes,
1749 SuiErrorKind::UserInputError {
1750 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1751 size: total_size_bytes as usize,
1752 limit: soft_bundle_max_size_bytes,
1753 },
1754 }
1755 .into()
1756 );
1757
1758 let mut gas_price = None;
1759 for certificate in certificates {
1760 let tx_digest = *certificate.digest();
1761 fp_ensure!(
1762 certificate.is_consensus_tx(),
1763 SuiErrorKind::UserInputError {
1764 error: UserInputError::NoSharedObjectError { digest: tx_digest }
1765 }
1766 .into()
1767 );
1768 fp_ensure!(
1769 !self.state.is_tx_already_executed(&tx_digest),
1770 SuiErrorKind::UserInputError {
1771 error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1772 }
1773 .into()
1774 );
1775 if let Some(gas) = gas_price {
1776 fp_ensure!(
1777 gas == certificate.gas_price(),
1778 SuiErrorKind::UserInputError {
1779 error: UserInputError::GasPriceMismatchError {
1780 digest: tx_digest,
1781 expected: gas,
1782 actual: certificate.gas_price()
1783 }
1784 }
1785 .into()
1786 );
1787 } else {
1788 gas_price = Some(certificate.gas_price());
1789 }
1790 }
1791
1792 fp_ensure!(
1797 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1798 SuiErrorKind::UserInputError {
1799 error: UserInputError::CertificateAlreadyProcessed
1800 }
1801 .into()
1802 );
1803
1804 Ok(())
1805 }
1806
1807 async fn handle_soft_bundle_certificates_v3_impl(
1808 &self,
1809 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1810 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1811 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1812 let client_addr = if self.client_id_source.is_none() {
1813 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1814 } else {
1815 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1816 };
1817 let request = request.into_inner();
1818
1819 let certificates = NonEmpty::from_vec(request.certificates)
1820 .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1821 let mut total_size_bytes = 0;
1822 for certificate in &certificates {
1823 total_size_bytes +=
1825 certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1826 }
1827
1828 self.metrics
1829 .handle_soft_bundle_certificates_count
1830 .observe(certificates.len() as f64);
1831
1832 self.metrics
1833 .handle_soft_bundle_certificates_size_bytes
1834 .observe(total_size_bytes as f64);
1835
1836 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1838 .await?;
1839
1840 info!(
1841 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1842 certificates.len(),
1843 client_addr
1844 .map(|x| x.to_string())
1845 .unwrap_or_else(|| "unknown".to_string()),
1846 certificates
1847 .iter()
1848 .map(|x| x.digest().to_string())
1849 .collect::<Vec<_>>()
1850 .join(", "),
1851 total_size_bytes
1852 );
1853
1854 let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1855 self.handle_certificates(
1856 certificates,
1857 request.include_events,
1858 request.include_input_objects,
1859 request.include_output_objects,
1860 request.include_auxiliary_data,
1861 &epoch_store,
1862 request.wait_for_effects,
1863 )
1864 .instrument(span)
1865 .await
1866 .map(|(resp, spam_weight)| {
1867 (
1868 tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1869 responses: resp.unwrap_or_default(),
1870 }),
1871 spam_weight,
1872 )
1873 })
1874 }
1875
1876 async fn object_info_impl(
1877 &self,
1878 request: tonic::Request<ObjectInfoRequest>,
1879 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1880 let request = request.into_inner();
1881 let response = self.state.handle_object_info_request(request).await?;
1882 Ok((tonic::Response::new(response), Weight::one()))
1883 }
1884
1885 async fn transaction_info_impl(
1886 &self,
1887 request: tonic::Request<TransactionInfoRequest>,
1888 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1889 let request = request.into_inner();
1890 let response = self.state.handle_transaction_info_request(request).await?;
1891 Ok((tonic::Response::new(response), Weight::one()))
1892 }
1893
1894 async fn checkpoint_impl(
1895 &self,
1896 request: tonic::Request<CheckpointRequest>,
1897 ) -> WrappedServiceResponse<CheckpointResponse> {
1898 let request = request.into_inner();
1899 let response = self.state.handle_checkpoint_request(&request)?;
1900 Ok((tonic::Response::new(response), Weight::one()))
1901 }
1902
1903 async fn checkpoint_v2_impl(
1904 &self,
1905 request: tonic::Request<CheckpointRequestV2>,
1906 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1907 let request = request.into_inner();
1908 let response = self.state.handle_checkpoint_request_v2(&request)?;
1909 Ok((tonic::Response::new(response), Weight::one()))
1910 }
1911
1912 async fn get_system_state_object_impl(
1913 &self,
1914 _request: tonic::Request<SystemStateRequest>,
1915 ) -> WrappedServiceResponse<SuiSystemState> {
1916 let response = self
1917 .state
1918 .get_object_cache_reader()
1919 .get_sui_system_state_object_unsafe()?;
1920 Ok((tonic::Response::new(response), Weight::one()))
1921 }
1922
1923 async fn validator_health_impl(
1924 &self,
1925 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1926 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1927 let state = &self.state;
1928
1929 let epoch_store = state.load_epoch_store_one_call_per_task();
1931
1932 let num_inflight_execution_transactions =
1934 state.execution_scheduler().num_pending_certificates() as u64;
1935
1936 let num_inflight_consensus_transactions =
1938 self.consensus_adapter.num_inflight_transactions();
1939
1940 let last_committed_leader_round = epoch_store
1942 .consensus_tx_status_cache
1943 .as_ref()
1944 .and_then(|cache| cache.get_last_committed_leader_round())
1945 .unwrap_or(0);
1946
1947 let last_locally_built_checkpoint = epoch_store
1949 .last_built_checkpoint_summary()
1950 .ok()
1951 .flatten()
1952 .map(|(_, summary)| summary.sequence_number)
1953 .unwrap_or(0);
1954
1955 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1956 num_inflight_consensus_transactions,
1957 num_inflight_execution_transactions,
1958 last_locally_built_checkpoint,
1959 last_committed_leader_round,
1960 };
1961
1962 let raw_response = typed_response
1963 .try_into()
1964 .map_err(|e: sui_types::error::SuiError| {
1965 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1966 })?;
1967
1968 Ok((tonic::Response::new(raw_response), Weight::one()))
1969 }
1970
1971 fn get_client_ip_addr<T>(
1972 &self,
1973 request: &tonic::Request<T>,
1974 source: &ClientIdSource,
1975 ) -> Option<IpAddr> {
1976 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1977
1978 if let Some(header) = forwarded_header {
1979 let num_hops = header
1980 .to_str()
1981 .map(|h| h.split(',').count().saturating_sub(1))
1982 .unwrap_or(0);
1983
1984 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1985 }
1986
1987 match source {
1988 ClientIdSource::SocketAddr => {
1989 let socket_addr: Option<SocketAddr> = request.remote_addr();
1990
1991 if let Some(socket_addr) = socket_addr {
1997 Some(socket_addr.ip())
1998 } else {
1999 if cfg!(msim) {
2000 } else if cfg!(test) {
2002 panic!("Failed to get remote address from request");
2003 } else {
2004 self.metrics.connection_ip_not_found.inc();
2005 error!("Failed to get remote address from request");
2006 }
2007 None
2008 }
2009 }
2010 ClientIdSource::XForwardedFor(num_hops) => {
2011 let do_header_parse = |op: &MetadataValue<Ascii>| {
2012 match op.to_str() {
2013 Ok(header_val) => {
2014 let header_contents =
2015 header_val.split(',').map(str::trim).collect::<Vec<_>>();
2016 if *num_hops == 0 {
2017 error!(
2018 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2019 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2020 to this node. Skipping traffic controller request handling.",
2021 header_contents,
2022 );
2023 return None;
2024 }
2025 let contents_len = header_contents.len();
2026 if contents_len < *num_hops {
2027 error!(
2028 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2029 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2030 `client-id-source` in the node config.",
2031 header_contents, contents_len, num_hops, contents_len,
2032 );
2033 self.metrics.client_id_source_config_mismatch.inc();
2034 return None;
2035 }
2036 let Some(client_ip) = header_contents.get(contents_len - num_hops)
2037 else {
2038 error!(
2039 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2040 Expected at least {} values. Skipping traffic controller request handling.",
2041 header_contents, contents_len, num_hops, contents_len,
2042 );
2043 return None;
2044 };
2045 parse_ip(client_ip).or_else(|| {
2046 self.metrics.forwarded_header_parse_error.inc();
2047 None
2048 })
2049 }
2050 Err(e) => {
2051 self.metrics.forwarded_header_invalid.inc();
2055 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2056 None
2057 }
2058 }
2059 };
2060 if let Some(op) = request.metadata().get("x-forwarded-for") {
2061 do_header_parse(op)
2062 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2063 do_header_parse(op)
2064 } else {
2065 self.metrics.forwarded_header_not_included.inc();
2066 error!(
2067 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2068 );
2069 None
2070 }
2071 }
2072 }
2073 }
2074
2075 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2076 if let Some(traffic_controller) = &self.traffic_controller {
2077 if !traffic_controller.check(&client, &None).await {
2078 Err(tonic::Status::from_error(
2080 SuiErrorKind::TooManyRequests.into(),
2081 ))
2082 } else {
2083 Ok(())
2084 }
2085 } else {
2086 Ok(())
2087 }
2088 }
2089
2090 fn handle_traffic_resp<T>(
2091 &self,
2092 client: Option<IpAddr>,
2093 wrapped_response: WrappedServiceResponse<T>,
2094 ) -> Result<tonic::Response<T>, tonic::Status> {
2095 let (error, spam_weight, unwrapped_response) = match wrapped_response {
2096 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2097 Err(status) => (
2098 Some(SuiError::from(status.clone())),
2099 Weight::zero(),
2100 Err(status.clone()),
2101 ),
2102 };
2103
2104 if let Some(traffic_controller) = self.traffic_controller.clone() {
2105 traffic_controller.tally(TrafficTally {
2106 direct: client,
2107 through_fullnode: None,
2108 error_info: error.map(|e| {
2109 let error_type = String::from(e.clone().as_ref());
2110 let error_weight = normalize(e);
2111 (error_weight, error_type)
2112 }),
2113 spam_weight,
2114 timestamp: SystemTime::now(),
2115 })
2116 }
2117 unwrapped_response
2118 }
2119}
2120
2121fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2122 let mut request = tonic::Request::new(message);
2125 let tcp_connect_info = TcpConnectInfo {
2126 local_addr: None,
2127 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2128 };
2129 request.extensions_mut().insert(tcp_connect_info);
2130 request
2131}
2132
2133fn normalize(err: SuiError) -> Weight {
2135 match err.as_inner() {
2136 SuiErrorKind::UserInputError {
2137 error: UserInputError::IncorrectUserSignature { .. },
2138 } => Weight::one(),
2139 SuiErrorKind::InvalidSignature { .. }
2140 | SuiErrorKind::SignerSignatureAbsent { .. }
2141 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2142 | SuiErrorKind::IncorrectSigner { .. }
2143 | SuiErrorKind::UnknownSigner { .. }
2144 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2145 _ => Weight::zero(),
2146 }
2147}
2148
2149#[macro_export]
2153macro_rules! handle_with_decoration {
2154 ($self:ident, $func_name:ident, $request:ident) => {{
2155 if $self.client_id_source.is_none() {
2156 return $self.$func_name($request).await.map(|(result, _)| result);
2157 }
2158
2159 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
2160
2161 $self.handle_traffic_req(client.clone()).await?;
2163
2164 let wrapped_response = $self.$func_name($request).await;
2166 $self.handle_traffic_resp(client, wrapped_response)
2167 }};
2168}
2169
2170#[async_trait]
2171impl Validator for ValidatorService {
2172 async fn submit_transaction(
2173 &self,
2174 request: tonic::Request<RawSubmitTxRequest>,
2175 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2176 let validator_service = self.clone();
2177
2178 spawn_monitored_task!(async move {
2181 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2184 })
2185 .await
2186 .unwrap()
2187 }
2188
2189 async fn transaction(
2190 &self,
2191 request: tonic::Request<Transaction>,
2192 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2193 let validator_service = self.clone();
2194
2195 spawn_monitored_task!(async move {
2198 handle_with_decoration!(validator_service, transaction_impl, request)
2201 })
2202 .await
2203 .unwrap()
2204 }
2205
2206 async fn submit_certificate(
2207 &self,
2208 request: tonic::Request<CertifiedTransaction>,
2209 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2210 let validator_service = self.clone();
2211
2212 spawn_monitored_task!(async move {
2215 handle_with_decoration!(validator_service, submit_certificate_impl, request)
2218 })
2219 .await
2220 .unwrap()
2221 }
2222
2223 async fn handle_certificate_v2(
2224 &self,
2225 request: tonic::Request<CertifiedTransaction>,
2226 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2227 handle_with_decoration!(self, handle_certificate_v2_impl, request)
2228 }
2229
2230 async fn handle_certificate_v3(
2231 &self,
2232 request: tonic::Request<HandleCertificateRequestV3>,
2233 ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2234 handle_with_decoration!(self, handle_certificate_v3_impl, request)
2235 }
2236
2237 async fn wait_for_effects(
2238 &self,
2239 request: tonic::Request<RawWaitForEffectsRequest>,
2240 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2241 handle_with_decoration!(self, wait_for_effects_impl, request)
2242 }
2243
2244 async fn handle_soft_bundle_certificates_v3(
2245 &self,
2246 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2247 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2248 handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2249 }
2250
2251 async fn object_info(
2252 &self,
2253 request: tonic::Request<ObjectInfoRequest>,
2254 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2255 handle_with_decoration!(self, object_info_impl, request)
2256 }
2257
2258 async fn transaction_info(
2259 &self,
2260 request: tonic::Request<TransactionInfoRequest>,
2261 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2262 handle_with_decoration!(self, transaction_info_impl, request)
2263 }
2264
2265 async fn checkpoint(
2266 &self,
2267 request: tonic::Request<CheckpointRequest>,
2268 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2269 handle_with_decoration!(self, checkpoint_impl, request)
2270 }
2271
2272 async fn checkpoint_v2(
2273 &self,
2274 request: tonic::Request<CheckpointRequestV2>,
2275 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2276 handle_with_decoration!(self, checkpoint_v2_impl, request)
2277 }
2278
2279 async fn get_system_state_object(
2280 &self,
2281 request: tonic::Request<SystemStateRequest>,
2282 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2283 handle_with_decoration!(self, get_system_state_object_impl, request)
2284 }
2285
2286 async fn validator_health(
2287 &self,
2288 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2289 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2290 {
2291 handle_with_decoration!(self, validator_health_impl, request)
2292 }
2293}