1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_metrics::spawn_monitored_task;
11use prometheus::{
12 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
13 register_gauge_with_registry, register_histogram_vec_with_registry,
14 register_histogram_with_registry, register_int_counter_vec_with_registry,
15 register_int_counter_with_registry,
16};
17use std::{
18 cmp::Ordering,
19 future::Future,
20 io,
21 net::{IpAddr, SocketAddr},
22 pin::Pin,
23 sync::Arc,
24 time::{Duration, SystemTime},
25};
26use sui_network::{
27 api::{Validator, ValidatorServer},
28 tonic,
29 validator::server::SUI_TLS_SERVER_NAME,
30};
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
34use sui_types::messages_grpc::{
35 HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
36};
37use sui_types::messages_grpc::{
38 HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
39 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::messages_grpc::{
42 HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
43};
44use sui_types::multiaddr::Multiaddr;
45use sui_types::object::Object;
46use sui_types::sui_system_state::SuiSystemState;
47use sui_types::traffic_control::{ClientIdSource, Weight};
48use sui_types::{
49 digests::{TransactionDigest, TransactionEffectsDigest},
50 error::{SuiErrorKind, UserInputError},
51};
52use sui_types::{
53 effects::TransactionEffects,
54 messages_grpc::{
55 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
56 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
57 },
58};
59use sui_types::{
60 effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
61};
62use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
63use sui_types::{error::*, transaction::*};
64use sui_types::{
65 fp_ensure,
66 messages_checkpoint::{
67 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
68 },
69};
70use tap::TapFallible;
71use tokio::sync::oneshot;
72use tokio::time::timeout;
73use tonic::metadata::{Ascii, MetadataValue};
74use tracing::{Instrument, debug, error, error_span, info, instrument};
75
76use crate::consensus_adapter::ConnectionMonitorStatusForTests;
77use crate::{
78 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
79 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
80 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
81};
82use crate::{
83 authority::{
84 ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
85 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
86 shared_object_version_manager::Schedulable,
87 },
88 checkpoints::CheckpointStore,
89 execution_scheduler::SchedulingSource,
90 mysticeti_adapter::LazyMysticetiClient,
91 transaction_outputs::TransactionOutputs,
92};
93use nonempty::{NonEmpty, nonempty};
94use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
95use sui_types::messages_grpc::PingType;
96use tonic::transport::server::TcpConnectInfo;
97
// Unit-test modules for this file. Each one is compiled only under `cfg(test)`
// and is loaded from the explicit `#[path]` given on its declaration.
#[cfg(test)]
#[path = "unit_tests/server_tests.rs"]
mod server_tests;

#[cfg(test)]
#[path = "unit_tests/wait_for_effects_tests.rs"]
mod wait_for_effects_tests;

#[cfg(test)]
#[path = "unit_tests/submit_transaction_tests.rs"]
mod submit_transaction_tests;
109
110pub struct AuthorityServerHandle {
111 server_handle: sui_network::validator::server::Server,
112}
113
114impl AuthorityServerHandle {
115 pub async fn join(self) -> Result<(), io::Error> {
116 self.server_handle.handle().wait_for_shutdown().await;
117 Ok(())
118 }
119
120 pub async fn kill(self) -> Result<(), io::Error> {
121 self.server_handle.handle().shutdown().await;
122 Ok(())
123 }
124
125 pub fn address(&self) -> &Multiaddr {
126 self.server_handle.local_addr()
127 }
128}
129
130pub struct AuthorityServer {
131 address: Multiaddr,
132 pub state: Arc<AuthorityState>,
133 consensus_adapter: Arc<ConsensusAdapter>,
134 pub metrics: Arc<ValidatorServiceMetrics>,
135}
136
137impl AuthorityServer {
138 pub fn new_for_test_with_consensus_adapter(
139 state: Arc<AuthorityState>,
140 consensus_adapter: Arc<ConsensusAdapter>,
141 ) -> Self {
142 let address = new_local_tcp_address_for_testing();
143 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
144
145 Self {
146 address,
147 state,
148 consensus_adapter,
149 metrics,
150 }
151 }
152
153 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
154 let consensus_adapter = Arc::new(ConsensusAdapter::new(
155 Arc::new(LazyMysticetiClient::new()),
156 CheckpointStore::new_for_tests(),
157 state.name,
158 Arc::new(ConnectionMonitorStatusForTests {}),
159 100_000,
160 100_000,
161 None,
162 None,
163 ConsensusAdapterMetrics::new_test(),
164 state.epoch_store_for_testing().protocol_config().clone(),
165 ));
166 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
167 }
168
169 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
170 let address = self.address.clone();
171 self.spawn_with_bind_address_for_test(address).await
172 }
173
174 pub async fn spawn_with_bind_address_for_test(
175 self,
176 address: Multiaddr,
177 ) -> Result<AuthorityServerHandle, io::Error> {
178 let tls_config = sui_tls::create_rustls_server_config(
179 self.state.config.network_key_pair().copy().private(),
180 SUI_TLS_SERVER_NAME.to_string(),
181 );
182 let config = mysten_network::config::Config::new();
183 let server = sui_network::validator::server::ServerBuilder::from_config(
184 &config,
185 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
186 )
187 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
188 self.state,
189 self.consensus_adapter,
190 self.metrics,
191 )))
192 .bind(&address, Some(tls_config))
193 .await
194 .unwrap();
195 let local_addr = server.local_addr().to_owned();
196 info!("Listening to traffic on {local_addr}");
197 let handle = AuthorityServerHandle {
198 server_handle: server,
199 };
200 Ok(handle)
201 }
202}
203
204pub struct ValidatorServiceMetrics {
205 pub signature_errors: IntCounter,
206 pub tx_verification_latency: Histogram,
207 pub cert_verification_latency: Histogram,
208 pub consensus_latency: Histogram,
209 pub handle_transaction_latency: Histogram,
210 pub submit_certificate_consensus_latency: Histogram,
211 pub handle_certificate_consensus_latency: Histogram,
212 pub handle_certificate_non_consensus_latency: Histogram,
213 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
214 pub handle_soft_bundle_certificates_count: Histogram,
215 pub handle_soft_bundle_certificates_size_bytes: Histogram,
216 pub handle_transaction_consensus_latency: Histogram,
217 pub handle_submit_transaction_consensus_latency: HistogramVec,
218 pub handle_wait_for_effects_ping_latency: HistogramVec,
219
220 handle_submit_transaction_latency: HistogramVec,
221 handle_submit_transaction_bytes: HistogramVec,
222 handle_submit_transaction_batch_size: HistogramVec,
223
224 num_rejected_tx_in_epoch_boundary: IntCounter,
225 num_rejected_cert_in_epoch_boundary: IntCounter,
226 num_rejected_tx_during_overload: IntCounterVec,
227 num_rejected_cert_during_overload: IntCounterVec,
228 submission_rejected_transactions: IntCounterVec,
229 connection_ip_not_found: IntCounter,
230 forwarded_header_parse_error: IntCounter,
231 forwarded_header_invalid: IntCounter,
232 forwarded_header_not_included: IntCounter,
233 client_id_source_config_mismatch: IntCounter,
234 x_forwarded_for_num_hops: Gauge,
235}
236
237impl ValidatorServiceMetrics {
238 pub fn new(registry: &Registry) -> Self {
239 Self {
240 signature_errors: register_int_counter_with_registry!(
241 "total_signature_errors",
242 "Number of transaction signature errors",
243 registry,
244 )
245 .unwrap(),
246 tx_verification_latency: register_histogram_with_registry!(
247 "validator_service_tx_verification_latency",
248 "Latency of verifying a transaction",
249 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
250 registry,
251 )
252 .unwrap(),
253 cert_verification_latency: register_histogram_with_registry!(
254 "validator_service_cert_verification_latency",
255 "Latency of verifying a certificate",
256 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257 registry,
258 )
259 .unwrap(),
260 consensus_latency: register_histogram_with_registry!(
261 "validator_service_consensus_latency",
262 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
263 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
264 registry,
265 )
266 .unwrap(),
267 handle_transaction_latency: register_histogram_with_registry!(
268 "validator_service_handle_transaction_latency",
269 "Latency of handling a transaction",
270 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
271 registry,
272 )
273 .unwrap(),
274 handle_certificate_consensus_latency: register_histogram_with_registry!(
275 "validator_service_handle_certificate_consensus_latency",
276 "Latency of handling a consensus transaction certificate",
277 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
278 registry,
279 )
280 .unwrap(),
281 submit_certificate_consensus_latency: register_histogram_with_registry!(
282 "validator_service_submit_certificate_consensus_latency",
283 "Latency of submit_certificate RPC handler",
284 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
285 registry,
286 )
287 .unwrap(),
288 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
289 "validator_service_handle_certificate_non_consensus_latency",
290 "Latency of handling a non-consensus transaction certificate",
291 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
292 registry,
293 )
294 .unwrap(),
295 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
296 "validator_service_handle_soft_bundle_certificates_consensus_latency",
297 "Latency of handling a consensus soft bundle",
298 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
299 registry,
300 )
301 .unwrap(),
302 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
303 "handle_soft_bundle_certificates_count",
304 "The number of certificates included in a soft bundle",
305 mysten_metrics::COUNT_BUCKETS.to_vec(),
306 registry,
307 )
308 .unwrap(),
309 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
310 "handle_soft_bundle_certificates_size_bytes",
311 "The size of soft bundle in bytes",
312 mysten_metrics::BYTES_BUCKETS.to_vec(),
313 registry,
314 )
315 .unwrap(),
316 handle_transaction_consensus_latency: register_histogram_with_registry!(
317 "validator_service_handle_transaction_consensus_latency",
318 "Latency of handling a user transaction sent through consensus",
319 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
320 registry,
321 )
322 .unwrap(),
323 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
324 "validator_service_submit_transaction_consensus_latency",
325 "Latency of submitting a user transaction sent through consensus",
326 &["req_type"],
327 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
328 registry,
329 )
330 .unwrap(),
331 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
332 "validator_service_submit_transaction_latency",
333 "Latency of submit transaction handler",
334 &["req_type"],
335 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
336 registry,
337 )
338 .unwrap(),
339 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
340 "validator_service_handle_wait_for_effects_ping_latency",
341 "Latency of handling a ping request for wait_for_effects",
342 &["req_type"],
343 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
344 registry,
345 )
346 .unwrap(),
347 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
348 "validator_service_submit_transaction_bytes",
349 "The size of transactions in the submit transaction request",
350 &["req_type"],
351 mysten_metrics::BYTES_BUCKETS.to_vec(),
352 registry,
353 )
354 .unwrap(),
355 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
356 "validator_service_submit_transaction_batch_size",
357 "The number of transactions in the submit transaction request",
358 &["req_type"],
359 mysten_metrics::COUNT_BUCKETS.to_vec(),
360 registry,
361 )
362 .unwrap(),
363 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
364 "validator_service_num_rejected_tx_in_epoch_boundary",
365 "Number of rejected transaction during epoch transitioning",
366 registry,
367 )
368 .unwrap(),
369 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
370 "validator_service_num_rejected_cert_in_epoch_boundary",
371 "Number of rejected transaction certificate during epoch transitioning",
372 registry,
373 )
374 .unwrap(),
375 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
376 "validator_service_num_rejected_tx_during_overload",
377 "Number of rejected transaction due to system overload",
378 &["error_type"],
379 registry,
380 )
381 .unwrap(),
382 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
383 "validator_service_num_rejected_cert_during_overload",
384 "Number of rejected transaction certificate due to system overload",
385 &["error_type"],
386 registry,
387 )
388 .unwrap(),
389 submission_rejected_transactions: register_int_counter_vec_with_registry!(
390 "validator_service_submission_rejected_transactions",
391 "Number of transactions rejected during submission",
392 &["reason"],
393 registry,
394 )
395 .unwrap(),
396 connection_ip_not_found: register_int_counter_with_registry!(
397 "validator_service_connection_ip_not_found",
398 "Number of times connection IP was not extractable from request",
399 registry,
400 )
401 .unwrap(),
402 forwarded_header_parse_error: register_int_counter_with_registry!(
403 "validator_service_forwarded_header_parse_error",
404 "Number of times x-forwarded-for header could not be parsed",
405 registry,
406 )
407 .unwrap(),
408 forwarded_header_invalid: register_int_counter_with_registry!(
409 "validator_service_forwarded_header_invalid",
410 "Number of times x-forwarded-for header was invalid",
411 registry,
412 )
413 .unwrap(),
414 forwarded_header_not_included: register_int_counter_with_registry!(
415 "validator_service_forwarded_header_not_included",
416 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
417 registry,
418 )
419 .unwrap(),
420 client_id_source_config_mismatch: register_int_counter_with_registry!(
421 "validator_service_client_id_source_config_mismatch",
422 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
423 registry,
424 )
425 .unwrap(),
426 x_forwarded_for_num_hops: register_gauge_with_registry!(
427 "validator_service_x_forwarded_for_num_hops",
428 "Number of hops in x-forwarded-for header",
429 registry,
430 )
431 .unwrap(),
432 }
433 }
434
435 pub fn new_for_tests() -> Self {
436 let registry = Registry::new();
437 Self::new(®istry)
438 }
439}
440
441#[derive(Clone)]
442pub struct ValidatorService {
443 state: Arc<AuthorityState>,
444 consensus_adapter: Arc<ConsensusAdapter>,
445 metrics: Arc<ValidatorServiceMetrics>,
446 traffic_controller: Option<Arc<TrafficController>>,
447 client_id_source: Option<ClientIdSource>,
448}
449
450impl ValidatorService {
451 pub fn new(
452 state: Arc<AuthorityState>,
453 consensus_adapter: Arc<ConsensusAdapter>,
454 validator_metrics: Arc<ValidatorServiceMetrics>,
455 client_id_source: Option<ClientIdSource>,
456 ) -> Self {
457 let traffic_controller = state.traffic_controller.clone();
458 Self {
459 state,
460 consensus_adapter,
461 metrics: validator_metrics,
462 traffic_controller,
463 client_id_source,
464 }
465 }
466
467 pub fn new_for_tests(
468 state: Arc<AuthorityState>,
469 consensus_adapter: Arc<ConsensusAdapter>,
470 metrics: Arc<ValidatorServiceMetrics>,
471 ) -> Self {
472 Self {
473 state,
474 consensus_adapter,
475 metrics,
476 traffic_controller: None,
477 client_id_source: None,
478 }
479 }
480
481 pub fn validator_state(&self) -> &Arc<AuthorityState> {
482 &self.state
483 }
484
485 pub async fn execute_certificate_for_testing(
486 &self,
487 cert: CertifiedTransaction,
488 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
489 let request = make_tonic_request_for_testing(cert);
490 self.handle_certificate_v2(request).await
491 }
492
493 pub async fn handle_transaction_for_benchmarking(
494 &self,
495 transaction: Transaction,
496 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
497 let request = make_tonic_request_for_testing(transaction);
498 self.transaction(request).await
499 }
500
501 async fn handle_transaction(
504 &self,
505 request: tonic::Request<Transaction>,
506 ) -> WrappedServiceResponse<HandleTransactionResponse> {
507 let Self {
508 state,
509 consensus_adapter,
510 metrics,
511 traffic_controller: _,
512 client_id_source: _,
513 } = self.clone();
514 let transaction = request.into_inner();
515 let epoch_store = state.load_epoch_store_one_call_per_task();
516
517 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
518
519 let mut validator_pushback_error = None;
527 let overload_check_res = state.check_system_overload(
528 &*consensus_adapter,
529 transaction.data(),
530 state.check_system_overload_at_signing(),
531 );
532 if let Err(error) = overload_check_res {
533 metrics
534 .num_rejected_tx_during_overload
535 .with_label_values(&[error.as_ref()])
536 .inc();
537 match error.as_inner() {
539 SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
540 validator_pushback_error = Some(error)
541 }
542 _ => return Err(error.into()),
543 }
544 }
545
546 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
547
548 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
549 let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
550 metrics.signature_errors.inc();
551 })?;
552 drop(tx_verif_metrics_guard);
553
554 let tx_digest = transaction.digest();
555
556 let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
558
559 let info = state
560 .handle_transaction(&epoch_store, transaction.clone())
561 .instrument(span)
562 .await
563 .tap_err(|e| {
564 if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
565 metrics.num_rejected_tx_in_epoch_boundary.inc();
566 }
567 })?;
568
569 if let Some(error) = validator_pushback_error {
570 return Err(error.into());
573 }
574
575 Ok((tonic::Response::new(info), Weight::zero()))
576 }
577
578 #[instrument(
579 name = "ValidatorService::handle_submit_transaction",
580 level = "error",
581 skip_all,
582 err(level = "debug")
583 )]
584 async fn handle_submit_transaction(
585 &self,
586 request: tonic::Request<RawSubmitTxRequest>,
587 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
588 let Self {
589 state,
590 consensus_adapter,
591 metrics,
592 traffic_controller: _,
593 client_id_source,
594 } = self.clone();
595
596 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
597 self.get_client_ip_addr(&request, client_id_source)
598 } else {
599 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
600 };
601
602 let epoch_store = state.load_epoch_store_one_call_per_task();
603 if !epoch_store.protocol_config().mysticeti_fastpath() {
604 return Err(SuiErrorKind::UnsupportedFeatureError {
605 error: "Mysticeti fastpath".to_string(),
606 }
607 .into());
608 }
609
610 let request = request.into_inner();
611 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
612 SuiErrorKind::GrpcMessageDeserializeError {
613 type_info: "RawSubmitTxRequest.submit_type".to_string(),
614 error: e.to_string(),
615 }
616 })?;
617
618 let is_ping_request = submit_type == SubmitTxType::Ping;
619 if is_ping_request {
620 fp_ensure!(
621 request.transactions.is_empty(),
622 SuiErrorKind::InvalidRequest(format!(
623 "Ping request cannot contain {} transactions",
624 request.transactions.len()
625 ))
626 .into()
627 );
628 } else {
629 fp_ensure!(
631 !request.transactions.is_empty(),
632 SuiErrorKind::InvalidRequest(
633 "At least one transaction needs to be submitted".to_string(),
634 )
635 .into()
636 );
637 }
638
639 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
644
645 let max_num_transactions = if is_soft_bundle_request {
646 epoch_store.protocol_config().max_soft_bundle_size()
649 } else {
650 epoch_store
652 .protocol_config()
653 .max_num_transactions_in_block()
654 };
655 fp_ensure!(
656 request.transactions.len() <= max_num_transactions as usize,
657 SuiErrorKind::InvalidRequest(format!(
658 "Too many transactions in request: {} vs {}",
659 request.transactions.len(),
660 max_num_transactions
661 ))
662 .into()
663 );
664
665 let mut tx_digests = Vec::with_capacity(request.transactions.len());
667 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
669 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
671 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
673 let mut total_size_bytes = 0;
675
676 let req_type = if is_ping_request {
677 "ping"
678 } else if request.transactions.len() == 1 {
679 "single_transaction"
680 } else if is_soft_bundle_request {
681 "soft_bundle"
682 } else {
683 "batch"
684 };
685
686 let _handle_tx_metrics_guard = metrics
687 .handle_submit_transaction_latency
688 .with_label_values(&[req_type])
689 .start_timer();
690
691 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
692 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
693 Ok(txn) => txn,
694 Err(e) => {
695 return Err(SuiErrorKind::TransactionDeserializationError {
697 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
698 }
699 .into());
700 }
701 };
702
703 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
705
706 let overload_check_res = self.state.check_system_overload(
707 &*consensus_adapter,
708 transaction.data(),
709 state.check_system_overload_at_signing(),
710 );
711 if let Err(error) = overload_check_res {
712 metrics
713 .num_rejected_tx_during_overload
714 .with_label_values(&[error.as_ref()])
715 .inc();
716 results[idx] = Some(SubmitTxResult::Rejected { error });
717 continue;
718 }
719
720 let verified_transaction = {
722 let _metrics_guard = metrics.tx_verification_latency.start_timer();
723 match epoch_store.verify_transaction(transaction) {
724 Ok(txn) => txn,
725 Err(e) => {
726 metrics.signature_errors.inc();
727 return Err(e.into());
728 }
729 }
730 };
731
732 let tx_digest = verified_transaction.digest();
733 tx_digests.push(*tx_digest);
734
735 debug!(
736 ?tx_digest,
737 "handle_submit_transaction: verified transaction"
738 );
739
740 if let Some(effects) = self
743 .state
744 .get_transaction_cache_reader()
745 .get_executed_effects(tx_digest)
746 {
747 let effects_digest = effects.digest();
748 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
749 let executed_result = SubmitTxResult::Executed {
750 effects_digest,
751 details: Some(executed_data),
752 fast_path: false,
753 };
754 results[idx] = Some(executed_result);
755 debug!(?tx_digest, "handle_submit_transaction: already executed");
756 continue;
757 }
758 }
759
760 debug!(
761 ?tx_digest,
762 "handle_submit_transaction: waiting for fastpath dependency objects"
763 );
764 if !state
765 .wait_for_fastpath_dependency_objects(&verified_transaction, epoch_store.epoch())
766 .await?
767 {
768 debug!(
769 ?tx_digest,
770 "fastpath input objects are still unavailable after waiting"
771 );
772 }
773
774 match state.handle_vote_transaction(&epoch_store, verified_transaction.clone()) {
775 Ok(_) => { }
776 Err(e) => {
777 if let Some(effects) = self
780 .state
781 .get_transaction_cache_reader()
782 .get_executed_effects(tx_digest)
783 {
784 let effects_digest = effects.digest();
785 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
786 {
787 let executed_result = SubmitTxResult::Executed {
788 effects_digest,
789 details: Some(executed_data),
790 fast_path: false,
791 };
792 results[idx] = Some(executed_result);
793 continue;
794 }
795 }
796
797 debug!(?tx_digest, "Transaction rejected during submission: {e}");
799 metrics
800 .submission_rejected_transactions
801 .with_label_values(&[e.to_variant_name()])
802 .inc();
803 results[idx] = Some(SubmitTxResult::Rejected { error: e });
804 continue;
805 }
806 }
807
808 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
809 &self.state.name,
810 verified_transaction.into(),
811 ));
812 transaction_indexes.push(idx);
813 total_size_bytes += tx_size;
814 }
815
816 if consensus_transactions.is_empty() && !is_ping_request {
817 return Ok((
818 tonic::Response::new(Self::try_from_submit_tx_response(results)?),
819 Weight::zero(),
820 ));
821 }
822
823 let max_transaction_bytes = if is_soft_bundle_request {
827 epoch_store
828 .protocol_config()
829 .consensus_max_transactions_in_block_bytes()
830 / 2
831 } else {
832 epoch_store
833 .protocol_config()
834 .consensus_max_transactions_in_block_bytes()
835 };
836 fp_ensure!(
837 total_size_bytes <= max_transaction_bytes as usize,
838 SuiErrorKind::UserInputError {
839 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
840 size: total_size_bytes,
841 limit: max_transaction_bytes,
842 },
843 }
844 .into()
845 );
846
847 metrics
848 .handle_submit_transaction_bytes
849 .with_label_values(&[req_type])
850 .observe(total_size_bytes as f64);
851 metrics
852 .handle_submit_transaction_batch_size
853 .with_label_values(&[req_type])
854 .observe(consensus_transactions.len() as f64);
855
856 let _latency_metric_guard = metrics
857 .handle_submit_transaction_consensus_latency
858 .with_label_values(&[req_type])
859 .start_timer();
860
861 let consensus_positions = if is_soft_bundle_request || is_ping_request {
862 assert!(
865 is_ping_request || !consensus_transactions.is_empty(),
866 "A valid soft bundle must have at least one transaction"
867 );
868 debug!(
869 "handle_submit_transaction: submitting consensus transactions ({}): {}",
870 req_type,
871 consensus_transactions
872 .iter()
873 .map(|t| t.local_display())
874 .join(", ")
875 );
876 self.handle_submit_to_consensus_for_position(
877 consensus_transactions,
878 &epoch_store,
879 submitter_client_addr,
880 )
881 .await?
882 } else {
883 let futures = consensus_transactions.into_iter().map(|t| {
884 debug!(
885 "handle_submit_transaction: submitting consensus transaction ({}): {}",
886 req_type,
887 t.local_display(),
888 );
889 self.handle_submit_to_consensus_for_position(
890 vec![t],
891 &epoch_store,
892 submitter_client_addr,
893 )
894 });
895 future::try_join_all(futures)
896 .await?
897 .into_iter()
898 .flatten()
899 .collect()
900 };
901
902 if is_ping_request {
903 assert_eq!(consensus_positions.len(), 1);
905 results.push(Some(SubmitTxResult::Submitted {
906 consensus_position: consensus_positions[0],
907 }));
908 } else {
909 for ((idx, tx_digest), consensus_position) in transaction_indexes
911 .into_iter()
912 .zip(tx_digests)
913 .zip(consensus_positions)
914 {
915 debug!(
916 ?tx_digest,
917 "handle_submit_transaction: submitted consensus transaction at {}",
918 consensus_position,
919 );
920 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
921 }
922 }
923
924 Ok((
925 tonic::Response::new(Self::try_from_submit_tx_response(results)?),
926 Weight::zero(),
927 ))
928 }
929
930 fn try_from_submit_tx_response(
931 results: Vec<Option<SubmitTxResult>>,
932 ) -> Result<RawSubmitTxResponse, SuiError> {
933 let mut raw_results = Vec::new();
934 for (i, result) in results.into_iter().enumerate() {
935 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
936 error: format!("Missing transaction result at {}", i),
937 })?;
938 let raw_result = result.try_into()?;
939 raw_results.push(raw_result);
940 }
941 Ok(RawSubmitTxResponse {
942 results: raw_results,
943 })
944 }
945
946 async fn handle_certificates(
952 &self,
953 certificates: NonEmpty<CertifiedTransaction>,
954 include_events: bool,
955 include_input_objects: bool,
956 include_output_objects: bool,
957 include_auxiliary_data: bool,
958 epoch_store: &Arc<AuthorityPerEpochStore>,
959 wait_for_effects: bool,
960 ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
961 fp_ensure!(
964 !self.state.is_fullnode(epoch_store),
965 SuiErrorKind::FullNodeCantHandleCertificate.into()
966 );
967
968 let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
969
970 let metrics = if certificates.len() == 1 {
971 if wait_for_effects {
972 if is_consensus_tx {
973 &self.metrics.handle_certificate_consensus_latency
974 } else {
975 &self.metrics.handle_certificate_non_consensus_latency
976 }
977 } else {
978 &self.metrics.submit_certificate_consensus_latency
979 }
980 } else {
981 &self
983 .metrics
984 .handle_soft_bundle_certificates_consensus_latency
985 };
986
987 let _metrics_guard = metrics.start_timer();
988
989 if certificates.len() == 1 {
993 let tx_digest = *certificates[0].digest();
994 debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
995
996 if let Some(signed_effects) = self
997 .state
998 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
999 {
1000 let events = if include_events && signed_effects.events_digest().is_some() {
1001 Some(
1002 self.state
1003 .get_transaction_events(signed_effects.transaction_digest())?,
1004 )
1005 } else {
1006 None
1007 };
1008
1009 return Ok((
1010 Some(vec![HandleCertificateResponseV3 {
1011 effects: signed_effects.into_inner(),
1012 events,
1013 input_objects: None,
1014 output_objects: None,
1015 auxiliary_data: None,
1016 }]),
1017 Weight::one(),
1018 ));
1019 };
1020 }
1021
1022 for certificate in &certificates {
1025 let overload_check_res = self.state.check_system_overload(
1026 &*self.consensus_adapter,
1027 certificate.data(),
1028 self.state.check_system_overload_at_execution(),
1029 );
1030 if let Err(error) = overload_check_res {
1031 self.metrics
1032 .num_rejected_cert_during_overload
1033 .with_label_values(&[error.as_ref()])
1034 .inc();
1035 return Err(error.into());
1036 }
1037 }
1038
1039 let verified_certificates = {
1040 let _timer = self.metrics.cert_verification_latency.start_timer();
1041 epoch_store
1042 .signature_verifier
1043 .multi_verify_certs(certificates.into())
1044 .await
1045 .into_iter()
1046 .collect::<Result<Vec<_>, _>>()?
1047 };
1048 let consensus_transactions =
1049 NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1050 ConsensusTransaction::new_certificate_message(
1051 &self.state.name,
1052 certificate.clone().into(),
1053 )
1054 }))
1055 .unwrap();
1056
1057 let (responses, weight) = self
1058 .handle_submit_to_consensus(
1059 consensus_transactions,
1060 include_events,
1061 include_input_objects,
1062 include_output_objects,
1063 include_auxiliary_data,
1064 epoch_store,
1065 wait_for_effects,
1066 )
1067 .await?;
1068 let responses = if let Some(responses) = responses {
1070 Some(
1071 responses
1072 .into_iter()
1073 .map(|response| {
1074 let signed_effects =
1075 self.state.sign_effects(response.effects, epoch_store)?;
1076 Ok(HandleCertificateResponseV3 {
1077 effects: signed_effects.into_inner(),
1078 events: response.events,
1079 input_objects: if response.input_objects.is_empty() {
1080 None
1081 } else {
1082 Some(response.input_objects)
1083 },
1084 output_objects: if response.output_objects.is_empty() {
1085 None
1086 } else {
1087 Some(response.output_objects)
1088 },
1089 auxiliary_data: None,
1090 })
1091 })
1092 .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1093 )
1094 } else {
1095 None
1096 };
1097
1098 Ok((responses, weight))
1099 }
1100
1101 #[instrument(
1102 name = "ValidatorService::handle_submit_to_consensus_for_position",
1103 level = "debug",
1104 skip_all,
1105 err(level = "debug")
1106 )]
1107 async fn handle_submit_to_consensus_for_position(
1108 &self,
1109 consensus_transactions: Vec<ConsensusTransaction>,
1111 epoch_store: &Arc<AuthorityPerEpochStore>,
1112 submitter_client_addr: Option<IpAddr>,
1113 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1114 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1115
1116 {
1117 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1119 if !reconfiguration_lock.should_accept_user_certs() {
1120 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1121 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1122 }
1123
1124 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1128
1129 self.consensus_adapter.submit_batch(
1130 &consensus_transactions,
1131 Some(&reconfiguration_lock),
1132 epoch_store,
1133 Some(tx_consensus_positions),
1134 submitter_client_addr,
1135 )?;
1136 }
1137
1138 Ok(rx_consensus_positions.await.map_err(|e| {
1139 SuiErrorKind::FailedToSubmitToConsensus(format!(
1140 "Failed to get consensus position: {e}"
1141 ))
1142 })?)
1143 }
1144
1145 async fn handle_submit_to_consensus(
1146 &self,
1147 consensus_transactions: NonEmpty<ConsensusTransaction>,
1148 include_events: bool,
1149 include_input_objects: bool,
1150 include_output_objects: bool,
1151 _include_auxiliary_data: bool,
1152 epoch_store: &Arc<AuthorityPerEpochStore>,
1153 wait_for_effects: bool,
1154 ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1155 let consensus_transactions: Vec<_> = consensus_transactions.into();
1156 {
1157 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1159 if !reconfiguration_lock.should_accept_user_certs() {
1160 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1161 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1162 }
1163
1164 if !epoch_store.all_external_consensus_messages_processed(
1170 consensus_transactions.iter().map(|tx| tx.key()),
1171 )? {
1172 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1173 self.consensus_adapter.submit_batch(
1174 &consensus_transactions,
1175 Some(&reconfiguration_lock),
1176 epoch_store,
1177 None,
1178 None, )?;
1180 }
1183 }
1184
1185 if !wait_for_effects {
1186 let fast_path_certificates = consensus_transactions
1189 .iter()
1190 .filter_map(|tx| {
1191 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1192 (!certificate.is_consensus_tx())
1193 .then_some((
1195 VerifiedExecutableTransaction::new_from_certificate(
1196 VerifiedCertificate::new_unchecked(*(certificate.clone())),
1197 ),
1198 ExecutionEnv::new()
1199 .with_scheduling_source(SchedulingSource::NonFastPath),
1200 ))
1201 } else {
1202 None
1203 }
1204 })
1205 .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1206 .collect::<Vec<_>>();
1207 if !fast_path_certificates.is_empty() {
1208 self.state
1209 .execution_scheduler()
1210 .enqueue(fast_path_certificates, epoch_store);
1211 }
1212 return Ok((None, Weight::zero()));
1213 }
1214
1215 let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1218 |tx| async move {
1219 let effects = match &tx.kind {
1220 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1221 let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1223 self.state
1224 .wait_for_certificate_execution(&certificate, epoch_store)
1225 .await?
1226 }
1227 ConsensusTransactionKind::UserTransaction(tx) => {
1228 self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1229 }
1230 _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction or UserTransaction"),
1231 };
1232 let events = if include_events && effects.events_digest().is_some() {
1233 Some(self.state.get_transaction_events(effects.transaction_digest())?)
1234 } else {
1235 None
1236 };
1237
1238 let input_objects = if include_input_objects {
1239 self.state.get_transaction_input_objects(&effects)?
1240 } else {
1241 vec![]
1242 };
1243
1244 let output_objects = if include_output_objects {
1245 self.state.get_transaction_output_objects(&effects)?
1246 } else {
1247 vec![]
1248 };
1249
1250 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1251 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1252 }
1253
1254 Ok::<_, SuiError>(ExecutedData {
1255 effects,
1256 events,
1257 input_objects,
1258 output_objects,
1259 })
1260 },
1261 ))
1262 .await?;
1263
1264 Ok((Some(responses), Weight::zero()))
1265 }
1266
1267 async fn collect_effects_data(
1268 &self,
1269 effects: &TransactionEffects,
1270 include_events: bool,
1271 include_input_objects: bool,
1272 include_output_objects: bool,
1273 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1274 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1275 let events = if include_events && effects.events_digest().is_some() {
1276 if let Some(fastpath_outputs) = &fastpath_outputs {
1277 Some(fastpath_outputs.events.clone())
1278 } else {
1279 Some(
1280 self.state
1281 .get_transaction_events(effects.transaction_digest())?,
1282 )
1283 }
1284 } else {
1285 None
1286 };
1287
1288 let input_objects = if include_input_objects {
1289 self.state.get_transaction_input_objects(effects)?
1290 } else {
1291 vec![]
1292 };
1293
1294 let output_objects = if include_output_objects {
1295 if let Some(fastpath_outputs) = &fastpath_outputs {
1296 fastpath_outputs.written.values().cloned().collect()
1297 } else {
1298 self.state.get_transaction_output_objects(effects)?
1299 }
1300 } else {
1301 vec![]
1302 };
1303
1304 Ok((events, input_objects, output_objects))
1305 }
1306}
1307
1308type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1309
1310impl ValidatorService {
1311 async fn transaction_impl(
1312 &self,
1313 request: tonic::Request<Transaction>,
1314 ) -> WrappedServiceResponse<HandleTransactionResponse> {
1315 self.handle_transaction(request).await
1316 }
1317
1318 async fn handle_submit_transaction_impl(
1319 &self,
1320 request: tonic::Request<RawSubmitTxRequest>,
1321 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1322 self.handle_submit_transaction(request).await
1323 }
1324
1325 async fn submit_certificate_impl(
1326 &self,
1327 request: tonic::Request<CertifiedTransaction>,
1328 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1329 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1330 let certificate = request.into_inner();
1331 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1332
1333 let span =
1334 error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1335 self.handle_certificates(
1336 nonempty![certificate],
1337 true,
1338 false,
1339 false,
1340 false,
1341 &epoch_store,
1342 false,
1343 )
1344 .instrument(span)
1345 .await
1346 .map(|(executed, spam_weight)| {
1347 (
1348 tonic::Response::new(SubmitCertificateResponse {
1349 executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1350 }),
1351 spam_weight,
1352 )
1353 })
1354 }
1355
1356 async fn handle_certificate_v2_impl(
1357 &self,
1358 request: tonic::Request<CertifiedTransaction>,
1359 ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1360 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1361 let certificate = request.into_inner();
1362 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1363
1364 let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1365 self.handle_certificates(
1366 nonempty![certificate],
1367 true,
1368 false,
1369 false,
1370 false,
1371 &epoch_store,
1372 true,
1373 )
1374 .instrument(span)
1375 .await
1376 .map(|(resp, spam_weight)| {
1377 (
1378 tonic::Response::new(
1379 resp.expect(
1380 "handle_certificate should not return none with wait_for_effects=true",
1381 )
1382 .remove(0)
1383 .into(),
1384 ),
1385 spam_weight,
1386 )
1387 })
1388 }
1389
1390 async fn handle_certificate_v3_impl(
1391 &self,
1392 request: tonic::Request<HandleCertificateRequestV3>,
1393 ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1394 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1395 let request = request.into_inner();
1396 request
1397 .certificate
1398 .validity_check(&epoch_store.tx_validity_check_context())?;
1399
1400 let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1401 self.handle_certificates(
1402 nonempty![request.certificate],
1403 request.include_events,
1404 request.include_input_objects,
1405 request.include_output_objects,
1406 request.include_auxiliary_data,
1407 &epoch_store,
1408 true,
1409 )
1410 .instrument(span)
1411 .await
1412 .map(|(resp, spam_weight)| {
1413 (
1414 tonic::Response::new(
1415 resp.expect(
1416 "handle_certificate should not return none with wait_for_effects=true",
1417 )
1418 .remove(0),
1419 ),
1420 spam_weight,
1421 )
1422 })
1423 }
1424
1425 async fn wait_for_effects_impl(
1426 &self,
1427 request: tonic::Request<RawWaitForEffectsRequest>,
1428 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1429 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1430 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1431 let response = timeout(
1432 Duration::from_secs(20),
1434 epoch_store
1435 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1436 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1437 )
1438 .await
1439 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1440 .try_into()?;
1441 Ok((tonic::Response::new(response), Weight::zero()))
1442 }
1443
1444 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1445 async fn wait_for_effects_response(
1446 &self,
1447 request: WaitForEffectsRequest,
1448 epoch_store: &Arc<AuthorityPerEpochStore>,
1449 ) -> SuiResult<WaitForEffectsResponse> {
1450 if request.ping_type.is_some() {
1451 return timeout(
1452 Duration::from_secs(10),
1453 self.ping_response(request, epoch_store),
1454 )
1455 .await
1456 .map_err(|_| SuiErrorKind::TimeoutError)?;
1457 }
1458
1459 let Some(tx_digest) = request.transaction_digest else {
1460 return Err(SuiErrorKind::InvalidRequest(
1461 "Transaction digest is required for wait for effects requests".to_string(),
1462 )
1463 .into());
1464 };
1465 let tx_digests = [tx_digest];
1466
1467 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1468 if let Some(consensus_position) = request.consensus_position {
1469 Box::pin(self.wait_for_fastpath_effects(
1470 consensus_position,
1471 &tx_digests,
1472 request.include_details,
1473 epoch_store,
1474 ))
1475 } else {
1476 Box::pin(futures::future::pending())
1477 };
1478
1479 tokio::select! {
1480 biased;
1482 mut effects = self.state
1489 .get_transaction_cache_reader()
1490 .notify_read_executed_effects(
1491 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1492 &tx_digests,
1493 ) => {
1494 tracing::Span::current().record("fast_path_effects", false);
1495 let effects = effects.pop().unwrap();
1496 let details = if request.include_details {
1497 Some(self.complete_executed_data(effects.clone(), None).await?)
1498 } else {
1499 None
1500 };
1501
1502 Ok(WaitForEffectsResponse::Executed {
1503 effects_digest: effects.digest(),
1504 details,
1505 fast_path: false,
1506 })
1507 }
1508
1509 fastpath_response = fastpath_effects_future => {
1510 tracing::Span::current().record("fast_path_effects", true);
1511 fastpath_response
1512 }
1513 }
1514 }
1515
1516 #[instrument(level = "error", skip_all, err(level = "debug"))]
1517 async fn ping_response(
1518 &self,
1519 request: WaitForEffectsRequest,
1520 epoch_store: &Arc<AuthorityPerEpochStore>,
1521 ) -> SuiResult<WaitForEffectsResponse> {
1522 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1523 return Err(SuiErrorKind::UnsupportedFeatureError {
1524 error: "Mysticeti fastpath".to_string(),
1525 }
1526 .into());
1527 };
1528
1529 let Some(consensus_position) = request.consensus_position else {
1530 return Err(SuiErrorKind::InvalidRequest(
1531 "Consensus position is required for Ping requests".to_string(),
1532 )
1533 .into());
1534 };
1535
1536 let Some(ping) = request.ping_type else {
1538 return Err(SuiErrorKind::InvalidRequest(
1539 "Ping type is required for ping requests".to_string(),
1540 )
1541 .into());
1542 };
1543
1544 let _metrics_guard = self
1545 .metrics
1546 .handle_wait_for_effects_ping_latency
1547 .with_label_values(&[ping.as_str()])
1548 .start_timer();
1549
1550 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1551
1552 let mut last_status = None;
1553 let details = if request.include_details {
1554 Some(Box::new(ExecutedData::default()))
1555 } else {
1556 None
1557 };
1558
1559 loop {
1560 let status = consensus_tx_status_cache
1561 .notify_read_transaction_status_change(consensus_position, last_status)
1562 .await;
1563 match status {
1564 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1565 ConsensusTxStatus::FastpathCertified => {
1566 if ping == PingType::Consensus {
1568 last_status = Some(status);
1569 continue;
1570 }
1571 return Ok(WaitForEffectsResponse::Executed {
1572 effects_digest: TransactionEffectsDigest::ZERO,
1573 details,
1574 fast_path: true,
1575 });
1576 }
1577 ConsensusTxStatus::Rejected => {
1578 return Ok(WaitForEffectsResponse::Rejected { error: None });
1579 }
1580 ConsensusTxStatus::Finalized => {
1581 return Ok(WaitForEffectsResponse::Executed {
1582 effects_digest: TransactionEffectsDigest::ZERO,
1583 details,
1584 fast_path: false,
1585 });
1586 }
1587 },
1588 NotifyReadConsensusTxStatusResult::Expired(round) => {
1589 return Ok(WaitForEffectsResponse::Expired {
1590 epoch: epoch_store.epoch(),
1591 round: Some(round),
1592 });
1593 }
1594 }
1595 }
1596 }
1597
1598 #[instrument(level = "error", skip_all, err(level = "debug"))]
1599 async fn wait_for_fastpath_effects(
1600 &self,
1601 consensus_position: ConsensusPosition,
1602 tx_digests: &[TransactionDigest],
1603 include_details: bool,
1604 epoch_store: &Arc<AuthorityPerEpochStore>,
1605 ) -> SuiResult<WaitForEffectsResponse> {
1606 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1607 return Err(SuiErrorKind::UnsupportedFeatureError {
1608 error: "Mysticeti fastpath".to_string(),
1609 }
1610 .into());
1611 };
1612
1613 let local_epoch = epoch_store.epoch();
1614 match consensus_position.epoch.cmp(&local_epoch) {
1615 Ordering::Less => {
1616 let response = WaitForEffectsResponse::Expired {
1619 epoch: local_epoch,
1620 round: None,
1621 };
1622 return Ok(response);
1623 }
1624 Ordering::Greater => {
1625 return Err(SuiErrorKind::WrongEpoch {
1627 expected_epoch: local_epoch,
1628 actual_epoch: consensus_position.epoch,
1629 }
1630 .into());
1631 }
1632 Ordering::Equal => {
1633 }
1636 };
1637
1638 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1639
1640 let mut current_status = None;
1641 loop {
1642 tokio::select! {
1643 status_result = consensus_tx_status_cache
1644 .notify_read_transaction_status_change(consensus_position, current_status) => {
1645 match status_result {
1646 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1647 match new_status {
1648 ConsensusTxStatus::Rejected => {
1649 return Ok(WaitForEffectsResponse::Rejected {
1650 error: epoch_store.get_rejection_vote_reason(
1651 consensus_position
1652 )
1653 });
1654 }
1655 ConsensusTxStatus::FastpathCertified => {
1656 current_status = Some(new_status);
1657 continue;
1658 }
1659 ConsensusTxStatus::Finalized => {
1660 current_status = Some(new_status);
1661 continue;
1662 }
1663 }
1664 }
1665 NotifyReadConsensusTxStatusResult::Expired(round) => {
1666 return Ok(WaitForEffectsResponse::Expired {
1667 epoch: epoch_store.epoch(),
1668 round: Some(round),
1669 });
1670 }
1671 }
1672 }
1673
1674 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1675 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1676 let outputs = outputs.pop().unwrap();
1677 let effects = outputs.effects.clone();
1678
1679 let details = if include_details {
1680 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1681 } else {
1682 None
1683 };
1684
1685 return Ok(WaitForEffectsResponse::Executed {
1686 effects_digest: effects.digest(),
1687 details,
1688 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1689 });
1690 }
1691 }
1692 }
1693 }
1694
1695 async fn complete_executed_data(
1696 &self,
1697 effects: TransactionEffects,
1698 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1699 ) -> SuiResult<Box<ExecutedData>> {
1700 let (events, input_objects, output_objects) = self
1701 .collect_effects_data(
1702 &effects,
1703 true,
1704 true,
1705 true,
1706 fastpath_outputs,
1707 )
1708 .await?;
1709 Ok(Box::new(ExecutedData {
1710 effects,
1711 events,
1712 input_objects,
1713 output_objects,
1714 }))
1715 }
1716
1717 async fn soft_bundle_validity_check(
1718 &self,
1719 certificates: &NonEmpty<CertifiedTransaction>,
1720 epoch_store: &Arc<AuthorityPerEpochStore>,
1721 total_size_bytes: u64,
1722 ) -> Result<(), tonic::Status> {
1723 let protocol_config = epoch_store.protocol_config();
1724 let node_config = &self.state.config;
1725
1726 fp_ensure!(
1732 protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1733 SuiErrorKind::UnsupportedFeatureError {
1734 error: "Soft Bundle".to_string()
1735 }
1736 .into()
1737 );
1738
1739 fp_ensure!(
1746 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1747 SuiErrorKind::UserInputError {
1748 error: UserInputError::TooManyTransactionsInBatch {
1749 size: certificates.len(),
1750 limit: protocol_config.max_soft_bundle_size()
1751 }
1752 }
1753 .into()
1754 );
1755
1756 let soft_bundle_max_size_bytes =
1760 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1761 fp_ensure!(
1762 total_size_bytes <= soft_bundle_max_size_bytes,
1763 SuiErrorKind::UserInputError {
1764 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1765 size: total_size_bytes as usize,
1766 limit: soft_bundle_max_size_bytes,
1767 },
1768 }
1769 .into()
1770 );
1771
1772 let mut gas_price = None;
1773 for certificate in certificates {
1774 let tx_digest = *certificate.digest();
1775 fp_ensure!(
1776 certificate.is_consensus_tx(),
1777 SuiErrorKind::UserInputError {
1778 error: UserInputError::NoSharedObjectError { digest: tx_digest }
1779 }
1780 .into()
1781 );
1782 fp_ensure!(
1783 !self.state.is_tx_already_executed(&tx_digest),
1784 SuiErrorKind::UserInputError {
1785 error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1786 }
1787 .into()
1788 );
1789 if let Some(gas) = gas_price {
1790 fp_ensure!(
1791 gas == certificate.gas_price(),
1792 SuiErrorKind::UserInputError {
1793 error: UserInputError::GasPriceMismatchError {
1794 digest: tx_digest,
1795 expected: gas,
1796 actual: certificate.gas_price()
1797 }
1798 }
1799 .into()
1800 );
1801 } else {
1802 gas_price = Some(certificate.gas_price());
1803 }
1804 }
1805
1806 fp_ensure!(
1811 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1812 SuiErrorKind::UserInputError {
1813 error: UserInputError::CertificateAlreadyProcessed
1814 }
1815 .into()
1816 );
1817
1818 Ok(())
1819 }
1820
1821 async fn handle_soft_bundle_certificates_v3_impl(
1822 &self,
1823 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1824 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1825 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1826 let client_addr = if self.client_id_source.is_none() {
1827 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1828 } else {
1829 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1830 };
1831 let request = request.into_inner();
1832
1833 let certificates = NonEmpty::from_vec(request.certificates)
1834 .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1835 let mut total_size_bytes = 0;
1836 for certificate in &certificates {
1837 total_size_bytes +=
1839 certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1840 }
1841
1842 self.metrics
1843 .handle_soft_bundle_certificates_count
1844 .observe(certificates.len() as f64);
1845
1846 self.metrics
1847 .handle_soft_bundle_certificates_size_bytes
1848 .observe(total_size_bytes as f64);
1849
1850 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1852 .await?;
1853
1854 info!(
1855 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1856 certificates.len(),
1857 client_addr
1858 .map(|x| x.to_string())
1859 .unwrap_or_else(|| "unknown".to_string()),
1860 certificates
1861 .iter()
1862 .map(|x| x.digest().to_string())
1863 .collect::<Vec<_>>()
1864 .join(", "),
1865 total_size_bytes
1866 );
1867
1868 let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1869 self.handle_certificates(
1870 certificates,
1871 request.include_events,
1872 request.include_input_objects,
1873 request.include_output_objects,
1874 request.include_auxiliary_data,
1875 &epoch_store,
1876 request.wait_for_effects,
1877 )
1878 .instrument(span)
1879 .await
1880 .map(|(resp, spam_weight)| {
1881 (
1882 tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1883 responses: resp.unwrap_or_default(),
1884 }),
1885 spam_weight,
1886 )
1887 })
1888 }
1889
1890 async fn object_info_impl(
1891 &self,
1892 request: tonic::Request<ObjectInfoRequest>,
1893 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1894 let request = request.into_inner();
1895 let response = self.state.handle_object_info_request(request).await?;
1896 Ok((tonic::Response::new(response), Weight::one()))
1897 }
1898
1899 async fn transaction_info_impl(
1900 &self,
1901 request: tonic::Request<TransactionInfoRequest>,
1902 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1903 let request = request.into_inner();
1904 let response = self.state.handle_transaction_info_request(request).await?;
1905 Ok((tonic::Response::new(response), Weight::one()))
1906 }
1907
1908 async fn checkpoint_impl(
1909 &self,
1910 request: tonic::Request<CheckpointRequest>,
1911 ) -> WrappedServiceResponse<CheckpointResponse> {
1912 let request = request.into_inner();
1913 let response = self.state.handle_checkpoint_request(&request)?;
1914 Ok((tonic::Response::new(response), Weight::one()))
1915 }
1916
1917 async fn checkpoint_v2_impl(
1918 &self,
1919 request: tonic::Request<CheckpointRequestV2>,
1920 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1921 let request = request.into_inner();
1922 let response = self.state.handle_checkpoint_request_v2(&request)?;
1923 Ok((tonic::Response::new(response), Weight::one()))
1924 }
1925
1926 async fn get_system_state_object_impl(
1927 &self,
1928 _request: tonic::Request<SystemStateRequest>,
1929 ) -> WrappedServiceResponse<SuiSystemState> {
1930 let response = self
1931 .state
1932 .get_object_cache_reader()
1933 .get_sui_system_state_object_unsafe()?;
1934 Ok((tonic::Response::new(response), Weight::one()))
1935 }
1936
1937 async fn validator_health_impl(
1938 &self,
1939 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1940 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1941 let state = &self.state;
1942
1943 let epoch_store = state.load_epoch_store_one_call_per_task();
1945
1946 let num_inflight_execution_transactions =
1948 state.execution_scheduler().num_pending_certificates() as u64;
1949
1950 let num_inflight_consensus_transactions =
1952 self.consensus_adapter.num_inflight_transactions();
1953
1954 let last_committed_leader_round = epoch_store
1956 .consensus_tx_status_cache
1957 .as_ref()
1958 .and_then(|cache| cache.get_last_committed_leader_round())
1959 .unwrap_or(0);
1960
1961 let last_locally_built_checkpoint = epoch_store
1963 .last_built_checkpoint_summary()
1964 .ok()
1965 .flatten()
1966 .map(|(_, summary)| summary.sequence_number)
1967 .unwrap_or(0);
1968
1969 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1970 num_inflight_consensus_transactions,
1971 num_inflight_execution_transactions,
1972 last_locally_built_checkpoint,
1973 last_committed_leader_round,
1974 };
1975
1976 let raw_response = typed_response
1977 .try_into()
1978 .map_err(|e: sui_types::error::SuiError| {
1979 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1980 })?;
1981
1982 Ok((tonic::Response::new(raw_response), Weight::one()))
1983 }
1984
1985 fn get_client_ip_addr<T>(
1986 &self,
1987 request: &tonic::Request<T>,
1988 source: &ClientIdSource,
1989 ) -> Option<IpAddr> {
1990 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1991
1992 if let Some(header) = forwarded_header {
1993 let num_hops = header
1994 .to_str()
1995 .map(|h| h.split(',').count().saturating_sub(1))
1996 .unwrap_or(0);
1997
1998 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1999 }
2000
2001 match source {
2002 ClientIdSource::SocketAddr => {
2003 let socket_addr: Option<SocketAddr> = request.remote_addr();
2004
2005 if let Some(socket_addr) = socket_addr {
2011 Some(socket_addr.ip())
2012 } else {
2013 if cfg!(msim) {
2014 } else if cfg!(test) {
2016 panic!("Failed to get remote address from request");
2017 } else {
2018 self.metrics.connection_ip_not_found.inc();
2019 error!("Failed to get remote address from request");
2020 }
2021 None
2022 }
2023 }
2024 ClientIdSource::XForwardedFor(num_hops) => {
2025 let do_header_parse = |op: &MetadataValue<Ascii>| {
2026 match op.to_str() {
2027 Ok(header_val) => {
2028 let header_contents =
2029 header_val.split(',').map(str::trim).collect::<Vec<_>>();
2030 if *num_hops == 0 {
2031 error!(
2032 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2033 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2034 to this node. Skipping traffic controller request handling.",
2035 header_contents,
2036 );
2037 return None;
2038 }
2039 let contents_len = header_contents.len();
2040 if contents_len < *num_hops {
2041 error!(
2042 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2043 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2044 `client-id-source` in the node config.",
2045 header_contents, contents_len, num_hops, contents_len,
2046 );
2047 self.metrics.client_id_source_config_mismatch.inc();
2048 return None;
2049 }
2050 let Some(client_ip) = header_contents.get(contents_len - num_hops)
2051 else {
2052 error!(
2053 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2054 Expected at least {} values. Skipping traffic controller request handling.",
2055 header_contents, contents_len, num_hops, contents_len,
2056 );
2057 return None;
2058 };
2059 parse_ip(client_ip).or_else(|| {
2060 self.metrics.forwarded_header_parse_error.inc();
2061 None
2062 })
2063 }
2064 Err(e) => {
2065 self.metrics.forwarded_header_invalid.inc();
2069 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2070 None
2071 }
2072 }
2073 };
2074 if let Some(op) = request.metadata().get("x-forwarded-for") {
2075 do_header_parse(op)
2076 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2077 do_header_parse(op)
2078 } else {
2079 self.metrics.forwarded_header_not_included.inc();
2080 error!(
2081 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2082 );
2083 None
2084 }
2085 }
2086 }
2087 }
2088
2089 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2090 if let Some(traffic_controller) = &self.traffic_controller {
2091 if !traffic_controller.check(&client, &None).await {
2092 Err(tonic::Status::from_error(
2094 SuiErrorKind::TooManyRequests.into(),
2095 ))
2096 } else {
2097 Ok(())
2098 }
2099 } else {
2100 Ok(())
2101 }
2102 }
2103
2104 fn handle_traffic_resp<T>(
2105 &self,
2106 client: Option<IpAddr>,
2107 wrapped_response: WrappedServiceResponse<T>,
2108 ) -> Result<tonic::Response<T>, tonic::Status> {
2109 let (error, spam_weight, unwrapped_response) = match wrapped_response {
2110 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2111 Err(status) => (
2112 Some(SuiError::from(status.clone())),
2113 Weight::zero(),
2114 Err(status.clone()),
2115 ),
2116 };
2117
2118 if let Some(traffic_controller) = self.traffic_controller.clone() {
2119 traffic_controller.tally(TrafficTally {
2120 direct: client,
2121 through_fullnode: None,
2122 error_info: error.map(|e| {
2123 let error_type = String::from(e.clone().as_ref());
2124 let error_weight = normalize(e);
2125 (error_weight, error_type)
2126 }),
2127 spam_weight,
2128 timestamp: SystemTime::now(),
2129 })
2130 }
2131 unwrapped_response
2132 }
2133}
2134
2135fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2136 let mut request = tonic::Request::new(message);
2139 let tcp_connect_info = TcpConnectInfo {
2140 local_addr: None,
2141 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2142 };
2143 request.extensions_mut().insert(tcp_connect_info);
2144 request
2145}
2146
2147fn normalize(err: SuiError) -> Weight {
2149 match err.as_inner() {
2150 SuiErrorKind::UserInputError {
2151 error: UserInputError::IncorrectUserSignature { .. },
2152 } => Weight::one(),
2153 SuiErrorKind::InvalidSignature { .. }
2154 | SuiErrorKind::SignerSignatureAbsent { .. }
2155 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2156 | SuiErrorKind::IncorrectSigner { .. }
2157 | SuiErrorKind::UnknownSigner { .. }
2158 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2159 _ => Weight::zero(),
2160 }
2161}
2162
/// Runs the handler `$func_name` on `$self` with traffic-control decoration.
///
/// Without a configured client id source the handler is invoked directly and its spam weight is
/// discarded. Otherwise the client address is resolved, `handle_traffic_req` gates the request
/// (returning early on rejection), and the handler's wrapped response is passed to
/// `handle_traffic_resp`.
///
/// The expansion uses `return` and `?`, so it must be used inside a function or async block
/// whose output is `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let client = match $self.client_id_source.as_ref() {
            Some(client_id_source) => $self.get_client_ip_addr(&$request, client_id_source),
            None => return $self.$func_name($request).await.map(|(result, _)| result),
        };

        $self.handle_traffic_req(client).await?;

        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
2183
2184#[async_trait]
2185impl Validator for ValidatorService {
2186 async fn submit_transaction(
2187 &self,
2188 request: tonic::Request<RawSubmitTxRequest>,
2189 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2190 let validator_service = self.clone();
2191
2192 spawn_monitored_task!(async move {
2195 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2198 })
2199 .await
2200 .unwrap()
2201 }
2202
2203 async fn transaction(
2204 &self,
2205 request: tonic::Request<Transaction>,
2206 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2207 let validator_service = self.clone();
2208
2209 spawn_monitored_task!(async move {
2212 handle_with_decoration!(validator_service, transaction_impl, request)
2215 })
2216 .await
2217 .unwrap()
2218 }
2219
2220 async fn submit_certificate(
2221 &self,
2222 request: tonic::Request<CertifiedTransaction>,
2223 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2224 let validator_service = self.clone();
2225
2226 spawn_monitored_task!(async move {
2229 handle_with_decoration!(validator_service, submit_certificate_impl, request)
2232 })
2233 .await
2234 .unwrap()
2235 }
2236
2237 async fn handle_certificate_v2(
2238 &self,
2239 request: tonic::Request<CertifiedTransaction>,
2240 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2241 handle_with_decoration!(self, handle_certificate_v2_impl, request)
2242 }
2243
2244 async fn handle_certificate_v3(
2245 &self,
2246 request: tonic::Request<HandleCertificateRequestV3>,
2247 ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2248 handle_with_decoration!(self, handle_certificate_v3_impl, request)
2249 }
2250
2251 async fn wait_for_effects(
2252 &self,
2253 request: tonic::Request<RawWaitForEffectsRequest>,
2254 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2255 handle_with_decoration!(self, wait_for_effects_impl, request)
2256 }
2257
2258 async fn handle_soft_bundle_certificates_v3(
2259 &self,
2260 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2261 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2262 handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2263 }
2264
2265 async fn object_info(
2266 &self,
2267 request: tonic::Request<ObjectInfoRequest>,
2268 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2269 handle_with_decoration!(self, object_info_impl, request)
2270 }
2271
2272 async fn transaction_info(
2273 &self,
2274 request: tonic::Request<TransactionInfoRequest>,
2275 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2276 handle_with_decoration!(self, transaction_info_impl, request)
2277 }
2278
2279 async fn checkpoint(
2280 &self,
2281 request: tonic::Request<CheckpointRequest>,
2282 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2283 handle_with_decoration!(self, checkpoint_impl, request)
2284 }
2285
2286 async fn checkpoint_v2(
2287 &self,
2288 request: tonic::Request<CheckpointRequestV2>,
2289 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2290 handle_with_decoration!(self, checkpoint_v2_impl, request)
2291 }
2292
2293 async fn get_system_state_object(
2294 &self,
2295 request: tonic::Request<SystemStateRequest>,
2296 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2297 handle_with_decoration!(self, get_system_state_object_impl, request)
2298 }
2299
2300 async fn validator_health(
2301 &self,
2302 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2303 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2304 {
2305 handle_with_decoration!(self, validator_health_impl, request)
2306 }
2307}