1use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_metrics::spawn_monitored_task;
11use prometheus::{
12 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
13 register_gauge_with_registry, register_histogram_vec_with_registry,
14 register_histogram_with_registry, register_int_counter_vec_with_registry,
15 register_int_counter_with_registry,
16};
17use std::{
18 cmp::Ordering,
19 future::Future,
20 io,
21 net::{IpAddr, SocketAddr},
22 pin::Pin,
23 sync::Arc,
24 time::{Duration, SystemTime},
25};
26use sui_network::{
27 api::{Validator, ValidatorServer},
28 tonic,
29 validator::server::SUI_TLS_SERVER_NAME,
30};
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
34use sui_types::messages_grpc::{
35 HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
36};
37use sui_types::messages_grpc::{
38 HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
39 SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::messages_grpc::{
42 HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
43};
44use sui_types::multiaddr::Multiaddr;
45use sui_types::object::Object;
46use sui_types::sui_system_state::SuiSystemState;
47use sui_types::traffic_control::{ClientIdSource, Weight};
48use sui_types::{
49 digests::{TransactionDigest, TransactionEffectsDigest},
50 error::{SuiErrorKind, UserInputError},
51};
52use sui_types::{
53 effects::TransactionEffects,
54 messages_grpc::{
55 ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
56 SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
57 },
58};
59use sui_types::{
60 effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
61};
62use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
63use sui_types::{error::*, transaction::*};
64use sui_types::{
65 fp_ensure,
66 messages_checkpoint::{
67 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
68 },
69};
70use tap::TapFallible;
71use tokio::sync::oneshot;
72use tokio::time::timeout;
73use tonic::metadata::{Ascii, MetadataValue};
74use tracing::{Instrument, debug, error, error_span, info, instrument};
75
76use crate::consensus_adapter::ConnectionMonitorStatusForTests;
77use crate::{
78 authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
79 consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
80 traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
81};
82use crate::{
83 authority::{
84 ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
85 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
86 shared_object_version_manager::Schedulable,
87 },
88 checkpoints::CheckpointStore,
89 execution_scheduler::SchedulingSource,
90 mysticeti_adapter::LazyMysticetiClient,
91 transaction_outputs::TransactionOutputs,
92};
93use nonempty::{NonEmpty, nonempty};
94use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
95use sui_types::messages_grpc::PingType;
96use tonic::transport::server::TcpConnectInfo;
97
98#[cfg(test)]
99#[path = "unit_tests/server_tests.rs"]
100mod server_tests;
101
102#[cfg(test)]
103#[path = "unit_tests/wait_for_effects_tests.rs"]
104mod wait_for_effects_tests;
105
106#[cfg(test)]
107#[path = "unit_tests/submit_transaction_tests.rs"]
108mod submit_transaction_tests;
109
110pub struct AuthorityServerHandle {
111 server_handle: sui_network::validator::server::Server,
112}
113
114impl AuthorityServerHandle {
115 pub async fn join(self) -> Result<(), io::Error> {
116 self.server_handle.handle().wait_for_shutdown().await;
117 Ok(())
118 }
119
120 pub async fn kill(self) -> Result<(), io::Error> {
121 self.server_handle.handle().shutdown().await;
122 Ok(())
123 }
124
125 pub fn address(&self) -> &Multiaddr {
126 self.server_handle.local_addr()
127 }
128}
129
130pub struct AuthorityServer {
131 address: Multiaddr,
132 pub state: Arc<AuthorityState>,
133 consensus_adapter: Arc<ConsensusAdapter>,
134 pub metrics: Arc<ValidatorServiceMetrics>,
135}
136
137impl AuthorityServer {
138 pub fn new_for_test_with_consensus_adapter(
139 state: Arc<AuthorityState>,
140 consensus_adapter: Arc<ConsensusAdapter>,
141 ) -> Self {
142 let address = new_local_tcp_address_for_testing();
143 let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
144
145 Self {
146 address,
147 state,
148 consensus_adapter,
149 metrics,
150 }
151 }
152
153 pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
154 let consensus_adapter = Arc::new(ConsensusAdapter::new(
155 Arc::new(LazyMysticetiClient::new()),
156 CheckpointStore::new_for_tests(),
157 state.name,
158 Arc::new(ConnectionMonitorStatusForTests {}),
159 100_000,
160 100_000,
161 None,
162 None,
163 ConsensusAdapterMetrics::new_test(),
164 state.epoch_store_for_testing().protocol_config().clone(),
165 ));
166 Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
167 }
168
169 pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
170 let address = self.address.clone();
171 self.spawn_with_bind_address_for_test(address).await
172 }
173
174 pub async fn spawn_with_bind_address_for_test(
175 self,
176 address: Multiaddr,
177 ) -> Result<AuthorityServerHandle, io::Error> {
178 let tls_config = sui_tls::create_rustls_server_config(
179 self.state.config.network_key_pair().copy().private(),
180 SUI_TLS_SERVER_NAME.to_string(),
181 );
182 let config = mysten_network::config::Config::new();
183 let server = sui_network::validator::server::ServerBuilder::from_config(
184 &config,
185 mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
186 )
187 .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
188 self.state,
189 self.consensus_adapter,
190 self.metrics,
191 )))
192 .bind(&address, Some(tls_config))
193 .await
194 .unwrap();
195 let local_addr = server.local_addr().to_owned();
196 info!("Listening to traffic on {local_addr}");
197 let handle = AuthorityServerHandle {
198 server_handle: server,
199 };
200 Ok(handle)
201 }
202}
203
204pub struct ValidatorServiceMetrics {
205 pub signature_errors: IntCounter,
206 pub tx_verification_latency: Histogram,
207 pub cert_verification_latency: Histogram,
208 pub consensus_latency: Histogram,
209 pub handle_transaction_latency: Histogram,
210 pub submit_certificate_consensus_latency: Histogram,
211 pub handle_certificate_consensus_latency: Histogram,
212 pub handle_certificate_non_consensus_latency: Histogram,
213 pub handle_soft_bundle_certificates_consensus_latency: Histogram,
214 pub handle_soft_bundle_certificates_count: Histogram,
215 pub handle_soft_bundle_certificates_size_bytes: Histogram,
216 pub handle_transaction_consensus_latency: Histogram,
217 pub handle_submit_transaction_consensus_latency: HistogramVec,
218 pub handle_wait_for_effects_ping_latency: HistogramVec,
219
220 handle_submit_transaction_latency: HistogramVec,
221 handle_submit_transaction_bytes: HistogramVec,
222 handle_submit_transaction_batch_size: HistogramVec,
223
224 num_rejected_tx_in_epoch_boundary: IntCounter,
225 num_rejected_cert_in_epoch_boundary: IntCounter,
226 num_rejected_tx_during_overload: IntCounterVec,
227 num_rejected_cert_during_overload: IntCounterVec,
228 submission_rejected_transactions: IntCounterVec,
229 connection_ip_not_found: IntCounter,
230 forwarded_header_parse_error: IntCounter,
231 forwarded_header_invalid: IntCounter,
232 forwarded_header_not_included: IntCounter,
233 client_id_source_config_mismatch: IntCounter,
234 x_forwarded_for_num_hops: Gauge,
235}
236
237impl ValidatorServiceMetrics {
238 pub fn new(registry: &Registry) -> Self {
239 Self {
240 signature_errors: register_int_counter_with_registry!(
241 "total_signature_errors",
242 "Number of transaction signature errors",
243 registry,
244 )
245 .unwrap(),
246 tx_verification_latency: register_histogram_with_registry!(
247 "validator_service_tx_verification_latency",
248 "Latency of verifying a transaction",
249 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
250 registry,
251 )
252 .unwrap(),
253 cert_verification_latency: register_histogram_with_registry!(
254 "validator_service_cert_verification_latency",
255 "Latency of verifying a certificate",
256 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257 registry,
258 )
259 .unwrap(),
260 consensus_latency: register_histogram_with_registry!(
261 "validator_service_consensus_latency",
262 "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
263 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
264 registry,
265 )
266 .unwrap(),
267 handle_transaction_latency: register_histogram_with_registry!(
268 "validator_service_handle_transaction_latency",
269 "Latency of handling a transaction",
270 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
271 registry,
272 )
273 .unwrap(),
274 handle_certificate_consensus_latency: register_histogram_with_registry!(
275 "validator_service_handle_certificate_consensus_latency",
276 "Latency of handling a consensus transaction certificate",
277 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
278 registry,
279 )
280 .unwrap(),
281 submit_certificate_consensus_latency: register_histogram_with_registry!(
282 "validator_service_submit_certificate_consensus_latency",
283 "Latency of submit_certificate RPC handler",
284 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
285 registry,
286 )
287 .unwrap(),
288 handle_certificate_non_consensus_latency: register_histogram_with_registry!(
289 "validator_service_handle_certificate_non_consensus_latency",
290 "Latency of handling a non-consensus transaction certificate",
291 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
292 registry,
293 )
294 .unwrap(),
295 handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
296 "validator_service_handle_soft_bundle_certificates_consensus_latency",
297 "Latency of handling a consensus soft bundle",
298 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
299 registry,
300 )
301 .unwrap(),
302 handle_soft_bundle_certificates_count: register_histogram_with_registry!(
303 "handle_soft_bundle_certificates_count",
304 "The number of certificates included in a soft bundle",
305 mysten_metrics::COUNT_BUCKETS.to_vec(),
306 registry,
307 )
308 .unwrap(),
309 handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
310 "handle_soft_bundle_certificates_size_bytes",
311 "The size of soft bundle in bytes",
312 mysten_metrics::BYTES_BUCKETS.to_vec(),
313 registry,
314 )
315 .unwrap(),
316 handle_transaction_consensus_latency: register_histogram_with_registry!(
317 "validator_service_handle_transaction_consensus_latency",
318 "Latency of handling a user transaction sent through consensus",
319 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
320 registry,
321 )
322 .unwrap(),
323 handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
324 "validator_service_submit_transaction_consensus_latency",
325 "Latency of submitting a user transaction sent through consensus",
326 &["req_type"],
327 mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
328 registry,
329 )
330 .unwrap(),
331 handle_submit_transaction_latency: register_histogram_vec_with_registry!(
332 "validator_service_submit_transaction_latency",
333 "Latency of submit transaction handler",
334 &["req_type"],
335 mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
336 registry,
337 )
338 .unwrap(),
339 handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
340 "validator_service_handle_wait_for_effects_ping_latency",
341 "Latency of handling a ping request for wait_for_effects",
342 &["req_type"],
343 mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
344 registry,
345 )
346 .unwrap(),
347 handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
348 "validator_service_submit_transaction_bytes",
349 "The size of transactions in the submit transaction request",
350 &["req_type"],
351 mysten_metrics::BYTES_BUCKETS.to_vec(),
352 registry,
353 )
354 .unwrap(),
355 handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
356 "validator_service_submit_transaction_batch_size",
357 "The number of transactions in the submit transaction request",
358 &["req_type"],
359 mysten_metrics::COUNT_BUCKETS.to_vec(),
360 registry,
361 )
362 .unwrap(),
363 num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
364 "validator_service_num_rejected_tx_in_epoch_boundary",
365 "Number of rejected transaction during epoch transitioning",
366 registry,
367 )
368 .unwrap(),
369 num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
370 "validator_service_num_rejected_cert_in_epoch_boundary",
371 "Number of rejected transaction certificate during epoch transitioning",
372 registry,
373 )
374 .unwrap(),
375 num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
376 "validator_service_num_rejected_tx_during_overload",
377 "Number of rejected transaction due to system overload",
378 &["error_type"],
379 registry,
380 )
381 .unwrap(),
382 num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
383 "validator_service_num_rejected_cert_during_overload",
384 "Number of rejected transaction certificate due to system overload",
385 &["error_type"],
386 registry,
387 )
388 .unwrap(),
389 submission_rejected_transactions: register_int_counter_vec_with_registry!(
390 "validator_service_submission_rejected_transactions",
391 "Number of transactions rejected during submission",
392 &["reason"],
393 registry,
394 )
395 .unwrap(),
396 connection_ip_not_found: register_int_counter_with_registry!(
397 "validator_service_connection_ip_not_found",
398 "Number of times connection IP was not extractable from request",
399 registry,
400 )
401 .unwrap(),
402 forwarded_header_parse_error: register_int_counter_with_registry!(
403 "validator_service_forwarded_header_parse_error",
404 "Number of times x-forwarded-for header could not be parsed",
405 registry,
406 )
407 .unwrap(),
408 forwarded_header_invalid: register_int_counter_with_registry!(
409 "validator_service_forwarded_header_invalid",
410 "Number of times x-forwarded-for header was invalid",
411 registry,
412 )
413 .unwrap(),
414 forwarded_header_not_included: register_int_counter_with_registry!(
415 "validator_service_forwarded_header_not_included",
416 "Number of times x-forwarded-for header was (unexpectedly) not included in request",
417 registry,
418 )
419 .unwrap(),
420 client_id_source_config_mismatch: register_int_counter_with_registry!(
421 "validator_service_client_id_source_config_mismatch",
422 "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
423 registry,
424 )
425 .unwrap(),
426 x_forwarded_for_num_hops: register_gauge_with_registry!(
427 "validator_service_x_forwarded_for_num_hops",
428 "Number of hops in x-forwarded-for header",
429 registry,
430 )
431 .unwrap(),
432 }
433 }
434
435 pub fn new_for_tests() -> Self {
436 let registry = Registry::new();
437 Self::new(®istry)
438 }
439}
440
441#[derive(Clone)]
442pub struct ValidatorService {
443 state: Arc<AuthorityState>,
444 consensus_adapter: Arc<ConsensusAdapter>,
445 metrics: Arc<ValidatorServiceMetrics>,
446 traffic_controller: Option<Arc<TrafficController>>,
447 client_id_source: Option<ClientIdSource>,
448}
449
450impl ValidatorService {
451 pub fn new(
452 state: Arc<AuthorityState>,
453 consensus_adapter: Arc<ConsensusAdapter>,
454 validator_metrics: Arc<ValidatorServiceMetrics>,
455 client_id_source: Option<ClientIdSource>,
456 ) -> Self {
457 let traffic_controller = state.traffic_controller.clone();
458 Self {
459 state,
460 consensus_adapter,
461 metrics: validator_metrics,
462 traffic_controller,
463 client_id_source,
464 }
465 }
466
467 pub fn new_for_tests(
468 state: Arc<AuthorityState>,
469 consensus_adapter: Arc<ConsensusAdapter>,
470 metrics: Arc<ValidatorServiceMetrics>,
471 ) -> Self {
472 Self {
473 state,
474 consensus_adapter,
475 metrics,
476 traffic_controller: None,
477 client_id_source: None,
478 }
479 }
480
481 pub fn validator_state(&self) -> &Arc<AuthorityState> {
482 &self.state
483 }
484
485 pub async fn execute_certificate_for_testing(
486 &self,
487 cert: CertifiedTransaction,
488 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
489 let request = make_tonic_request_for_testing(cert);
490 self.handle_certificate_v2(request).await
491 }
492
493 pub async fn handle_transaction_for_benchmarking(
494 &self,
495 transaction: Transaction,
496 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
497 let request = make_tonic_request_for_testing(transaction);
498 self.transaction(request).await
499 }
500
501 async fn handle_transaction(
504 &self,
505 request: tonic::Request<Transaction>,
506 ) -> WrappedServiceResponse<HandleTransactionResponse> {
507 let Self {
508 state,
509 consensus_adapter,
510 metrics,
511 traffic_controller: _,
512 client_id_source: _,
513 } = self.clone();
514 let transaction = request.into_inner();
515 let epoch_store = state.load_epoch_store_one_call_per_task();
516
517 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
518
519 let mut validator_pushback_error = None;
527 let overload_check_res = state.check_system_overload(
528 &*consensus_adapter,
529 transaction.data(),
530 state.check_system_overload_at_signing(),
531 );
532 if let Err(error) = overload_check_res {
533 metrics
534 .num_rejected_tx_during_overload
535 .with_label_values(&[error.as_ref()])
536 .inc();
537 match error.as_inner() {
539 SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
540 validator_pushback_error = Some(error)
541 }
542 _ => return Err(error.into()),
543 }
544 }
545
546 let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
547
548 let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
549 let transaction = epoch_store
550 .verify_transaction_require_no_aliases(transaction)
552 .tap_err(|_| {
553 metrics.signature_errors.inc();
554 })?
555 .into_tx();
556 drop(tx_verif_metrics_guard);
557
558 let tx_digest = transaction.digest();
559
560 let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
562
563 let info = state
564 .handle_transaction(&epoch_store, transaction.clone())
565 .instrument(span)
566 .await
567 .tap_err(|e| {
568 if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
569 metrics.num_rejected_tx_in_epoch_boundary.inc();
570 }
571 })?;
572
573 if let Some(error) = validator_pushback_error {
574 return Err(error.into());
577 }
578
579 Ok((tonic::Response::new(info), Weight::zero()))
580 }
581
582 #[instrument(
583 name = "ValidatorService::handle_submit_transaction",
584 level = "error",
585 skip_all,
586 err(level = "debug")
587 )]
588 async fn handle_submit_transaction(
589 &self,
590 request: tonic::Request<RawSubmitTxRequest>,
591 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
592 let Self {
593 state,
594 consensus_adapter,
595 metrics,
596 traffic_controller: _,
597 client_id_source,
598 } = self.clone();
599
600 let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
601 self.get_client_ip_addr(&request, client_id_source)
602 } else {
603 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
604 };
605
606 let epoch_store = state.load_epoch_store_one_call_per_task();
607 if !epoch_store.protocol_config().mysticeti_fastpath() {
608 return Err(SuiErrorKind::UnsupportedFeatureError {
609 error: "Mysticeti fastpath".to_string(),
610 }
611 .into());
612 }
613
614 let request = request.into_inner();
615 let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
616 SuiErrorKind::GrpcMessageDeserializeError {
617 type_info: "RawSubmitTxRequest.submit_type".to_string(),
618 error: e.to_string(),
619 }
620 })?;
621
622 let is_ping_request = submit_type == SubmitTxType::Ping;
623 if is_ping_request {
624 fp_ensure!(
625 request.transactions.is_empty(),
626 SuiErrorKind::InvalidRequest(format!(
627 "Ping request cannot contain {} transactions",
628 request.transactions.len()
629 ))
630 .into()
631 );
632 } else {
633 fp_ensure!(
635 !request.transactions.is_empty(),
636 SuiErrorKind::InvalidRequest(
637 "At least one transaction needs to be submitted".to_string(),
638 )
639 .into()
640 );
641 }
642
643 let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
648
649 let max_num_transactions = if is_soft_bundle_request {
650 epoch_store.protocol_config().max_soft_bundle_size()
653 } else {
654 epoch_store
656 .protocol_config()
657 .max_num_transactions_in_block()
658 };
659 fp_ensure!(
660 request.transactions.len() <= max_num_transactions as usize,
661 SuiErrorKind::InvalidRequest(format!(
662 "Too many transactions in request: {} vs {}",
663 request.transactions.len(),
664 max_num_transactions
665 ))
666 .into()
667 );
668
669 let mut tx_digests = Vec::with_capacity(request.transactions.len());
671 let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
673 let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
675 let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
677 let mut total_size_bytes = 0;
679
680 let req_type = if is_ping_request {
681 "ping"
682 } else if request.transactions.len() == 1 {
683 "single_transaction"
684 } else if is_soft_bundle_request {
685 "soft_bundle"
686 } else {
687 "batch"
688 };
689
690 let _handle_tx_metrics_guard = metrics
691 .handle_submit_transaction_latency
692 .with_label_values(&[req_type])
693 .start_timer();
694
695 for (idx, tx_bytes) in request.transactions.iter().enumerate() {
696 let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
697 Ok(txn) => txn,
698 Err(e) => {
699 return Err(SuiErrorKind::TransactionDeserializationError {
701 error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
702 }
703 .into());
704 }
705 };
706
707 let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
709
710 let overload_check_res = self.state.check_system_overload(
711 &*consensus_adapter,
712 transaction.data(),
713 state.check_system_overload_at_signing(),
714 );
715 if let Err(error) = overload_check_res {
716 metrics
717 .num_rejected_tx_during_overload
718 .with_label_values(&[error.as_ref()])
719 .inc();
720 results[idx] = Some(SubmitTxResult::Rejected { error });
721 continue;
722 }
723
724 let verified_transaction = {
726 let _metrics_guard = metrics.tx_verification_latency.start_timer();
727 if epoch_store.protocol_config().address_aliases() {
728 match epoch_store.verify_transaction_with_current_aliases(transaction) {
729 Ok(tx) => tx,
730 Err(e) => {
731 metrics.signature_errors.inc();
732 return Err(e.into());
733 }
734 }
735 } else {
736 match epoch_store.verify_transaction_require_no_aliases(transaction) {
737 Ok(tx) => tx,
738 Err(e) => {
739 metrics.signature_errors.inc();
740 return Err(e.into());
741 }
742 }
743 }
744 };
745
746 let tx_digest = verified_transaction.tx().digest();
747 tx_digests.push(*tx_digest);
748
749 debug!(
750 ?tx_digest,
751 "handle_submit_transaction: verified transaction"
752 );
753
754 if let Some(effects) = self
757 .state
758 .get_transaction_cache_reader()
759 .get_executed_effects(tx_digest)
760 {
761 let effects_digest = effects.digest();
762 if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
763 let executed_result = SubmitTxResult::Executed {
764 effects_digest,
765 details: Some(executed_data),
766 fast_path: false,
767 };
768 results[idx] = Some(executed_result);
769 debug!(?tx_digest, "handle_submit_transaction: already executed");
770 continue;
771 }
772 }
773
774 debug!(
775 ?tx_digest,
776 "handle_submit_transaction: waiting for fastpath dependency objects"
777 );
778 if !state
779 .wait_for_fastpath_dependency_objects(
780 verified_transaction.tx(),
781 epoch_store.epoch(),
782 )
783 .await?
784 {
785 debug!(
786 ?tx_digest,
787 "fastpath input objects are still unavailable after waiting"
788 );
789 }
790
791 match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
792 Ok(_) => { }
793 Err(e) => {
794 if let Some(effects) = self
797 .state
798 .get_transaction_cache_reader()
799 .get_executed_effects(tx_digest)
800 {
801 let effects_digest = effects.digest();
802 if let Ok(executed_data) = self.complete_executed_data(effects, None).await
803 {
804 let executed_result = SubmitTxResult::Executed {
805 effects_digest,
806 details: Some(executed_data),
807 fast_path: false,
808 };
809 results[idx] = Some(executed_result);
810 continue;
811 }
812 }
813
814 debug!(?tx_digest, "Transaction rejected during submission: {e}");
816 metrics
817 .submission_rejected_transactions
818 .with_label_values(&[e.to_variant_name()])
819 .inc();
820 results[idx] = Some(SubmitTxResult::Rejected { error: e });
821 continue;
822 }
823 }
824
825 if epoch_store.protocol_config().address_aliases() {
826 consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
827 &self.state.name,
828 verified_transaction.into(),
829 ));
830 } else {
831 consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
832 &self.state.name,
833 verified_transaction.into_tx().into(),
834 ));
835 }
836 transaction_indexes.push(idx);
837 total_size_bytes += tx_size;
838 }
839
840 if consensus_transactions.is_empty() && !is_ping_request {
841 return Ok((
842 tonic::Response::new(Self::try_from_submit_tx_response(results)?),
843 Weight::zero(),
844 ));
845 }
846
847 let max_transaction_bytes = if is_soft_bundle_request {
851 epoch_store
852 .protocol_config()
853 .consensus_max_transactions_in_block_bytes()
854 / 2
855 } else {
856 epoch_store
857 .protocol_config()
858 .consensus_max_transactions_in_block_bytes()
859 };
860 fp_ensure!(
861 total_size_bytes <= max_transaction_bytes as usize,
862 SuiErrorKind::UserInputError {
863 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
864 size: total_size_bytes,
865 limit: max_transaction_bytes,
866 },
867 }
868 .into()
869 );
870
871 metrics
872 .handle_submit_transaction_bytes
873 .with_label_values(&[req_type])
874 .observe(total_size_bytes as f64);
875 metrics
876 .handle_submit_transaction_batch_size
877 .with_label_values(&[req_type])
878 .observe(consensus_transactions.len() as f64);
879
880 let _latency_metric_guard = metrics
881 .handle_submit_transaction_consensus_latency
882 .with_label_values(&[req_type])
883 .start_timer();
884
885 let consensus_positions = if is_soft_bundle_request || is_ping_request {
886 assert!(
889 is_ping_request || !consensus_transactions.is_empty(),
890 "A valid soft bundle must have at least one transaction"
891 );
892 debug!(
893 "handle_submit_transaction: submitting consensus transactions ({}): {}",
894 req_type,
895 consensus_transactions
896 .iter()
897 .map(|t| t.local_display())
898 .join(", ")
899 );
900 self.handle_submit_to_consensus_for_position(
901 consensus_transactions,
902 &epoch_store,
903 submitter_client_addr,
904 )
905 .await?
906 } else {
907 let futures = consensus_transactions.into_iter().map(|t| {
908 debug!(
909 "handle_submit_transaction: submitting consensus transaction ({}): {}",
910 req_type,
911 t.local_display(),
912 );
913 self.handle_submit_to_consensus_for_position(
914 vec![t],
915 &epoch_store,
916 submitter_client_addr,
917 )
918 });
919 future::try_join_all(futures)
920 .await?
921 .into_iter()
922 .flatten()
923 .collect()
924 };
925
926 if is_ping_request {
927 assert_eq!(consensus_positions.len(), 1);
929 results.push(Some(SubmitTxResult::Submitted {
930 consensus_position: consensus_positions[0],
931 }));
932 } else {
933 for ((idx, tx_digest), consensus_position) in transaction_indexes
935 .into_iter()
936 .zip(tx_digests)
937 .zip(consensus_positions)
938 {
939 debug!(
940 ?tx_digest,
941 "handle_submit_transaction: submitted consensus transaction at {}",
942 consensus_position,
943 );
944 results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
945 }
946 }
947
948 Ok((
949 tonic::Response::new(Self::try_from_submit_tx_response(results)?),
950 Weight::zero(),
951 ))
952 }
953
954 fn try_from_submit_tx_response(
955 results: Vec<Option<SubmitTxResult>>,
956 ) -> Result<RawSubmitTxResponse, SuiError> {
957 let mut raw_results = Vec::new();
958 for (i, result) in results.into_iter().enumerate() {
959 let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
960 error: format!("Missing transaction result at {}", i),
961 })?;
962 let raw_result = result.try_into()?;
963 raw_results.push(raw_result);
964 }
965 Ok(RawSubmitTxResponse {
966 results: raw_results,
967 })
968 }
969
970 async fn handle_certificates(
976 &self,
977 certificates: NonEmpty<CertifiedTransaction>,
978 include_events: bool,
979 include_input_objects: bool,
980 include_output_objects: bool,
981 include_auxiliary_data: bool,
982 epoch_store: &Arc<AuthorityPerEpochStore>,
983 wait_for_effects: bool,
984 ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
985 fp_ensure!(
988 !self.state.is_fullnode(epoch_store),
989 SuiErrorKind::FullNodeCantHandleCertificate.into()
990 );
991
992 let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
993
994 let metrics = if certificates.len() == 1 {
995 if wait_for_effects {
996 if is_consensus_tx {
997 &self.metrics.handle_certificate_consensus_latency
998 } else {
999 &self.metrics.handle_certificate_non_consensus_latency
1000 }
1001 } else {
1002 &self.metrics.submit_certificate_consensus_latency
1003 }
1004 } else {
1005 &self
1007 .metrics
1008 .handle_soft_bundle_certificates_consensus_latency
1009 };
1010
1011 let _metrics_guard = metrics.start_timer();
1012
1013 if certificates.len() == 1 {
1017 let tx_digest = *certificates[0].digest();
1018 debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
1019
1020 if let Some(signed_effects) = self
1021 .state
1022 .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
1023 {
1024 let events = if include_events && signed_effects.events_digest().is_some() {
1025 Some(
1026 self.state
1027 .get_transaction_events(signed_effects.transaction_digest())?,
1028 )
1029 } else {
1030 None
1031 };
1032
1033 return Ok((
1034 Some(vec![HandleCertificateResponseV3 {
1035 effects: signed_effects.into_inner(),
1036 events,
1037 input_objects: None,
1038 output_objects: None,
1039 auxiliary_data: None,
1040 }]),
1041 Weight::one(),
1042 ));
1043 };
1044 }
1045
1046 for certificate in &certificates {
1049 let overload_check_res = self.state.check_system_overload(
1050 &*self.consensus_adapter,
1051 certificate.data(),
1052 self.state.check_system_overload_at_execution(),
1053 );
1054 if let Err(error) = overload_check_res {
1055 self.metrics
1056 .num_rejected_cert_during_overload
1057 .with_label_values(&[error.as_ref()])
1058 .inc();
1059 return Err(error.into());
1060 }
1061 }
1062
1063 let verified_certificates = {
1064 let _timer = self.metrics.cert_verification_latency.start_timer();
1065 epoch_store
1066 .signature_verifier
1067 .multi_verify_certs(certificates.into())
1068 .await
1069 .into_iter()
1070 .collect::<Result<Vec<_>, _>>()?
1071 };
1072 let consensus_transactions =
1073 NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1074 ConsensusTransaction::new_certificate_message(
1075 &self.state.name,
1076 certificate.clone().into(),
1077 )
1078 }))
1079 .unwrap();
1080
1081 let (responses, weight) = self
1082 .handle_submit_to_consensus(
1083 consensus_transactions,
1084 include_events,
1085 include_input_objects,
1086 include_output_objects,
1087 include_auxiliary_data,
1088 epoch_store,
1089 wait_for_effects,
1090 )
1091 .await?;
1092 let responses = if let Some(responses) = responses {
1094 Some(
1095 responses
1096 .into_iter()
1097 .map(|response| {
1098 let signed_effects =
1099 self.state.sign_effects(response.effects, epoch_store)?;
1100 Ok(HandleCertificateResponseV3 {
1101 effects: signed_effects.into_inner(),
1102 events: response.events,
1103 input_objects: if response.input_objects.is_empty() {
1104 None
1105 } else {
1106 Some(response.input_objects)
1107 },
1108 output_objects: if response.output_objects.is_empty() {
1109 None
1110 } else {
1111 Some(response.output_objects)
1112 },
1113 auxiliary_data: None,
1114 })
1115 })
1116 .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1117 )
1118 } else {
1119 None
1120 };
1121
1122 Ok((responses, weight))
1123 }
1124
1125 #[instrument(
1126 name = "ValidatorService::handle_submit_to_consensus_for_position",
1127 level = "debug",
1128 skip_all,
1129 err(level = "debug")
1130 )]
1131 async fn handle_submit_to_consensus_for_position(
1132 &self,
1133 consensus_transactions: Vec<ConsensusTransaction>,
1135 epoch_store: &Arc<AuthorityPerEpochStore>,
1136 submitter_client_addr: Option<IpAddr>,
1137 ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1138 let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1139
1140 {
1141 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1143 if !reconfiguration_lock.should_accept_user_certs() {
1144 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1145 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1146 }
1147
1148 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1152
1153 self.consensus_adapter.submit_batch(
1154 &consensus_transactions,
1155 Some(&reconfiguration_lock),
1156 epoch_store,
1157 Some(tx_consensus_positions),
1158 submitter_client_addr,
1159 )?;
1160 }
1161
1162 Ok(rx_consensus_positions.await.map_err(|e| {
1163 SuiErrorKind::FailedToSubmitToConsensus(format!(
1164 "Failed to get consensus position: {e}"
1165 ))
1166 })?)
1167 }
1168
1169 async fn handle_submit_to_consensus(
1170 &self,
1171 consensus_transactions: NonEmpty<ConsensusTransaction>,
1172 include_events: bool,
1173 include_input_objects: bool,
1174 include_output_objects: bool,
1175 _include_auxiliary_data: bool,
1176 epoch_store: &Arc<AuthorityPerEpochStore>,
1177 wait_for_effects: bool,
1178 ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1179 let consensus_transactions: Vec<_> = consensus_transactions.into();
1180 {
1181 let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1183 if !reconfiguration_lock.should_accept_user_certs() {
1184 self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1185 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1186 }
1187
1188 if !epoch_store.all_external_consensus_messages_processed(
1194 consensus_transactions.iter().map(|tx| tx.key()),
1195 )? {
1196 let _metrics_guard = self.metrics.consensus_latency.start_timer();
1197 self.consensus_adapter.submit_batch(
1198 &consensus_transactions,
1199 Some(&reconfiguration_lock),
1200 epoch_store,
1201 None,
1202 None, )?;
1204 }
1207 }
1208
1209 if !wait_for_effects {
1210 let fast_path_certificates = consensus_transactions
1213 .iter()
1214 .filter_map(|tx| {
1215 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1216 (!certificate.is_consensus_tx())
1217 .then_some((
1219 VerifiedExecutableTransaction::new_from_certificate(
1220 VerifiedCertificate::new_unchecked(*(certificate.clone())),
1221 ),
1222 ExecutionEnv::new()
1223 .with_scheduling_source(SchedulingSource::NonFastPath),
1224 ))
1225 } else {
1226 None
1227 }
1228 })
1229 .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1230 .collect::<Vec<_>>();
1231 if !fast_path_certificates.is_empty() {
1232 self.state
1233 .execution_scheduler()
1234 .enqueue(fast_path_certificates, epoch_store);
1235 }
1236 return Ok((None, Weight::zero()));
1237 }
1238
1239 let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1242 |tx| async move {
1243 let effects = match &tx.kind {
1244 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1245 let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1247 self.state
1248 .wait_for_certificate_execution(&certificate, epoch_store)
1249 .await?
1250 }
1251 ConsensusTransactionKind::UserTransaction(tx) => {
1252 self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1253 }
1254 ConsensusTransactionKind::UserTransactionV2(tx) => {
1255 self.state.await_transaction_effects(*tx.tx().digest(), epoch_store).await?
1256 }
1257 _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction, UserTransaction, or UserTransactionV2"),
1258 };
1259 let events = if include_events && effects.events_digest().is_some() {
1260 Some(self.state.get_transaction_events(effects.transaction_digest())?)
1261 } else {
1262 None
1263 };
1264
1265 let input_objects = if include_input_objects {
1266 self.state.get_transaction_input_objects(&effects)?
1267 } else {
1268 vec![]
1269 };
1270
1271 let output_objects = if include_output_objects {
1272 self.state.get_transaction_output_objects(&effects)?
1273 } else {
1274 vec![]
1275 };
1276
1277 if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1278 epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1279 }
1280
1281 Ok::<_, SuiError>(ExecutedData {
1282 effects,
1283 events,
1284 input_objects,
1285 output_objects,
1286 })
1287 },
1288 ))
1289 .await?;
1290
1291 Ok((Some(responses), Weight::zero()))
1292 }
1293
1294 async fn collect_effects_data(
1295 &self,
1296 effects: &TransactionEffects,
1297 include_events: bool,
1298 include_input_objects: bool,
1299 include_output_objects: bool,
1300 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1301 ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1302 let events = if include_events && effects.events_digest().is_some() {
1303 if let Some(fastpath_outputs) = &fastpath_outputs {
1304 Some(fastpath_outputs.events.clone())
1305 } else {
1306 Some(
1307 self.state
1308 .get_transaction_events(effects.transaction_digest())?,
1309 )
1310 }
1311 } else {
1312 None
1313 };
1314
1315 let input_objects = if include_input_objects {
1316 self.state.get_transaction_input_objects(effects)?
1317 } else {
1318 vec![]
1319 };
1320
1321 let output_objects = if include_output_objects {
1322 if let Some(fastpath_outputs) = &fastpath_outputs {
1323 fastpath_outputs.written.values().cloned().collect()
1324 } else {
1325 self.state.get_transaction_output_objects(effects)?
1326 }
1327 } else {
1328 vec![]
1329 };
1330
1331 Ok((events, input_objects, output_objects))
1332 }
1333}
1334
1335type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1336
1337impl ValidatorService {
1338 async fn transaction_impl(
1339 &self,
1340 request: tonic::Request<Transaction>,
1341 ) -> WrappedServiceResponse<HandleTransactionResponse> {
1342 self.handle_transaction(request).await
1343 }
1344
1345 async fn handle_submit_transaction_impl(
1346 &self,
1347 request: tonic::Request<RawSubmitTxRequest>,
1348 ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1349 self.handle_submit_transaction(request).await
1350 }
1351
1352 async fn submit_certificate_impl(
1353 &self,
1354 request: tonic::Request<CertifiedTransaction>,
1355 ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1356 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1357 let certificate = request.into_inner();
1358 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1359
1360 let span =
1361 error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1362 self.handle_certificates(
1363 nonempty![certificate],
1364 true,
1365 false,
1366 false,
1367 false,
1368 &epoch_store,
1369 false,
1370 )
1371 .instrument(span)
1372 .await
1373 .map(|(executed, spam_weight)| {
1374 (
1375 tonic::Response::new(SubmitCertificateResponse {
1376 executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1377 }),
1378 spam_weight,
1379 )
1380 })
1381 }
1382
1383 async fn handle_certificate_v2_impl(
1384 &self,
1385 request: tonic::Request<CertifiedTransaction>,
1386 ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1387 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1388 let certificate = request.into_inner();
1389 certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1390
1391 let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1392 self.handle_certificates(
1393 nonempty![certificate],
1394 true,
1395 false,
1396 false,
1397 false,
1398 &epoch_store,
1399 true,
1400 )
1401 .instrument(span)
1402 .await
1403 .map(|(resp, spam_weight)| {
1404 (
1405 tonic::Response::new(
1406 resp.expect(
1407 "handle_certificate should not return none with wait_for_effects=true",
1408 )
1409 .remove(0)
1410 .into(),
1411 ),
1412 spam_weight,
1413 )
1414 })
1415 }
1416
1417 async fn handle_certificate_v3_impl(
1418 &self,
1419 request: tonic::Request<HandleCertificateRequestV3>,
1420 ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1421 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1422 let request = request.into_inner();
1423 request
1424 .certificate
1425 .validity_check(&epoch_store.tx_validity_check_context())?;
1426
1427 let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1428 self.handle_certificates(
1429 nonempty![request.certificate],
1430 request.include_events,
1431 request.include_input_objects,
1432 request.include_output_objects,
1433 request.include_auxiliary_data,
1434 &epoch_store,
1435 true,
1436 )
1437 .instrument(span)
1438 .await
1439 .map(|(resp, spam_weight)| {
1440 (
1441 tonic::Response::new(
1442 resp.expect(
1443 "handle_certificate should not return none with wait_for_effects=true",
1444 )
1445 .remove(0),
1446 ),
1447 spam_weight,
1448 )
1449 })
1450 }
1451
1452 async fn wait_for_effects_impl(
1453 &self,
1454 request: tonic::Request<RawWaitForEffectsRequest>,
1455 ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1456 let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1457 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1458 let response = timeout(
1459 Duration::from_secs(20),
1461 epoch_store
1462 .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1463 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1464 )
1465 .await
1466 .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1467 .try_into()?;
1468 Ok((tonic::Response::new(response), Weight::zero()))
1469 }
1470
1471 #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1472 async fn wait_for_effects_response(
1473 &self,
1474 request: WaitForEffectsRequest,
1475 epoch_store: &Arc<AuthorityPerEpochStore>,
1476 ) -> SuiResult<WaitForEffectsResponse> {
1477 if request.ping_type.is_some() {
1478 return timeout(
1479 Duration::from_secs(10),
1480 self.ping_response(request, epoch_store),
1481 )
1482 .await
1483 .map_err(|_| SuiErrorKind::TimeoutError)?;
1484 }
1485
1486 let Some(tx_digest) = request.transaction_digest else {
1487 return Err(SuiErrorKind::InvalidRequest(
1488 "Transaction digest is required for wait for effects requests".to_string(),
1489 )
1490 .into());
1491 };
1492 let tx_digests = [tx_digest];
1493
1494 let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1495 if let Some(consensus_position) = request.consensus_position {
1496 Box::pin(self.wait_for_fastpath_effects(
1497 consensus_position,
1498 &tx_digests,
1499 request.include_details,
1500 epoch_store,
1501 ))
1502 } else {
1503 Box::pin(futures::future::pending())
1504 };
1505
1506 tokio::select! {
1507 biased;
1509 mut effects = self.state
1516 .get_transaction_cache_reader()
1517 .notify_read_executed_effects(
1518 "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1519 &tx_digests,
1520 ) => {
1521 tracing::Span::current().record("fast_path_effects", false);
1522 let effects = effects.pop().unwrap();
1523 let details = if request.include_details {
1524 Some(self.complete_executed_data(effects.clone(), None).await?)
1525 } else {
1526 None
1527 };
1528
1529 Ok(WaitForEffectsResponse::Executed {
1530 effects_digest: effects.digest(),
1531 details,
1532 fast_path: false,
1533 })
1534 }
1535
1536 fastpath_response = fastpath_effects_future => {
1537 tracing::Span::current().record("fast_path_effects", true);
1538 fastpath_response
1539 }
1540 }
1541 }
1542
1543 #[instrument(level = "error", skip_all, err(level = "debug"))]
1544 async fn ping_response(
1545 &self,
1546 request: WaitForEffectsRequest,
1547 epoch_store: &Arc<AuthorityPerEpochStore>,
1548 ) -> SuiResult<WaitForEffectsResponse> {
1549 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1550 return Err(SuiErrorKind::UnsupportedFeatureError {
1551 error: "Mysticeti fastpath".to_string(),
1552 }
1553 .into());
1554 };
1555
1556 let Some(consensus_position) = request.consensus_position else {
1557 return Err(SuiErrorKind::InvalidRequest(
1558 "Consensus position is required for Ping requests".to_string(),
1559 )
1560 .into());
1561 };
1562
1563 let Some(ping) = request.ping_type else {
1565 return Err(SuiErrorKind::InvalidRequest(
1566 "Ping type is required for ping requests".to_string(),
1567 )
1568 .into());
1569 };
1570
1571 let _metrics_guard = self
1572 .metrics
1573 .handle_wait_for_effects_ping_latency
1574 .with_label_values(&[ping.as_str()])
1575 .start_timer();
1576
1577 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1578
1579 let mut last_status = None;
1580 let details = if request.include_details {
1581 Some(Box::new(ExecutedData::default()))
1582 } else {
1583 None
1584 };
1585
1586 loop {
1587 let status = consensus_tx_status_cache
1588 .notify_read_transaction_status_change(consensus_position, last_status)
1589 .await;
1590 match status {
1591 NotifyReadConsensusTxStatusResult::Status(status) => match status {
1592 ConsensusTxStatus::FastpathCertified => {
1593 if ping == PingType::Consensus {
1595 last_status = Some(status);
1596 continue;
1597 }
1598 return Ok(WaitForEffectsResponse::Executed {
1599 effects_digest: TransactionEffectsDigest::ZERO,
1600 details,
1601 fast_path: true,
1602 });
1603 }
1604 ConsensusTxStatus::Rejected => {
1605 return Ok(WaitForEffectsResponse::Rejected { error: None });
1606 }
1607 ConsensusTxStatus::Finalized => {
1608 return Ok(WaitForEffectsResponse::Executed {
1609 effects_digest: TransactionEffectsDigest::ZERO,
1610 details,
1611 fast_path: false,
1612 });
1613 }
1614 },
1615 NotifyReadConsensusTxStatusResult::Expired(round) => {
1616 return Ok(WaitForEffectsResponse::Expired {
1617 epoch: epoch_store.epoch(),
1618 round: Some(round),
1619 });
1620 }
1621 }
1622 }
1623 }
1624
1625 #[instrument(level = "error", skip_all, err(level = "debug"))]
1626 async fn wait_for_fastpath_effects(
1627 &self,
1628 consensus_position: ConsensusPosition,
1629 tx_digests: &[TransactionDigest],
1630 include_details: bool,
1631 epoch_store: &Arc<AuthorityPerEpochStore>,
1632 ) -> SuiResult<WaitForEffectsResponse> {
1633 let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1634 return Err(SuiErrorKind::UnsupportedFeatureError {
1635 error: "Mysticeti fastpath".to_string(),
1636 }
1637 .into());
1638 };
1639
1640 let local_epoch = epoch_store.epoch();
1641 match consensus_position.epoch.cmp(&local_epoch) {
1642 Ordering::Less => {
1643 let response = WaitForEffectsResponse::Expired {
1646 epoch: local_epoch,
1647 round: None,
1648 };
1649 return Ok(response);
1650 }
1651 Ordering::Greater => {
1652 return Err(SuiErrorKind::WrongEpoch {
1654 expected_epoch: local_epoch,
1655 actual_epoch: consensus_position.epoch,
1656 }
1657 .into());
1658 }
1659 Ordering::Equal => {
1660 }
1663 };
1664
1665 consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1666
1667 let mut current_status = None;
1668 loop {
1669 tokio::select! {
1670 status_result = consensus_tx_status_cache
1671 .notify_read_transaction_status_change(consensus_position, current_status) => {
1672 match status_result {
1673 NotifyReadConsensusTxStatusResult::Status(new_status) => {
1674 match new_status {
1675 ConsensusTxStatus::Rejected => {
1676 return Ok(WaitForEffectsResponse::Rejected {
1677 error: epoch_store.get_rejection_vote_reason(
1678 consensus_position
1679 )
1680 });
1681 }
1682 ConsensusTxStatus::FastpathCertified => {
1683 current_status = Some(new_status);
1684 continue;
1685 }
1686 ConsensusTxStatus::Finalized => {
1687 current_status = Some(new_status);
1688 continue;
1689 }
1690 }
1691 }
1692 NotifyReadConsensusTxStatusResult::Expired(round) => {
1693 return Ok(WaitForEffectsResponse::Expired {
1694 epoch: epoch_store.epoch(),
1695 round: Some(round),
1696 });
1697 }
1698 }
1699 }
1700
1701 mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1702 if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1703 let outputs = outputs.pop().unwrap();
1704 let effects = outputs.effects.clone();
1705
1706 let details = if include_details {
1707 Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1708 } else {
1709 None
1710 };
1711
1712 return Ok(WaitForEffectsResponse::Executed {
1713 effects_digest: effects.digest(),
1714 details,
1715 fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1716 });
1717 }
1718 }
1719 }
1720 }
1721
1722 async fn complete_executed_data(
1723 &self,
1724 effects: TransactionEffects,
1725 fastpath_outputs: Option<Arc<TransactionOutputs>>,
1726 ) -> SuiResult<Box<ExecutedData>> {
1727 let (events, input_objects, output_objects) = self
1728 .collect_effects_data(
1729 &effects,
1730 true,
1731 true,
1732 true,
1733 fastpath_outputs,
1734 )
1735 .await?;
1736 Ok(Box::new(ExecutedData {
1737 effects,
1738 events,
1739 input_objects,
1740 output_objects,
1741 }))
1742 }
1743
1744 async fn soft_bundle_validity_check(
1745 &self,
1746 certificates: &NonEmpty<CertifiedTransaction>,
1747 epoch_store: &Arc<AuthorityPerEpochStore>,
1748 total_size_bytes: u64,
1749 ) -> Result<(), tonic::Status> {
1750 let protocol_config = epoch_store.protocol_config();
1751 let node_config = &self.state.config;
1752
1753 fp_ensure!(
1759 protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1760 SuiErrorKind::UnsupportedFeatureError {
1761 error: "Soft Bundle".to_string()
1762 }
1763 .into()
1764 );
1765
1766 fp_ensure!(
1773 certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1774 SuiErrorKind::UserInputError {
1775 error: UserInputError::TooManyTransactionsInBatch {
1776 size: certificates.len(),
1777 limit: protocol_config.max_soft_bundle_size()
1778 }
1779 }
1780 .into()
1781 );
1782
1783 let soft_bundle_max_size_bytes =
1787 protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1788 fp_ensure!(
1789 total_size_bytes <= soft_bundle_max_size_bytes,
1790 SuiErrorKind::UserInputError {
1791 error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1792 size: total_size_bytes as usize,
1793 limit: soft_bundle_max_size_bytes,
1794 },
1795 }
1796 .into()
1797 );
1798
1799 let mut gas_price = None;
1800 for certificate in certificates {
1801 let tx_digest = *certificate.digest();
1802 fp_ensure!(
1803 certificate.is_consensus_tx(),
1804 SuiErrorKind::UserInputError {
1805 error: UserInputError::NoSharedObjectError { digest: tx_digest }
1806 }
1807 .into()
1808 );
1809 fp_ensure!(
1810 !self.state.is_tx_already_executed(&tx_digest),
1811 SuiErrorKind::UserInputError {
1812 error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1813 }
1814 .into()
1815 );
1816 if let Some(gas) = gas_price {
1817 fp_ensure!(
1818 gas == certificate.gas_price(),
1819 SuiErrorKind::UserInputError {
1820 error: UserInputError::GasPriceMismatchError {
1821 digest: tx_digest,
1822 expected: gas,
1823 actual: certificate.gas_price()
1824 }
1825 }
1826 .into()
1827 );
1828 } else {
1829 gas_price = Some(certificate.gas_price());
1830 }
1831 }
1832
1833 fp_ensure!(
1838 !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1839 SuiErrorKind::UserInputError {
1840 error: UserInputError::CertificateAlreadyProcessed
1841 }
1842 .into()
1843 );
1844
1845 Ok(())
1846 }
1847
1848 async fn handle_soft_bundle_certificates_v3_impl(
1849 &self,
1850 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1851 ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1852 let epoch_store = self.state.load_epoch_store_one_call_per_task();
1853 let client_addr = if self.client_id_source.is_none() {
1854 self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1855 } else {
1856 self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1857 };
1858 let request = request.into_inner();
1859
1860 let certificates = NonEmpty::from_vec(request.certificates)
1861 .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1862 let mut total_size_bytes = 0;
1863 for certificate in &certificates {
1864 total_size_bytes +=
1866 certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1867 }
1868
1869 self.metrics
1870 .handle_soft_bundle_certificates_count
1871 .observe(certificates.len() as f64);
1872
1873 self.metrics
1874 .handle_soft_bundle_certificates_size_bytes
1875 .observe(total_size_bytes as f64);
1876
1877 self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1879 .await?;
1880
1881 info!(
1882 "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1883 certificates.len(),
1884 client_addr
1885 .map(|x| x.to_string())
1886 .unwrap_or_else(|| "unknown".to_string()),
1887 certificates
1888 .iter()
1889 .map(|x| x.digest().to_string())
1890 .collect::<Vec<_>>()
1891 .join(", "),
1892 total_size_bytes
1893 );
1894
1895 let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1896 self.handle_certificates(
1897 certificates,
1898 request.include_events,
1899 request.include_input_objects,
1900 request.include_output_objects,
1901 request.include_auxiliary_data,
1902 &epoch_store,
1903 request.wait_for_effects,
1904 )
1905 .instrument(span)
1906 .await
1907 .map(|(resp, spam_weight)| {
1908 (
1909 tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1910 responses: resp.unwrap_or_default(),
1911 }),
1912 spam_weight,
1913 )
1914 })
1915 }
1916
1917 async fn object_info_impl(
1918 &self,
1919 request: tonic::Request<ObjectInfoRequest>,
1920 ) -> WrappedServiceResponse<ObjectInfoResponse> {
1921 let request = request.into_inner();
1922 let response = self.state.handle_object_info_request(request).await?;
1923 Ok((tonic::Response::new(response), Weight::one()))
1924 }
1925
1926 async fn transaction_info_impl(
1927 &self,
1928 request: tonic::Request<TransactionInfoRequest>,
1929 ) -> WrappedServiceResponse<TransactionInfoResponse> {
1930 let request = request.into_inner();
1931 let response = self.state.handle_transaction_info_request(request).await?;
1932 Ok((tonic::Response::new(response), Weight::one()))
1933 }
1934
1935 async fn checkpoint_impl(
1936 &self,
1937 request: tonic::Request<CheckpointRequest>,
1938 ) -> WrappedServiceResponse<CheckpointResponse> {
1939 let request = request.into_inner();
1940 let response = self.state.handle_checkpoint_request(&request)?;
1941 Ok((tonic::Response::new(response), Weight::one()))
1942 }
1943
1944 async fn checkpoint_v2_impl(
1945 &self,
1946 request: tonic::Request<CheckpointRequestV2>,
1947 ) -> WrappedServiceResponse<CheckpointResponseV2> {
1948 let request = request.into_inner();
1949 let response = self.state.handle_checkpoint_request_v2(&request)?;
1950 Ok((tonic::Response::new(response), Weight::one()))
1951 }
1952
1953 async fn get_system_state_object_impl(
1954 &self,
1955 _request: tonic::Request<SystemStateRequest>,
1956 ) -> WrappedServiceResponse<SuiSystemState> {
1957 let response = self
1958 .state
1959 .get_object_cache_reader()
1960 .get_sui_system_state_object_unsafe()?;
1961 Ok((tonic::Response::new(response), Weight::one()))
1962 }
1963
1964 async fn validator_health_impl(
1965 &self,
1966 _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1967 ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1968 let state = &self.state;
1969
1970 let epoch_store = state.load_epoch_store_one_call_per_task();
1972
1973 let num_inflight_execution_transactions =
1975 state.execution_scheduler().num_pending_certificates() as u64;
1976
1977 let num_inflight_consensus_transactions =
1979 self.consensus_adapter.num_inflight_transactions();
1980
1981 let last_committed_leader_round = epoch_store
1983 .consensus_tx_status_cache
1984 .as_ref()
1985 .and_then(|cache| cache.get_last_committed_leader_round())
1986 .unwrap_or(0);
1987
1988 let last_locally_built_checkpoint = epoch_store
1990 .last_built_checkpoint_summary()
1991 .ok()
1992 .flatten()
1993 .map(|(_, summary)| summary.sequence_number)
1994 .unwrap_or(0);
1995
1996 let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1997 num_inflight_consensus_transactions,
1998 num_inflight_execution_transactions,
1999 last_locally_built_checkpoint,
2000 last_committed_leader_round,
2001 };
2002
2003 let raw_response = typed_response
2004 .try_into()
2005 .map_err(|e: sui_types::error::SuiError| {
2006 tonic::Status::internal(format!("Failed to serialize health response: {}", e))
2007 })?;
2008
2009 Ok((tonic::Response::new(raw_response), Weight::one()))
2010 }
2011
2012 fn get_client_ip_addr<T>(
2013 &self,
2014 request: &tonic::Request<T>,
2015 source: &ClientIdSource,
2016 ) -> Option<IpAddr> {
2017 let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
2018
2019 if let Some(header) = forwarded_header {
2020 let num_hops = header
2021 .to_str()
2022 .map(|h| h.split(',').count().saturating_sub(1))
2023 .unwrap_or(0);
2024
2025 self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
2026 }
2027
2028 match source {
2029 ClientIdSource::SocketAddr => {
2030 let socket_addr: Option<SocketAddr> = request.remote_addr();
2031
2032 if let Some(socket_addr) = socket_addr {
2038 Some(socket_addr.ip())
2039 } else {
2040 if cfg!(msim) {
2041 } else if cfg!(test) {
2043 panic!("Failed to get remote address from request");
2044 } else {
2045 self.metrics.connection_ip_not_found.inc();
2046 error!("Failed to get remote address from request");
2047 }
2048 None
2049 }
2050 }
2051 ClientIdSource::XForwardedFor(num_hops) => {
2052 let do_header_parse = |op: &MetadataValue<Ascii>| {
2053 match op.to_str() {
2054 Ok(header_val) => {
2055 let header_contents =
2056 header_val.split(',').map(str::trim).collect::<Vec<_>>();
2057 if *num_hops == 0 {
2058 error!(
2059 "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2060 number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2061 to this node. Skipping traffic controller request handling.",
2062 header_contents,
2063 );
2064 return None;
2065 }
2066 let contents_len = header_contents.len();
2067 if contents_len < *num_hops {
2068 error!(
2069 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2070 Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2071 `client-id-source` in the node config.",
2072 header_contents, contents_len, num_hops, contents_len,
2073 );
2074 self.metrics.client_id_source_config_mismatch.inc();
2075 return None;
2076 }
2077 let Some(client_ip) = header_contents.get(contents_len - num_hops)
2078 else {
2079 error!(
2080 "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2081 Expected at least {} values. Skipping traffic controller request handling.",
2082 header_contents, contents_len, num_hops, contents_len,
2083 );
2084 return None;
2085 };
2086 parse_ip(client_ip).or_else(|| {
2087 self.metrics.forwarded_header_parse_error.inc();
2088 None
2089 })
2090 }
2091 Err(e) => {
2092 self.metrics.forwarded_header_invalid.inc();
2096 error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2097 None
2098 }
2099 }
2100 };
2101 if let Some(op) = request.metadata().get("x-forwarded-for") {
2102 do_header_parse(op)
2103 } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2104 do_header_parse(op)
2105 } else {
2106 self.metrics.forwarded_header_not_included.inc();
2107 error!(
2108 "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2109 );
2110 None
2111 }
2112 }
2113 }
2114 }
2115
2116 async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2117 if let Some(traffic_controller) = &self.traffic_controller {
2118 if !traffic_controller.check(&client, &None).await {
2119 Err(tonic::Status::from_error(
2121 SuiErrorKind::TooManyRequests.into(),
2122 ))
2123 } else {
2124 Ok(())
2125 }
2126 } else {
2127 Ok(())
2128 }
2129 }
2130
2131 fn handle_traffic_resp<T>(
2132 &self,
2133 client: Option<IpAddr>,
2134 wrapped_response: WrappedServiceResponse<T>,
2135 ) -> Result<tonic::Response<T>, tonic::Status> {
2136 let (error, spam_weight, unwrapped_response) = match wrapped_response {
2137 Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2138 Err(status) => (
2139 Some(SuiError::from(status.clone())),
2140 Weight::zero(),
2141 Err(status.clone()),
2142 ),
2143 };
2144
2145 if let Some(traffic_controller) = self.traffic_controller.clone() {
2146 traffic_controller.tally(TrafficTally {
2147 direct: client,
2148 through_fullnode: None,
2149 error_info: error.map(|e| {
2150 let error_type = String::from(e.clone().as_ref());
2151 let error_weight = normalize(e);
2152 (error_weight, error_type)
2153 }),
2154 spam_weight,
2155 timestamp: SystemTime::now(),
2156 })
2157 }
2158 unwrapped_response
2159 }
2160}
2161
2162fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2163 let mut request = tonic::Request::new(message);
2166 let tcp_connect_info = TcpConnectInfo {
2167 local_addr: None,
2168 remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2169 };
2170 request.extensions_mut().insert(tcp_connect_info);
2171 request
2172}
2173
2174fn normalize(err: SuiError) -> Weight {
2176 match err.as_inner() {
2177 SuiErrorKind::UserInputError {
2178 error: UserInputError::IncorrectUserSignature { .. },
2179 } => Weight::one(),
2180 SuiErrorKind::InvalidSignature { .. }
2181 | SuiErrorKind::SignerSignatureAbsent { .. }
2182 | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2183 | SuiErrorKind::IncorrectSigner { .. }
2184 | SuiErrorKind::UnknownSigner { .. }
2185 | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2186 _ => Weight::zero(),
2187 }
2188}
2189
2190#[macro_export]
2194macro_rules! handle_with_decoration {
2195 ($self:ident, $func_name:ident, $request:ident) => {{
2196 if $self.client_id_source.is_none() {
2197 return $self.$func_name($request).await.map(|(result, _)| result);
2198 }
2199
2200 let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
2201
2202 $self.handle_traffic_req(client.clone()).await?;
2204
2205 let wrapped_response = $self.$func_name($request).await;
2207 $self.handle_traffic_resp(client, wrapped_response)
2208 }};
2209}
2210
2211#[async_trait]
2212impl Validator for ValidatorService {
2213 async fn submit_transaction(
2214 &self,
2215 request: tonic::Request<RawSubmitTxRequest>,
2216 ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2217 let validator_service = self.clone();
2218
2219 spawn_monitored_task!(async move {
2222 handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2225 })
2226 .await
2227 .unwrap()
2228 }
2229
2230 async fn transaction(
2231 &self,
2232 request: tonic::Request<Transaction>,
2233 ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2234 let validator_service = self.clone();
2235
2236 spawn_monitored_task!(async move {
2239 handle_with_decoration!(validator_service, transaction_impl, request)
2242 })
2243 .await
2244 .unwrap()
2245 }
2246
2247 async fn submit_certificate(
2248 &self,
2249 request: tonic::Request<CertifiedTransaction>,
2250 ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2251 let validator_service = self.clone();
2252
2253 spawn_monitored_task!(async move {
2256 handle_with_decoration!(validator_service, submit_certificate_impl, request)
2259 })
2260 .await
2261 .unwrap()
2262 }
2263
2264 async fn handle_certificate_v2(
2265 &self,
2266 request: tonic::Request<CertifiedTransaction>,
2267 ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2268 handle_with_decoration!(self, handle_certificate_v2_impl, request)
2269 }
2270
2271 async fn handle_certificate_v3(
2272 &self,
2273 request: tonic::Request<HandleCertificateRequestV3>,
2274 ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2275 handle_with_decoration!(self, handle_certificate_v3_impl, request)
2276 }
2277
2278 async fn wait_for_effects(
2279 &self,
2280 request: tonic::Request<RawWaitForEffectsRequest>,
2281 ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2282 handle_with_decoration!(self, wait_for_effects_impl, request)
2283 }
2284
2285 async fn handle_soft_bundle_certificates_v3(
2286 &self,
2287 request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2288 ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2289 handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2290 }
2291
2292 async fn object_info(
2293 &self,
2294 request: tonic::Request<ObjectInfoRequest>,
2295 ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2296 handle_with_decoration!(self, object_info_impl, request)
2297 }
2298
2299 async fn transaction_info(
2300 &self,
2301 request: tonic::Request<TransactionInfoRequest>,
2302 ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2303 handle_with_decoration!(self, transaction_info_impl, request)
2304 }
2305
2306 async fn checkpoint(
2307 &self,
2308 request: tonic::Request<CheckpointRequest>,
2309 ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2310 handle_with_decoration!(self, checkpoint_impl, request)
2311 }
2312
2313 async fn checkpoint_v2(
2314 &self,
2315 request: tonic::Request<CheckpointRequestV2>,
2316 ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2317 handle_with_decoration!(self, checkpoint_v2_impl, request)
2318 }
2319
2320 async fn get_system_state_object(
2321 &self,
2322 request: tonic::Request<SystemStateRequest>,
2323 ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2324 handle_with_decoration!(self, get_system_state_object_impl, request)
2325 }
2326
2327 async fn validator_health(
2328 &self,
2329 request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2330 ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2331 {
2332 handle_with_decoration!(self, validator_health_impl, request)
2333 }
2334}