sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_metrics::spawn_monitored_task;
11use prometheus::{
12    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
13    register_gauge_with_registry, register_histogram_vec_with_registry,
14    register_histogram_with_registry, register_int_counter_vec_with_registry,
15    register_int_counter_with_registry,
16};
17use std::{
18    cmp::Ordering,
19    future::Future,
20    io,
21    net::{IpAddr, SocketAddr},
22    pin::Pin,
23    sync::Arc,
24    time::{Duration, SystemTime},
25};
26use sui_network::{
27    api::{Validator, ValidatorServer},
28    tonic,
29    validator::server::SUI_TLS_SERVER_NAME,
30};
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
34use sui_types::messages_grpc::{
35    HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
36};
37use sui_types::messages_grpc::{
38    HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
39    SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::messages_grpc::{
42    HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
43};
44use sui_types::multiaddr::Multiaddr;
45use sui_types::object::Object;
46use sui_types::sui_system_state::SuiSystemState;
47use sui_types::traffic_control::{ClientIdSource, Weight};
48use sui_types::{
49    digests::{TransactionDigest, TransactionEffectsDigest},
50    error::{SuiErrorKind, UserInputError},
51};
52use sui_types::{
53    effects::TransactionEffects,
54    messages_grpc::{
55        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
56        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
57    },
58};
59use sui_types::{
60    effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
61};
62use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
63use sui_types::{error::*, transaction::*};
64use sui_types::{
65    fp_ensure,
66    messages_checkpoint::{
67        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
68    },
69};
70use tap::TapFallible;
71use tokio::sync::oneshot;
72use tokio::time::timeout;
73use tonic::metadata::{Ascii, MetadataValue};
74use tracing::{Instrument, debug, error, error_span, info, instrument};
75
76use crate::consensus_adapter::ConnectionMonitorStatusForTests;
77use crate::{
78    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
79    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
80    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
81};
82use crate::{
83    authority::{
84        ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
85        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
86        shared_object_version_manager::Schedulable,
87    },
88    checkpoints::CheckpointStore,
89    execution_scheduler::SchedulingSource,
90    mysticeti_adapter::LazyMysticetiClient,
91    transaction_outputs::TransactionOutputs,
92};
93use nonempty::{NonEmpty, nonempty};
94use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
95use sui_types::messages_grpc::PingType;
96use tonic::transport::server::TcpConnectInfo;
97
98#[cfg(test)]
99#[path = "unit_tests/server_tests.rs"]
100mod server_tests;
101
102#[cfg(test)]
103#[path = "unit_tests/wait_for_effects_tests.rs"]
104mod wait_for_effects_tests;
105
106#[cfg(test)]
107#[path = "unit_tests/submit_transaction_tests.rs"]
108mod submit_transaction_tests;
109
110pub struct AuthorityServerHandle {
111    server_handle: sui_network::validator::server::Server,
112}
113
114impl AuthorityServerHandle {
115    pub async fn join(self) -> Result<(), io::Error> {
116        self.server_handle.handle().wait_for_shutdown().await;
117        Ok(())
118    }
119
120    pub async fn kill(self) -> Result<(), io::Error> {
121        self.server_handle.handle().shutdown().await;
122        Ok(())
123    }
124
125    pub fn address(&self) -> &Multiaddr {
126        self.server_handle.local_addr()
127    }
128}
129
130pub struct AuthorityServer {
131    address: Multiaddr,
132    pub state: Arc<AuthorityState>,
133    consensus_adapter: Arc<ConsensusAdapter>,
134    pub metrics: Arc<ValidatorServiceMetrics>,
135}
136
137impl AuthorityServer {
138    pub fn new_for_test_with_consensus_adapter(
139        state: Arc<AuthorityState>,
140        consensus_adapter: Arc<ConsensusAdapter>,
141    ) -> Self {
142        let address = new_local_tcp_address_for_testing();
143        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
144
145        Self {
146            address,
147            state,
148            consensus_adapter,
149            metrics,
150        }
151    }
152
153    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
154        let consensus_adapter = Arc::new(ConsensusAdapter::new(
155            Arc::new(LazyMysticetiClient::new()),
156            CheckpointStore::new_for_tests(),
157            state.name,
158            Arc::new(ConnectionMonitorStatusForTests {}),
159            100_000,
160            100_000,
161            None,
162            None,
163            ConsensusAdapterMetrics::new_test(),
164            state.epoch_store_for_testing().protocol_config().clone(),
165        ));
166        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
167    }
168
169    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
170        let address = self.address.clone();
171        self.spawn_with_bind_address_for_test(address).await
172    }
173
174    pub async fn spawn_with_bind_address_for_test(
175        self,
176        address: Multiaddr,
177    ) -> Result<AuthorityServerHandle, io::Error> {
178        let tls_config = sui_tls::create_rustls_server_config(
179            self.state.config.network_key_pair().copy().private(),
180            SUI_TLS_SERVER_NAME.to_string(),
181        );
182        let config = mysten_network::config::Config::new();
183        let server = sui_network::validator::server::ServerBuilder::from_config(
184            &config,
185            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
186        )
187        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
188            self.state,
189            self.consensus_adapter,
190            self.metrics,
191        )))
192        .bind(&address, Some(tls_config))
193        .await
194        .unwrap();
195        let local_addr = server.local_addr().to_owned();
196        info!("Listening to traffic on {local_addr}");
197        let handle = AuthorityServerHandle {
198            server_handle: server,
199        };
200        Ok(handle)
201    }
202}
203
204pub struct ValidatorServiceMetrics {
205    pub signature_errors: IntCounter,
206    pub tx_verification_latency: Histogram,
207    pub cert_verification_latency: Histogram,
208    pub consensus_latency: Histogram,
209    pub handle_transaction_latency: Histogram,
210    pub submit_certificate_consensus_latency: Histogram,
211    pub handle_certificate_consensus_latency: Histogram,
212    pub handle_certificate_non_consensus_latency: Histogram,
213    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
214    pub handle_soft_bundle_certificates_count: Histogram,
215    pub handle_soft_bundle_certificates_size_bytes: Histogram,
216    pub handle_transaction_consensus_latency: Histogram,
217    pub handle_submit_transaction_consensus_latency: HistogramVec,
218    pub handle_wait_for_effects_ping_latency: HistogramVec,
219
220    handle_submit_transaction_latency: HistogramVec,
221    handle_submit_transaction_bytes: HistogramVec,
222    handle_submit_transaction_batch_size: HistogramVec,
223
224    num_rejected_tx_in_epoch_boundary: IntCounter,
225    num_rejected_cert_in_epoch_boundary: IntCounter,
226    num_rejected_tx_during_overload: IntCounterVec,
227    num_rejected_cert_during_overload: IntCounterVec,
228    submission_rejected_transactions: IntCounterVec,
229    connection_ip_not_found: IntCounter,
230    forwarded_header_parse_error: IntCounter,
231    forwarded_header_invalid: IntCounter,
232    forwarded_header_not_included: IntCounter,
233    client_id_source_config_mismatch: IntCounter,
234    x_forwarded_for_num_hops: Gauge,
235}
236
237impl ValidatorServiceMetrics {
238    pub fn new(registry: &Registry) -> Self {
239        Self {
240            signature_errors: register_int_counter_with_registry!(
241                "total_signature_errors",
242                "Number of transaction signature errors",
243                registry,
244            )
245            .unwrap(),
246            tx_verification_latency: register_histogram_with_registry!(
247                "validator_service_tx_verification_latency",
248                "Latency of verifying a transaction",
249                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
250                registry,
251            )
252            .unwrap(),
253            cert_verification_latency: register_histogram_with_registry!(
254                "validator_service_cert_verification_latency",
255                "Latency of verifying a certificate",
256                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257                registry,
258            )
259            .unwrap(),
260            consensus_latency: register_histogram_with_registry!(
261                "validator_service_consensus_latency",
262                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
263                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
264                registry,
265            )
266            .unwrap(),
267            handle_transaction_latency: register_histogram_with_registry!(
268                "validator_service_handle_transaction_latency",
269                "Latency of handling a transaction",
270                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
271                registry,
272            )
273            .unwrap(),
274            handle_certificate_consensus_latency: register_histogram_with_registry!(
275                "validator_service_handle_certificate_consensus_latency",
276                "Latency of handling a consensus transaction certificate",
277                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
278                registry,
279            )
280            .unwrap(),
281            submit_certificate_consensus_latency: register_histogram_with_registry!(
282                "validator_service_submit_certificate_consensus_latency",
283                "Latency of submit_certificate RPC handler",
284                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
285                registry,
286            )
287            .unwrap(),
288            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
289                "validator_service_handle_certificate_non_consensus_latency",
290                "Latency of handling a non-consensus transaction certificate",
291                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
292                registry,
293            )
294            .unwrap(),
295            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
296                "validator_service_handle_soft_bundle_certificates_consensus_latency",
297                "Latency of handling a consensus soft bundle",
298                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
299                registry,
300            )
301            .unwrap(),
302            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
303                "handle_soft_bundle_certificates_count",
304                "The number of certificates included in a soft bundle",
305                mysten_metrics::COUNT_BUCKETS.to_vec(),
306                registry,
307            )
308            .unwrap(),
309            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
310                "handle_soft_bundle_certificates_size_bytes",
311                "The size of soft bundle in bytes",
312                mysten_metrics::BYTES_BUCKETS.to_vec(),
313                registry,
314            )
315            .unwrap(),
316            handle_transaction_consensus_latency: register_histogram_with_registry!(
317                "validator_service_handle_transaction_consensus_latency",
318                "Latency of handling a user transaction sent through consensus",
319                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
320                registry,
321            )
322            .unwrap(),
323            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
324                "validator_service_submit_transaction_consensus_latency",
325                "Latency of submitting a user transaction sent through consensus",
326                &["req_type"],
327                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
328                registry,
329            )
330            .unwrap(),
331            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
332                "validator_service_submit_transaction_latency",
333                "Latency of submit transaction handler",
334                &["req_type"],
335                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
336                registry,
337            )
338            .unwrap(),
339            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
340                "validator_service_handle_wait_for_effects_ping_latency",
341                "Latency of handling a ping request for wait_for_effects",
342                &["req_type"],
343                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
344                registry,
345            )
346            .unwrap(),
347            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
348                "validator_service_submit_transaction_bytes",
349                "The size of transactions in the submit transaction request",
350                &["req_type"],
351                mysten_metrics::BYTES_BUCKETS.to_vec(),
352                registry,
353            )
354            .unwrap(),
355            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
356                "validator_service_submit_transaction_batch_size",
357                "The number of transactions in the submit transaction request",
358                &["req_type"],
359                mysten_metrics::COUNT_BUCKETS.to_vec(),
360                registry,
361            )
362            .unwrap(),
363            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
364                "validator_service_num_rejected_tx_in_epoch_boundary",
365                "Number of rejected transaction during epoch transitioning",
366                registry,
367            )
368            .unwrap(),
369            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
370                "validator_service_num_rejected_cert_in_epoch_boundary",
371                "Number of rejected transaction certificate during epoch transitioning",
372                registry,
373            )
374            .unwrap(),
375            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
376                "validator_service_num_rejected_tx_during_overload",
377                "Number of rejected transaction due to system overload",
378                &["error_type"],
379                registry,
380            )
381            .unwrap(),
382            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
383                "validator_service_num_rejected_cert_during_overload",
384                "Number of rejected transaction certificate due to system overload",
385                &["error_type"],
386                registry,
387            )
388            .unwrap(),
389            submission_rejected_transactions: register_int_counter_vec_with_registry!(
390                "validator_service_submission_rejected_transactions",
391                "Number of transactions rejected during submission",
392                &["reason"],
393                registry,
394            )
395            .unwrap(),
396            connection_ip_not_found: register_int_counter_with_registry!(
397                "validator_service_connection_ip_not_found",
398                "Number of times connection IP was not extractable from request",
399                registry,
400            )
401            .unwrap(),
402            forwarded_header_parse_error: register_int_counter_with_registry!(
403                "validator_service_forwarded_header_parse_error",
404                "Number of times x-forwarded-for header could not be parsed",
405                registry,
406            )
407            .unwrap(),
408            forwarded_header_invalid: register_int_counter_with_registry!(
409                "validator_service_forwarded_header_invalid",
410                "Number of times x-forwarded-for header was invalid",
411                registry,
412            )
413            .unwrap(),
414            forwarded_header_not_included: register_int_counter_with_registry!(
415                "validator_service_forwarded_header_not_included",
416                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
417                registry,
418            )
419            .unwrap(),
420            client_id_source_config_mismatch: register_int_counter_with_registry!(
421                "validator_service_client_id_source_config_mismatch",
422                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
423                registry,
424            )
425            .unwrap(),
426            x_forwarded_for_num_hops: register_gauge_with_registry!(
427                "validator_service_x_forwarded_for_num_hops",
428                "Number of hops in x-forwarded-for header",
429                registry,
430            )
431            .unwrap(),
432        }
433    }
434
435    pub fn new_for_tests() -> Self {
436        let registry = Registry::new();
437        Self::new(&registry)
438    }
439}
440
441#[derive(Clone)]
442pub struct ValidatorService {
443    state: Arc<AuthorityState>,
444    consensus_adapter: Arc<ConsensusAdapter>,
445    metrics: Arc<ValidatorServiceMetrics>,
446    traffic_controller: Option<Arc<TrafficController>>,
447    client_id_source: Option<ClientIdSource>,
448}
449
450impl ValidatorService {
451    pub fn new(
452        state: Arc<AuthorityState>,
453        consensus_adapter: Arc<ConsensusAdapter>,
454        validator_metrics: Arc<ValidatorServiceMetrics>,
455        client_id_source: Option<ClientIdSource>,
456    ) -> Self {
457        let traffic_controller = state.traffic_controller.clone();
458        Self {
459            state,
460            consensus_adapter,
461            metrics: validator_metrics,
462            traffic_controller,
463            client_id_source,
464        }
465    }
466
467    pub fn new_for_tests(
468        state: Arc<AuthorityState>,
469        consensus_adapter: Arc<ConsensusAdapter>,
470        metrics: Arc<ValidatorServiceMetrics>,
471    ) -> Self {
472        Self {
473            state,
474            consensus_adapter,
475            metrics,
476            traffic_controller: None,
477            client_id_source: None,
478        }
479    }
480
481    pub fn validator_state(&self) -> &Arc<AuthorityState> {
482        &self.state
483    }
484
485    pub async fn execute_certificate_for_testing(
486        &self,
487        cert: CertifiedTransaction,
488    ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
489        let request = make_tonic_request_for_testing(cert);
490        self.handle_certificate_v2(request).await
491    }
492
493    pub async fn handle_transaction_for_benchmarking(
494        &self,
495        transaction: Transaction,
496    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
497        let request = make_tonic_request_for_testing(transaction);
498        self.transaction(request).await
499    }
500
501    // When making changes to this function, see if the changes should be applied to
502    // `Self::handle_submit_transaction()` and `SuiTxValidator::vote_transaction()` as well.
503    async fn handle_transaction(
504        &self,
505        request: tonic::Request<Transaction>,
506    ) -> WrappedServiceResponse<HandleTransactionResponse> {
507        let Self {
508            state,
509            consensus_adapter,
510            metrics,
511            traffic_controller: _,
512            client_id_source: _,
513        } = self.clone();
514        let transaction = request.into_inner();
515        let epoch_store = state.load_epoch_store_one_call_per_task();
516
517        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
518
519        // When authority is overloaded and decide to reject this tx, we still lock the object
520        // and ask the client to retry in the future. This is because without locking, the
521        // input objects can be locked by a different tx in the future, however, the input objects
522        // may already be locked by this tx in other validators. This can cause non of the txes
523        // to have enough quorum to form a certificate, causing the objects to be locked for
524        // the entire epoch. By doing locking but pushback, retrying transaction will have
525        // higher chance to succeed.
526        let mut validator_pushback_error = None;
527        let overload_check_res = state.check_system_overload(
528            &*consensus_adapter,
529            transaction.data(),
530            state.check_system_overload_at_signing(),
531        );
532        if let Err(error) = overload_check_res {
533            metrics
534                .num_rejected_tx_during_overload
535                .with_label_values(&[error.as_ref()])
536                .inc();
537            // TODO: consider change the behavior for other types of overload errors.
538            match error.as_inner() {
539                SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
540                    validator_pushback_error = Some(error)
541                }
542                _ => return Err(error.into()),
543            }
544        }
545
546        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
547
548        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
549        let transaction = epoch_store
550            // Aliases are not supported outside of MFP.
551            .verify_transaction_require_no_aliases(transaction)
552            .tap_err(|_| {
553                metrics.signature_errors.inc();
554            })?
555            .into_tx();
556        drop(tx_verif_metrics_guard);
557
558        let tx_digest = transaction.digest();
559
560        // Enable Trace Propagation across spans/processes using tx_digest
561        let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
562
563        let info = state
564            .handle_transaction(&epoch_store, transaction.clone())
565            .instrument(span)
566            .await
567            .tap_err(|e| {
568                if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
569                    metrics.num_rejected_tx_in_epoch_boundary.inc();
570                }
571            })?;
572
573        if let Some(error) = validator_pushback_error {
574            // TODO: right now, we still sign the txn, but just don't return it. We can also skip signing
575            // to save more CPU.
576            return Err(error.into());
577        }
578
579        Ok((tonic::Response::new(info), Weight::zero()))
580    }
581
582    #[instrument(
583        name = "ValidatorService::handle_submit_transaction",
584        level = "error",
585        skip_all,
586        err(level = "debug")
587    )]
588    async fn handle_submit_transaction(
589        &self,
590        request: tonic::Request<RawSubmitTxRequest>,
591    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
592        let Self {
593            state,
594            consensus_adapter,
595            metrics,
596            traffic_controller: _,
597            client_id_source,
598        } = self.clone();
599
600        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
601            self.get_client_ip_addr(&request, client_id_source)
602        } else {
603            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
604        };
605
606        let epoch_store = state.load_epoch_store_one_call_per_task();
607        if !epoch_store.protocol_config().mysticeti_fastpath() {
608            return Err(SuiErrorKind::UnsupportedFeatureError {
609                error: "Mysticeti fastpath".to_string(),
610            }
611            .into());
612        }
613
614        let request = request.into_inner();
615        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
616            SuiErrorKind::GrpcMessageDeserializeError {
617                type_info: "RawSubmitTxRequest.submit_type".to_string(),
618                error: e.to_string(),
619            }
620        })?;
621
622        let is_ping_request = submit_type == SubmitTxType::Ping;
623        if is_ping_request {
624            fp_ensure!(
625                request.transactions.is_empty(),
626                SuiErrorKind::InvalidRequest(format!(
627                    "Ping request cannot contain {} transactions",
628                    request.transactions.len()
629                ))
630                .into()
631            );
632        } else {
633            // Ensure default and soft bundle requests contain at least one transaction.
634            fp_ensure!(
635                !request.transactions.is_empty(),
636                SuiErrorKind::InvalidRequest(
637                    "At least one transaction needs to be submitted".to_string(),
638                )
639                .into()
640            );
641        }
642
643        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
644        // if they use the same gas price. But this is only done with best effort.
645        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
646        // other transactions in the same bundle.
647        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
648
649        let max_num_transactions = if is_soft_bundle_request {
650            // Soft bundle cannot contain too many transactions.
651            // Otherwise it is hard to include all of them in a single block.
652            epoch_store.protocol_config().max_soft_bundle_size()
653        } else {
654            // Still enforce a limit even when transactions do not need to be in the same block.
655            epoch_store
656                .protocol_config()
657                .max_num_transactions_in_block()
658        };
659        fp_ensure!(
660            request.transactions.len() <= max_num_transactions as usize,
661            SuiErrorKind::InvalidRequest(format!(
662                "Too many transactions in request: {} vs {}",
663                request.transactions.len(),
664                max_num_transactions
665            ))
666            .into()
667        );
668
669        // Transaction digests.
670        let mut tx_digests = Vec::with_capacity(request.transactions.len());
671        // Transactions to submit to consensus.
672        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
673        // Indexes of transactions above in the request transactions.
674        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
675        // Results corresponding to each transaction in the request.
676        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
677        // Total size of all transactions in the request.
678        let mut total_size_bytes = 0;
679
680        let req_type = if is_ping_request {
681            "ping"
682        } else if request.transactions.len() == 1 {
683            "single_transaction"
684        } else if is_soft_bundle_request {
685            "soft_bundle"
686        } else {
687            "batch"
688        };
689
690        let _handle_tx_metrics_guard = metrics
691            .handle_submit_transaction_latency
692            .with_label_values(&[req_type])
693            .start_timer();
694
695        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
696            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
697                Ok(txn) => txn,
698                Err(e) => {
699                    // Ok to fail the request when any transaction is invalid.
700                    return Err(SuiErrorKind::TransactionDeserializationError {
701                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
702                    }
703                    .into());
704                }
705            };
706
707            // Ok to fail the request when any transaction is invalid.
708            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
709
710            let overload_check_res = self.state.check_system_overload(
711                &*consensus_adapter,
712                transaction.data(),
713                state.check_system_overload_at_signing(),
714            );
715            if let Err(error) = overload_check_res {
716                metrics
717                    .num_rejected_tx_during_overload
718                    .with_label_values(&[error.as_ref()])
719                    .inc();
720                results[idx] = Some(SubmitTxResult::Rejected { error });
721                continue;
722            }
723
724            // Ok to fail the request when any signature is invalid.
725            let verified_transaction = {
726                let _metrics_guard = metrics.tx_verification_latency.start_timer();
727                if epoch_store.protocol_config().address_aliases() {
728                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
729                        Ok(tx) => tx,
730                        Err(e) => {
731                            metrics.signature_errors.inc();
732                            return Err(e.into());
733                        }
734                    }
735                } else {
736                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
737                        Ok(tx) => tx,
738                        Err(e) => {
739                            metrics.signature_errors.inc();
740                            return Err(e.into());
741                        }
742                    }
743                }
744            };
745
746            let tx_digest = verified_transaction.tx().digest();
747            tx_digests.push(*tx_digest);
748
749            debug!(
750                ?tx_digest,
751                "handle_submit_transaction: verified transaction"
752            );
753
754            // Check if the transaction has executed, before checking input objects
755            // which could have been consumed.
756            if let Some(effects) = self
757                .state
758                .get_transaction_cache_reader()
759                .get_executed_effects(tx_digest)
760            {
761                let effects_digest = effects.digest();
762                if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
763                    let executed_result = SubmitTxResult::Executed {
764                        effects_digest,
765                        details: Some(executed_data),
766                        fast_path: false,
767                    };
768                    results[idx] = Some(executed_result);
769                    debug!(?tx_digest, "handle_submit_transaction: already executed");
770                    continue;
771                }
772            }
773
774            debug!(
775                ?tx_digest,
776                "handle_submit_transaction: waiting for fastpath dependency objects"
777            );
778            if !state
779                .wait_for_fastpath_dependency_objects(
780                    verified_transaction.tx(),
781                    epoch_store.epoch(),
782                )
783                .await?
784            {
785                debug!(
786                    ?tx_digest,
787                    "fastpath input objects are still unavailable after waiting"
788                );
789            }
790
791            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
792                Ok(_) => { /* continue processing */ }
793                Err(e) => {
794                    // Check if transaction has been executed while being validated.
795                    // This is an edge case so checking executed effects twice is acceptable.
796                    if let Some(effects) = self
797                        .state
798                        .get_transaction_cache_reader()
799                        .get_executed_effects(tx_digest)
800                    {
801                        let effects_digest = effects.digest();
802                        if let Ok(executed_data) = self.complete_executed_data(effects, None).await
803                        {
804                            let executed_result = SubmitTxResult::Executed {
805                                effects_digest,
806                                details: Some(executed_data),
807                                fast_path: false,
808                            };
809                            results[idx] = Some(executed_result);
810                            continue;
811                        }
812                    }
813
814                    // When the transaction has not been executed, record the error for the transaction.
815                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
816                    metrics
817                        .submission_rejected_transactions
818                        .with_label_values(&[e.to_variant_name()])
819                        .inc();
820                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
821                    continue;
822                }
823            }
824
825            if epoch_store.protocol_config().address_aliases() {
826                consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
827                    &self.state.name,
828                    verified_transaction.into(),
829                ));
830            } else {
831                consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
832                    &self.state.name,
833                    verified_transaction.into_tx().into(),
834                ));
835            }
836            transaction_indexes.push(idx);
837            total_size_bytes += tx_size;
838        }
839
840        if consensus_transactions.is_empty() && !is_ping_request {
841            return Ok((
842                tonic::Response::new(Self::try_from_submit_tx_response(results)?),
843                Weight::zero(),
844            ));
845        }
846
847        // Set the max bytes size of the soft bundle to be half of the consensus max transactions in block size.
848        // We do this to account for serialization overheads and to ensure that the soft bundle is not too large
849        // when is attempted to be posted via consensus.
850        let max_transaction_bytes = if is_soft_bundle_request {
851            epoch_store
852                .protocol_config()
853                .consensus_max_transactions_in_block_bytes()
854                / 2
855        } else {
856            epoch_store
857                .protocol_config()
858                .consensus_max_transactions_in_block_bytes()
859        };
860        fp_ensure!(
861            total_size_bytes <= max_transaction_bytes as usize,
862            SuiErrorKind::UserInputError {
863                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
864                    size: total_size_bytes,
865                    limit: max_transaction_bytes,
866                },
867            }
868            .into()
869        );
870
871        metrics
872            .handle_submit_transaction_bytes
873            .with_label_values(&[req_type])
874            .observe(total_size_bytes as f64);
875        metrics
876            .handle_submit_transaction_batch_size
877            .with_label_values(&[req_type])
878            .observe(consensus_transactions.len() as f64);
879
880        let _latency_metric_guard = metrics
881            .handle_submit_transaction_consensus_latency
882            .with_label_values(&[req_type])
883            .start_timer();
884
885        let consensus_positions = if is_soft_bundle_request || is_ping_request {
886            // We only allow the `consensus_transactions` to be empty for ping requests. This is how it should and is be treated from the downstream components.
887            // For any other case, having an empty `consensus_transactions` vector is an invalid state and we should have never reached at this point.
888            assert!(
889                is_ping_request || !consensus_transactions.is_empty(),
890                "A valid soft bundle must have at least one transaction"
891            );
892            debug!(
893                "handle_submit_transaction: submitting consensus transactions ({}): {}",
894                req_type,
895                consensus_transactions
896                    .iter()
897                    .map(|t| t.local_display())
898                    .join(", ")
899            );
900            self.handle_submit_to_consensus_for_position(
901                consensus_transactions,
902                &epoch_store,
903                submitter_client_addr,
904            )
905            .await?
906        } else {
907            let futures = consensus_transactions.into_iter().map(|t| {
908                debug!(
909                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
910                    req_type,
911                    t.local_display(),
912                );
913                self.handle_submit_to_consensus_for_position(
914                    vec![t],
915                    &epoch_store,
916                    submitter_client_addr,
917                )
918            });
919            future::try_join_all(futures)
920                .await?
921                .into_iter()
922                .flatten()
923                .collect()
924        };
925
926        if is_ping_request {
927            // For ping requests, return the special consensus position.
928            assert_eq!(consensus_positions.len(), 1);
929            results.push(Some(SubmitTxResult::Submitted {
930                consensus_position: consensus_positions[0],
931            }));
932        } else {
933            // Otherwise, return the consensus position for each transaction.
934            for ((idx, tx_digest), consensus_position) in transaction_indexes
935                .into_iter()
936                .zip(tx_digests)
937                .zip(consensus_positions)
938            {
939                debug!(
940                    ?tx_digest,
941                    "handle_submit_transaction: submitted consensus transaction at {}",
942                    consensus_position,
943                );
944                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
945            }
946        }
947
948        Ok((
949            tonic::Response::new(Self::try_from_submit_tx_response(results)?),
950            Weight::zero(),
951        ))
952    }
953
954    fn try_from_submit_tx_response(
955        results: Vec<Option<SubmitTxResult>>,
956    ) -> Result<RawSubmitTxResponse, SuiError> {
957        let mut raw_results = Vec::new();
958        for (i, result) in results.into_iter().enumerate() {
959            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
960                error: format!("Missing transaction result at {}", i),
961            })?;
962            let raw_result = result.try_into()?;
963            raw_results.push(raw_result);
964        }
965        Ok(RawSubmitTxResponse {
966            results: raw_results,
967        })
968    }
969
970    // In addition to the response from handling the certificates,
971    // returns a bool indicating whether the request should be tallied
972    // toward spam count. In general, this should be set to true for
973    // requests that are read-only and thus do not consume gas, such
974    // as when the transaction is already executed.
975    async fn handle_certificates(
976        &self,
977        certificates: NonEmpty<CertifiedTransaction>,
978        include_events: bool,
979        include_input_objects: bool,
980        include_output_objects: bool,
981        include_auxiliary_data: bool,
982        epoch_store: &Arc<AuthorityPerEpochStore>,
983        wait_for_effects: bool,
984    ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
985        // Validate if cert can be executed
986        // Fullnode does not serve handle_certificate call.
987        fp_ensure!(
988            !self.state.is_fullnode(epoch_store),
989            SuiErrorKind::FullNodeCantHandleCertificate.into()
990        );
991
992        let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
993
994        let metrics = if certificates.len() == 1 {
995            if wait_for_effects {
996                if is_consensus_tx {
997                    &self.metrics.handle_certificate_consensus_latency
998                } else {
999                    &self.metrics.handle_certificate_non_consensus_latency
1000                }
1001            } else {
1002                &self.metrics.submit_certificate_consensus_latency
1003            }
1004        } else {
1005            // `soft_bundle_validity_check` ensured that all certificates contain shared objects.
1006            &self
1007                .metrics
1008                .handle_soft_bundle_certificates_consensus_latency
1009        };
1010
1011        let _metrics_guard = metrics.start_timer();
1012
1013        // 1) Check if the certificate is already executed.
1014        //    This is only needed when we have only one certificate (not a soft bundle).
1015        //    When multiple certificates are provided, we will either submit all of them or none of them to consensus.
1016        if certificates.len() == 1 {
1017            let tx_digest = *certificates[0].digest();
1018            debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
1019
1020            if let Some(signed_effects) = self
1021                .state
1022                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
1023            {
1024                let events = if include_events && signed_effects.events_digest().is_some() {
1025                    Some(
1026                        self.state
1027                            .get_transaction_events(signed_effects.transaction_digest())?,
1028                    )
1029                } else {
1030                    None
1031                };
1032
1033                return Ok((
1034                    Some(vec![HandleCertificateResponseV3 {
1035                        effects: signed_effects.into_inner(),
1036                        events,
1037                        input_objects: None,
1038                        output_objects: None,
1039                        auxiliary_data: None,
1040                    }]),
1041                    Weight::one(),
1042                ));
1043            };
1044        }
1045
1046        // 2) Verify the certificates.
1047        // Check system overload
1048        for certificate in &certificates {
1049            let overload_check_res = self.state.check_system_overload(
1050                &*self.consensus_adapter,
1051                certificate.data(),
1052                self.state.check_system_overload_at_execution(),
1053            );
1054            if let Err(error) = overload_check_res {
1055                self.metrics
1056                    .num_rejected_cert_during_overload
1057                    .with_label_values(&[error.as_ref()])
1058                    .inc();
1059                return Err(error.into());
1060            }
1061        }
1062
1063        let verified_certificates = {
1064            let _timer = self.metrics.cert_verification_latency.start_timer();
1065            epoch_store
1066                .signature_verifier
1067                .multi_verify_certs(certificates.into())
1068                .await
1069                .into_iter()
1070                .collect::<Result<Vec<_>, _>>()?
1071        };
1072        let consensus_transactions =
1073            NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1074                ConsensusTransaction::new_certificate_message(
1075                    &self.state.name,
1076                    certificate.clone().into(),
1077                )
1078            }))
1079            .unwrap();
1080
1081        let (responses, weight) = self
1082            .handle_submit_to_consensus(
1083                consensus_transactions,
1084                include_events,
1085                include_input_objects,
1086                include_output_objects,
1087                include_auxiliary_data,
1088                epoch_store,
1089                wait_for_effects,
1090            )
1091            .await?;
1092        // Sign the returned TransactionEffects.
1093        let responses = if let Some(responses) = responses {
1094            Some(
1095                responses
1096                    .into_iter()
1097                    .map(|response| {
1098                        let signed_effects =
1099                            self.state.sign_effects(response.effects, epoch_store)?;
1100                        Ok(HandleCertificateResponseV3 {
1101                            effects: signed_effects.into_inner(),
1102                            events: response.events,
1103                            input_objects: if response.input_objects.is_empty() {
1104                                None
1105                            } else {
1106                                Some(response.input_objects)
1107                            },
1108                            output_objects: if response.output_objects.is_empty() {
1109                                None
1110                            } else {
1111                                Some(response.output_objects)
1112                            },
1113                            auxiliary_data: None,
1114                        })
1115                    })
1116                    .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1117            )
1118        } else {
1119            None
1120        };
1121
1122        Ok((responses, weight))
1123    }
1124
1125    #[instrument(
1126        name = "ValidatorService::handle_submit_to_consensus_for_position",
1127        level = "debug",
1128        skip_all,
1129        err(level = "debug")
1130    )]
1131    async fn handle_submit_to_consensus_for_position(
1132        &self,
1133        // Empty when this is a ping request.
1134        consensus_transactions: Vec<ConsensusTransaction>,
1135        epoch_store: &Arc<AuthorityPerEpochStore>,
1136        submitter_client_addr: Option<IpAddr>,
1137    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1138        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1139
1140        {
1141            // code block within reconfiguration lock
1142            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1143            if !reconfiguration_lock.should_accept_user_certs() {
1144                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1145                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1146            }
1147
1148            // Submit to consensus and wait for position, we do not check if tx
1149            // has been processed by consensus already as this method is called
1150            // to get back a consensus position.
1151            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1152
1153            self.consensus_adapter.submit_batch(
1154                &consensus_transactions,
1155                Some(&reconfiguration_lock),
1156                epoch_store,
1157                Some(tx_consensus_positions),
1158                submitter_client_addr,
1159            )?;
1160        }
1161
1162        Ok(rx_consensus_positions.await.map_err(|e| {
1163            SuiErrorKind::FailedToSubmitToConsensus(format!(
1164                "Failed to get consensus position: {e}"
1165            ))
1166        })?)
1167    }
1168
1169    async fn handle_submit_to_consensus(
1170        &self,
1171        consensus_transactions: NonEmpty<ConsensusTransaction>,
1172        include_events: bool,
1173        include_input_objects: bool,
1174        include_output_objects: bool,
1175        _include_auxiliary_data: bool,
1176        epoch_store: &Arc<AuthorityPerEpochStore>,
1177        wait_for_effects: bool,
1178    ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1179        let consensus_transactions: Vec<_> = consensus_transactions.into();
1180        {
1181            // code block within reconfiguration lock
1182            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1183            if !reconfiguration_lock.should_accept_user_certs() {
1184                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1185                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1186            }
1187
1188            // 3) All transactions are sent to consensus (at least by some authorities)
1189            // For certs with shared objects this will wait until either timeout or we have heard back from consensus.
1190            // For certs with owned objects this will return without waiting for certificate to be sequenced.
1191            // For uncertified transactions this will wait for fast path processing.
1192            // First do quick dirty non-async check.
1193            if !epoch_store.all_external_consensus_messages_processed(
1194                consensus_transactions.iter().map(|tx| tx.key()),
1195            )? {
1196                let _metrics_guard = self.metrics.consensus_latency.start_timer();
1197                self.consensus_adapter.submit_batch(
1198                    &consensus_transactions,
1199                    Some(&reconfiguration_lock),
1200                    epoch_store,
1201                    None,
1202                    None, // not tracking submitter client addr for quorum driver path
1203                )?;
1204                // Do not wait for the result, because the transaction might have already executed.
1205                // Instead, check or wait for the existence of certificate effects below.
1206            }
1207        }
1208
1209        if !wait_for_effects {
1210            // It is useful to enqueue owned object transaction for execution locally,
1211            // even when we are not returning effects to user
1212            let fast_path_certificates = consensus_transactions
1213                .iter()
1214                .filter_map(|tx| {
1215                    if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1216                        (!certificate.is_consensus_tx())
1217                            // Certificates already verified by callers of this function.
1218                            .then_some((
1219                                VerifiedExecutableTransaction::new_from_certificate(
1220                                    VerifiedCertificate::new_unchecked(*(certificate.clone())),
1221                                ),
1222                                ExecutionEnv::new()
1223                                    .with_scheduling_source(SchedulingSource::NonFastPath),
1224                            ))
1225                    } else {
1226                        None
1227                    }
1228                })
1229                .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1230                .collect::<Vec<_>>();
1231            if !fast_path_certificates.is_empty() {
1232                self.state
1233                    .execution_scheduler()
1234                    .enqueue(fast_path_certificates, epoch_store);
1235            }
1236            return Ok((None, Weight::zero()));
1237        }
1238
1239        // 4) Execute the certificates immediately if they contain only owned object transactions,
1240        // or wait for the execution results if it contains shared objects.
1241        let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1242            |tx| async move {
1243                let effects = match &tx.kind {
1244                    ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1245                        // Certificates already verified by callers of this function.
1246                        let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1247                        self.state
1248                            .wait_for_certificate_execution(&certificate, epoch_store)
1249                            .await?
1250                    }
1251                    ConsensusTransactionKind::UserTransaction(tx) => {
1252                        self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1253                    }
1254                    ConsensusTransactionKind::UserTransactionV2(tx) => {
1255                        self.state.await_transaction_effects(*tx.tx().digest(), epoch_store).await?
1256                    }
1257                    _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction, UserTransaction, or UserTransactionV2"),
1258                };
1259                let events = if include_events && effects.events_digest().is_some() {
1260                    Some(self.state.get_transaction_events(effects.transaction_digest())?)
1261                } else {
1262                    None
1263                };
1264
1265                let input_objects = if include_input_objects {
1266                    self.state.get_transaction_input_objects(&effects)?
1267                } else {
1268                    vec![]
1269                };
1270
1271                let output_objects = if include_output_objects {
1272                    self.state.get_transaction_output_objects(&effects)?
1273                } else {
1274                    vec![]
1275                };
1276
1277                if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1278                    epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1279                }
1280
1281                Ok::<_, SuiError>(ExecutedData {
1282                    effects,
1283                    events,
1284                    input_objects,
1285                    output_objects,
1286                })
1287            },
1288        ))
1289        .await?;
1290
1291        Ok((Some(responses), Weight::zero()))
1292    }
1293
1294    async fn collect_effects_data(
1295        &self,
1296        effects: &TransactionEffects,
1297        include_events: bool,
1298        include_input_objects: bool,
1299        include_output_objects: bool,
1300        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1301    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1302        let events = if include_events && effects.events_digest().is_some() {
1303            if let Some(fastpath_outputs) = &fastpath_outputs {
1304                Some(fastpath_outputs.events.clone())
1305            } else {
1306                Some(
1307                    self.state
1308                        .get_transaction_events(effects.transaction_digest())?,
1309                )
1310            }
1311        } else {
1312            None
1313        };
1314
1315        let input_objects = if include_input_objects {
1316            self.state.get_transaction_input_objects(effects)?
1317        } else {
1318            vec![]
1319        };
1320
1321        let output_objects = if include_output_objects {
1322            if let Some(fastpath_outputs) = &fastpath_outputs {
1323                fastpath_outputs.written.values().cloned().collect()
1324            } else {
1325                self.state.get_transaction_output_objects(effects)?
1326            }
1327        } else {
1328            vec![]
1329        };
1330
1331        Ok((events, input_objects, output_objects))
1332    }
1333}
1334
1335type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1336
1337impl ValidatorService {
1338    async fn transaction_impl(
1339        &self,
1340        request: tonic::Request<Transaction>,
1341    ) -> WrappedServiceResponse<HandleTransactionResponse> {
1342        self.handle_transaction(request).await
1343    }
1344
1345    async fn handle_submit_transaction_impl(
1346        &self,
1347        request: tonic::Request<RawSubmitTxRequest>,
1348    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1349        self.handle_submit_transaction(request).await
1350    }
1351
1352    async fn submit_certificate_impl(
1353        &self,
1354        request: tonic::Request<CertifiedTransaction>,
1355    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1356        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1357        let certificate = request.into_inner();
1358        certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1359
1360        let span =
1361            error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1362        self.handle_certificates(
1363            nonempty![certificate],
1364            true,
1365            false,
1366            false,
1367            false,
1368            &epoch_store,
1369            false,
1370        )
1371        .instrument(span)
1372        .await
1373        .map(|(executed, spam_weight)| {
1374            (
1375                tonic::Response::new(SubmitCertificateResponse {
1376                    executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1377                }),
1378                spam_weight,
1379            )
1380        })
1381    }
1382
1383    async fn handle_certificate_v2_impl(
1384        &self,
1385        request: tonic::Request<CertifiedTransaction>,
1386    ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1387        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1388        let certificate = request.into_inner();
1389        certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1390
1391        let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1392        self.handle_certificates(
1393            nonempty![certificate],
1394            true,
1395            false,
1396            false,
1397            false,
1398            &epoch_store,
1399            true,
1400        )
1401        .instrument(span)
1402        .await
1403        .map(|(resp, spam_weight)| {
1404            (
1405                tonic::Response::new(
1406                    resp.expect(
1407                        "handle_certificate should not return none with wait_for_effects=true",
1408                    )
1409                    .remove(0)
1410                    .into(),
1411                ),
1412                spam_weight,
1413            )
1414        })
1415    }
1416
1417    async fn handle_certificate_v3_impl(
1418        &self,
1419        request: tonic::Request<HandleCertificateRequestV3>,
1420    ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1421        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1422        let request = request.into_inner();
1423        request
1424            .certificate
1425            .validity_check(&epoch_store.tx_validity_check_context())?;
1426
1427        let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1428        self.handle_certificates(
1429            nonempty![request.certificate],
1430            request.include_events,
1431            request.include_input_objects,
1432            request.include_output_objects,
1433            request.include_auxiliary_data,
1434            &epoch_store,
1435            true,
1436        )
1437        .instrument(span)
1438        .await
1439        .map(|(resp, spam_weight)| {
1440            (
1441                tonic::Response::new(
1442                    resp.expect(
1443                        "handle_certificate should not return none with wait_for_effects=true",
1444                    )
1445                    .remove(0),
1446                ),
1447                spam_weight,
1448            )
1449        })
1450    }
1451
1452    async fn wait_for_effects_impl(
1453        &self,
1454        request: tonic::Request<RawWaitForEffectsRequest>,
1455    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1456        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1457        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1458        let response = timeout(
1459            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1460            Duration::from_secs(20),
1461            epoch_store
1462                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1463                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1464        )
1465        .await
1466        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1467        .try_into()?;
1468        Ok((tonic::Response::new(response), Weight::zero()))
1469    }
1470
1471    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1472    async fn wait_for_effects_response(
1473        &self,
1474        request: WaitForEffectsRequest,
1475        epoch_store: &Arc<AuthorityPerEpochStore>,
1476    ) -> SuiResult<WaitForEffectsResponse> {
1477        if request.ping_type.is_some() {
1478            return timeout(
1479                Duration::from_secs(10),
1480                self.ping_response(request, epoch_store),
1481            )
1482            .await
1483            .map_err(|_| SuiErrorKind::TimeoutError)?;
1484        }
1485
1486        let Some(tx_digest) = request.transaction_digest else {
1487            return Err(SuiErrorKind::InvalidRequest(
1488                "Transaction digest is required for wait for effects requests".to_string(),
1489            )
1490            .into());
1491        };
1492        let tx_digests = [tx_digest];
1493
1494        let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1495            if let Some(consensus_position) = request.consensus_position {
1496                Box::pin(self.wait_for_fastpath_effects(
1497                    consensus_position,
1498                    &tx_digests,
1499                    request.include_details,
1500                    epoch_store,
1501                ))
1502            } else {
1503                Box::pin(futures::future::pending())
1504            };
1505
1506        tokio::select! {
1507            // Ensure that finalized effects are always prioritized.
1508            biased;
1509            // We always wait for effects regardless of consensus position via
1510            // notify_read_executed_effects. This is safe because we have separated
1511            // mysticeti fastpath outputs to a separate dirty cache
1512            // UncommittedData::fastpath_transaction_outputs that will only get flushed
1513            // once finalized. So the output of notify_read_executed_effects is
1514            // guaranteed to be finalized effects or effects from QD execution.
1515            mut effects = self.state
1516                .get_transaction_cache_reader()
1517                .notify_read_executed_effects(
1518                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1519                    &tx_digests,
1520                ) => {
1521                tracing::Span::current().record("fast_path_effects", false);
1522                let effects = effects.pop().unwrap();
1523                let details = if request.include_details {
1524                    Some(self.complete_executed_data(effects.clone(), None).await?)
1525                } else {
1526                    None
1527                };
1528
1529                Ok(WaitForEffectsResponse::Executed {
1530                    effects_digest: effects.digest(),
1531                    details,
1532                    fast_path: false,
1533                })
1534            }
1535
1536            fastpath_response = fastpath_effects_future => {
1537                tracing::Span::current().record("fast_path_effects", true);
1538                fastpath_response
1539            }
1540        }
1541    }
1542
1543    #[instrument(level = "error", skip_all, err(level = "debug"))]
1544    async fn ping_response(
1545        &self,
1546        request: WaitForEffectsRequest,
1547        epoch_store: &Arc<AuthorityPerEpochStore>,
1548    ) -> SuiResult<WaitForEffectsResponse> {
1549        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1550            return Err(SuiErrorKind::UnsupportedFeatureError {
1551                error: "Mysticeti fastpath".to_string(),
1552            }
1553            .into());
1554        };
1555
1556        let Some(consensus_position) = request.consensus_position else {
1557            return Err(SuiErrorKind::InvalidRequest(
1558                "Consensus position is required for Ping requests".to_string(),
1559            )
1560            .into());
1561        };
1562
1563        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1564        let Some(ping) = request.ping_type else {
1565            return Err(SuiErrorKind::InvalidRequest(
1566                "Ping type is required for ping requests".to_string(),
1567            )
1568            .into());
1569        };
1570
1571        let _metrics_guard = self
1572            .metrics
1573            .handle_wait_for_effects_ping_latency
1574            .with_label_values(&[ping.as_str()])
1575            .start_timer();
1576
1577        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1578
1579        let mut last_status = None;
1580        let details = if request.include_details {
1581            Some(Box::new(ExecutedData::default()))
1582        } else {
1583            None
1584        };
1585
1586        loop {
1587            let status = consensus_tx_status_cache
1588                .notify_read_transaction_status_change(consensus_position, last_status)
1589                .await;
1590            match status {
1591                NotifyReadConsensusTxStatusResult::Status(status) => match status {
1592                    ConsensusTxStatus::FastpathCertified => {
1593                        // If the request is for consensus, we need to wait for the transaction to be finalised via Consensus.
1594                        if ping == PingType::Consensus {
1595                            last_status = Some(status);
1596                            continue;
1597                        }
1598                        return Ok(WaitForEffectsResponse::Executed {
1599                            effects_digest: TransactionEffectsDigest::ZERO,
1600                            details,
1601                            fast_path: true,
1602                        });
1603                    }
1604                    ConsensusTxStatus::Rejected => {
1605                        return Ok(WaitForEffectsResponse::Rejected { error: None });
1606                    }
1607                    ConsensusTxStatus::Finalized => {
1608                        return Ok(WaitForEffectsResponse::Executed {
1609                            effects_digest: TransactionEffectsDigest::ZERO,
1610                            details,
1611                            fast_path: false,
1612                        });
1613                    }
1614                },
1615                NotifyReadConsensusTxStatusResult::Expired(round) => {
1616                    return Ok(WaitForEffectsResponse::Expired {
1617                        epoch: epoch_store.epoch(),
1618                        round: Some(round),
1619                    });
1620                }
1621            }
1622        }
1623    }
1624
1625    #[instrument(level = "error", skip_all, err(level = "debug"))]
1626    async fn wait_for_fastpath_effects(
1627        &self,
1628        consensus_position: ConsensusPosition,
1629        tx_digests: &[TransactionDigest],
1630        include_details: bool,
1631        epoch_store: &Arc<AuthorityPerEpochStore>,
1632    ) -> SuiResult<WaitForEffectsResponse> {
1633        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1634            return Err(SuiErrorKind::UnsupportedFeatureError {
1635                error: "Mysticeti fastpath".to_string(),
1636            }
1637            .into());
1638        };
1639
1640        let local_epoch = epoch_store.epoch();
1641        match consensus_position.epoch.cmp(&local_epoch) {
1642            Ordering::Less => {
1643                // Ask TransactionDriver to retry submitting the transaction and get a new ConsensusPosition,
1644                // if response from this validator is desired.
1645                let response = WaitForEffectsResponse::Expired {
1646                    epoch: local_epoch,
1647                    round: None,
1648                };
1649                return Ok(response);
1650            }
1651            Ordering::Greater => {
1652                // Ask TransactionDriver to retry this RPC until the validator's epoch catches up.
1653                return Err(SuiErrorKind::WrongEpoch {
1654                    expected_epoch: local_epoch,
1655                    actual_epoch: consensus_position.epoch,
1656                }
1657                .into());
1658            }
1659            Ordering::Equal => {
1660                // The validator's epoch is the same as the epoch of the transaction.
1661                // We can proceed with the normal flow.
1662            }
1663        };
1664
1665        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1666
1667        let mut current_status = None;
1668        loop {
1669            tokio::select! {
1670                status_result = consensus_tx_status_cache
1671                    .notify_read_transaction_status_change(consensus_position, current_status) => {
1672                    match status_result {
1673                        NotifyReadConsensusTxStatusResult::Status(new_status) => {
1674                            match new_status {
1675                                ConsensusTxStatus::Rejected => {
1676                                    return Ok(WaitForEffectsResponse::Rejected {
1677                                        error: epoch_store.get_rejection_vote_reason(
1678                                            consensus_position
1679                                        )
1680                                    });
1681                                }
1682                                ConsensusTxStatus::FastpathCertified => {
1683                                    current_status = Some(new_status);
1684                                    continue;
1685                                }
1686                                ConsensusTxStatus::Finalized => {
1687                                    current_status = Some(new_status);
1688                                    continue;
1689                                }
1690                            }
1691                        }
1692                        NotifyReadConsensusTxStatusResult::Expired(round) => {
1693                            return Ok(WaitForEffectsResponse::Expired {
1694                                epoch: epoch_store.epoch(),
1695                                round: Some(round),
1696                            });
1697                        }
1698                    }
1699                }
1700
1701                mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1702                    if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1703                    let outputs = outputs.pop().unwrap();
1704                    let effects = outputs.effects.clone();
1705
1706                    let details = if include_details {
1707                        Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1708                    } else {
1709                        None
1710                    };
1711
1712                    return Ok(WaitForEffectsResponse::Executed {
1713                        effects_digest: effects.digest(),
1714                        details,
1715                        fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1716                    });
1717                }
1718            }
1719        }
1720    }
1721
1722    async fn complete_executed_data(
1723        &self,
1724        effects: TransactionEffects,
1725        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1726    ) -> SuiResult<Box<ExecutedData>> {
1727        let (events, input_objects, output_objects) = self
1728            .collect_effects_data(
1729                &effects,
1730                /* include_events */ true,
1731                /* include_input_objects */ true,
1732                /* include_output_objects */ true,
1733                fastpath_outputs,
1734            )
1735            .await?;
1736        Ok(Box::new(ExecutedData {
1737            effects,
1738            events,
1739            input_objects,
1740            output_objects,
1741        }))
1742    }
1743
1744    async fn soft_bundle_validity_check(
1745        &self,
1746        certificates: &NonEmpty<CertifiedTransaction>,
1747        epoch_store: &Arc<AuthorityPerEpochStore>,
1748        total_size_bytes: u64,
1749    ) -> Result<(), tonic::Status> {
1750        let protocol_config = epoch_store.protocol_config();
1751        let node_config = &self.state.config;
1752
1753        // Soft Bundle MUST be enabled both in protocol config and local node config.
1754        //
1755        // The local node config is by default enabled, but can be turned off by the node operator.
1756        // This acts an extra safety measure where a validator node have the choice to turn this feature off,
1757        // without having to upgrade the entire network.
1758        fp_ensure!(
1759            protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1760            SuiErrorKind::UnsupportedFeatureError {
1761                error: "Soft Bundle".to_string()
1762            }
1763            .into()
1764        );
1765
1766        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
1767        // - All certs must access at least one shared object.
1768        // - All certs must not be already executed.
1769        // - All certs must have the same gas price.
1770        // - Number of certs must not exceed the max allowed.
1771        // - Total size of all certs must not exceed the max allowed.
1772        fp_ensure!(
1773            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1774            SuiErrorKind::UserInputError {
1775                error: UserInputError::TooManyTransactionsInBatch {
1776                    size: certificates.len(),
1777                    limit: protocol_config.max_soft_bundle_size()
1778                }
1779            }
1780            .into()
1781        );
1782
1783        // We set the soft bundle max size to be half of the consensus max transactions in block size. We do this to account for
1784        // serialization overheads and to ensure that the soft bundle is not too large when is attempted to be posted via consensus.
1785        // Although half the block size is on the extreme side, it's should be good enough for now.
1786        let soft_bundle_max_size_bytes =
1787            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1788        fp_ensure!(
1789            total_size_bytes <= soft_bundle_max_size_bytes,
1790            SuiErrorKind::UserInputError {
1791                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1792                    size: total_size_bytes as usize,
1793                    limit: soft_bundle_max_size_bytes,
1794                },
1795            }
1796            .into()
1797        );
1798
1799        let mut gas_price = None;
1800        for certificate in certificates {
1801            let tx_digest = *certificate.digest();
1802            fp_ensure!(
1803                certificate.is_consensus_tx(),
1804                SuiErrorKind::UserInputError {
1805                    error: UserInputError::NoSharedObjectError { digest: tx_digest }
1806                }
1807                .into()
1808            );
1809            fp_ensure!(
1810                !self.state.is_tx_already_executed(&tx_digest),
1811                SuiErrorKind::UserInputError {
1812                    error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1813                }
1814                .into()
1815            );
1816            if let Some(gas) = gas_price {
1817                fp_ensure!(
1818                    gas == certificate.gas_price(),
1819                    SuiErrorKind::UserInputError {
1820                        error: UserInputError::GasPriceMismatchError {
1821                            digest: tx_digest,
1822                            expected: gas,
1823                            actual: certificate.gas_price()
1824                        }
1825                    }
1826                    .into()
1827                );
1828            } else {
1829                gas_price = Some(certificate.gas_price());
1830            }
1831        }
1832
1833        // For Soft Bundle, if at this point we know at least one certificate has already been processed,
1834        // reject the entire bundle.  Otherwise, submit all certificates in one request.
1835        // This is not a strict check as there may be race conditions where one or more certificates are
1836        // already being processed by another actor, and we could not know it.
1837        fp_ensure!(
1838            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1839            SuiErrorKind::UserInputError {
1840                error: UserInputError::CertificateAlreadyProcessed
1841            }
1842            .into()
1843        );
1844
1845        Ok(())
1846    }
1847
1848    async fn handle_soft_bundle_certificates_v3_impl(
1849        &self,
1850        request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1851    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1852        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1853        let client_addr = if self.client_id_source.is_none() {
1854            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1855        } else {
1856            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1857        };
1858        let request = request.into_inner();
1859
1860        let certificates = NonEmpty::from_vec(request.certificates)
1861            .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1862        let mut total_size_bytes = 0;
1863        for certificate in &certificates {
1864            // We need to check this first because we haven't verified the cert signature.
1865            total_size_bytes +=
1866                certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1867        }
1868
1869        self.metrics
1870            .handle_soft_bundle_certificates_count
1871            .observe(certificates.len() as f64);
1872
1873        self.metrics
1874            .handle_soft_bundle_certificates_size_bytes
1875            .observe(total_size_bytes as f64);
1876
1877        // Now that individual certificates are valid, we check if the bundle is valid.
1878        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1879            .await?;
1880
1881        info!(
1882            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1883            certificates.len(),
1884            client_addr
1885                .map(|x| x.to_string())
1886                .unwrap_or_else(|| "unknown".to_string()),
1887            certificates
1888                .iter()
1889                .map(|x| x.digest().to_string())
1890                .collect::<Vec<_>>()
1891                .join(", "),
1892            total_size_bytes
1893        );
1894
1895        let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1896        self.handle_certificates(
1897            certificates,
1898            request.include_events,
1899            request.include_input_objects,
1900            request.include_output_objects,
1901            request.include_auxiliary_data,
1902            &epoch_store,
1903            request.wait_for_effects,
1904        )
1905        .instrument(span)
1906        .await
1907        .map(|(resp, spam_weight)| {
1908            (
1909                tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1910                    responses: resp.unwrap_or_default(),
1911                }),
1912                spam_weight,
1913            )
1914        })
1915    }
1916
1917    async fn object_info_impl(
1918        &self,
1919        request: tonic::Request<ObjectInfoRequest>,
1920    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1921        let request = request.into_inner();
1922        let response = self.state.handle_object_info_request(request).await?;
1923        Ok((tonic::Response::new(response), Weight::one()))
1924    }
1925
1926    async fn transaction_info_impl(
1927        &self,
1928        request: tonic::Request<TransactionInfoRequest>,
1929    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1930        let request = request.into_inner();
1931        let response = self.state.handle_transaction_info_request(request).await?;
1932        Ok((tonic::Response::new(response), Weight::one()))
1933    }
1934
1935    async fn checkpoint_impl(
1936        &self,
1937        request: tonic::Request<CheckpointRequest>,
1938    ) -> WrappedServiceResponse<CheckpointResponse> {
1939        let request = request.into_inner();
1940        let response = self.state.handle_checkpoint_request(&request)?;
1941        Ok((tonic::Response::new(response), Weight::one()))
1942    }
1943
1944    async fn checkpoint_v2_impl(
1945        &self,
1946        request: tonic::Request<CheckpointRequestV2>,
1947    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1948        let request = request.into_inner();
1949        let response = self.state.handle_checkpoint_request_v2(&request)?;
1950        Ok((tonic::Response::new(response), Weight::one()))
1951    }
1952
1953    async fn get_system_state_object_impl(
1954        &self,
1955        _request: tonic::Request<SystemStateRequest>,
1956    ) -> WrappedServiceResponse<SuiSystemState> {
1957        let response = self
1958            .state
1959            .get_object_cache_reader()
1960            .get_sui_system_state_object_unsafe()?;
1961        Ok((tonic::Response::new(response), Weight::one()))
1962    }
1963
1964    async fn validator_health_impl(
1965        &self,
1966        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1967    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1968        let state = &self.state;
1969
1970        // Get epoch store once for both metrics
1971        let epoch_store = state.load_epoch_store_one_call_per_task();
1972
1973        // Get in-flight execution transactions from execution scheduler
1974        let num_inflight_execution_transactions =
1975            state.execution_scheduler().num_pending_certificates() as u64;
1976
1977        // Get in-flight consensus transactions from consensus adapter
1978        let num_inflight_consensus_transactions =
1979            self.consensus_adapter.num_inflight_transactions();
1980
1981        // Get last committed leader round from epoch store
1982        let last_committed_leader_round = epoch_store
1983            .consensus_tx_status_cache
1984            .as_ref()
1985            .and_then(|cache| cache.get_last_committed_leader_round())
1986            .unwrap_or(0);
1987
1988        // Get last locally built checkpoint sequence
1989        let last_locally_built_checkpoint = epoch_store
1990            .last_built_checkpoint_summary()
1991            .ok()
1992            .flatten()
1993            .map(|(_, summary)| summary.sequence_number)
1994            .unwrap_or(0);
1995
1996        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1997            num_inflight_consensus_transactions,
1998            num_inflight_execution_transactions,
1999            last_locally_built_checkpoint,
2000            last_committed_leader_round,
2001        };
2002
2003        let raw_response = typed_response
2004            .try_into()
2005            .map_err(|e: sui_types::error::SuiError| {
2006                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
2007            })?;
2008
2009        Ok((tonic::Response::new(raw_response), Weight::one()))
2010    }
2011
2012    fn get_client_ip_addr<T>(
2013        &self,
2014        request: &tonic::Request<T>,
2015        source: &ClientIdSource,
2016    ) -> Option<IpAddr> {
2017        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
2018
2019        if let Some(header) = forwarded_header {
2020            let num_hops = header
2021                .to_str()
2022                .map(|h| h.split(',').count().saturating_sub(1))
2023                .unwrap_or(0);
2024
2025            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
2026        }
2027
2028        match source {
2029            ClientIdSource::SocketAddr => {
2030                let socket_addr: Option<SocketAddr> = request.remote_addr();
2031
2032                // We will hit this case if the IO type used does not
2033                // implement Connected or when using a unix domain socket.
2034                // TODO: once we have confirmed that no legitimate traffic
2035                // is hitting this case, we should reject such requests that
2036                // hit this case.
2037                if let Some(socket_addr) = socket_addr {
2038                    Some(socket_addr.ip())
2039                } else {
2040                    if cfg!(msim) {
2041                        // Ignore the error from simtests.
2042                    } else if cfg!(test) {
2043                        panic!("Failed to get remote address from request");
2044                    } else {
2045                        self.metrics.connection_ip_not_found.inc();
2046                        error!("Failed to get remote address from request");
2047                    }
2048                    None
2049                }
2050            }
2051            ClientIdSource::XForwardedFor(num_hops) => {
2052                let do_header_parse = |op: &MetadataValue<Ascii>| {
2053                    match op.to_str() {
2054                        Ok(header_val) => {
2055                            let header_contents =
2056                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
2057                            if *num_hops == 0 {
2058                                error!(
2059                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2060                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2061                                    to this node. Skipping traffic controller request handling.",
2062                                    header_contents,
2063                                );
2064                                return None;
2065                            }
2066                            let contents_len = header_contents.len();
2067                            if contents_len < *num_hops {
2068                                error!(
2069                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2070                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2071                                    `client-id-source` in the node config.",
2072                                    header_contents, contents_len, num_hops, contents_len,
2073                                );
2074                                self.metrics.client_id_source_config_mismatch.inc();
2075                                return None;
2076                            }
2077                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
2078                            else {
2079                                error!(
2080                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2081                                    Expected at least {} values. Skipping traffic controller request handling.",
2082                                    header_contents, contents_len, num_hops, contents_len,
2083                                );
2084                                return None;
2085                            };
2086                            parse_ip(client_ip).or_else(|| {
2087                                self.metrics.forwarded_header_parse_error.inc();
2088                                None
2089                            })
2090                        }
2091                        Err(e) => {
2092                            // TODO: once we have confirmed that no legitimate traffic
2093                            // is hitting this case, we should reject such requests that
2094                            // hit this case.
2095                            self.metrics.forwarded_header_invalid.inc();
2096                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2097                            None
2098                        }
2099                    }
2100                };
2101                if let Some(op) = request.metadata().get("x-forwarded-for") {
2102                    do_header_parse(op)
2103                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2104                    do_header_parse(op)
2105                } else {
2106                    self.metrics.forwarded_header_not_included.inc();
2107                    error!(
2108                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2109                    );
2110                    None
2111                }
2112            }
2113        }
2114    }
2115
2116    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2117        if let Some(traffic_controller) = &self.traffic_controller {
2118            if !traffic_controller.check(&client, &None).await {
2119                // Entity in blocklist
2120                Err(tonic::Status::from_error(
2121                    SuiErrorKind::TooManyRequests.into(),
2122                ))
2123            } else {
2124                Ok(())
2125            }
2126        } else {
2127            Ok(())
2128        }
2129    }
2130
2131    fn handle_traffic_resp<T>(
2132        &self,
2133        client: Option<IpAddr>,
2134        wrapped_response: WrappedServiceResponse<T>,
2135    ) -> Result<tonic::Response<T>, tonic::Status> {
2136        let (error, spam_weight, unwrapped_response) = match wrapped_response {
2137            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2138            Err(status) => (
2139                Some(SuiError::from(status.clone())),
2140                Weight::zero(),
2141                Err(status.clone()),
2142            ),
2143        };
2144
2145        if let Some(traffic_controller) = self.traffic_controller.clone() {
2146            traffic_controller.tally(TrafficTally {
2147                direct: client,
2148                through_fullnode: None,
2149                error_info: error.map(|e| {
2150                    let error_type = String::from(e.clone().as_ref());
2151                    let error_weight = normalize(e);
2152                    (error_weight, error_type)
2153                }),
2154                spam_weight,
2155                timestamp: SystemTime::now(),
2156            })
2157        }
2158        unwrapped_response
2159    }
2160}
2161
2162fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2163    // simulate a TCP connection, which would have added extensions to
2164    // the request object that would be used downstream
2165    let mut request = tonic::Request::new(message);
2166    let tcp_connect_info = TcpConnectInfo {
2167        local_addr: None,
2168        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2169    };
2170    request.extensions_mut().insert(tcp_connect_info);
2171    request
2172}
2173
2174// TODO: refine error matching here
2175fn normalize(err: SuiError) -> Weight {
2176    match err.as_inner() {
2177        SuiErrorKind::UserInputError {
2178            error: UserInputError::IncorrectUserSignature { .. },
2179        } => Weight::one(),
2180        SuiErrorKind::InvalidSignature { .. }
2181        | SuiErrorKind::SignerSignatureAbsent { .. }
2182        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2183        | SuiErrorKind::IncorrectSigner { .. }
2184        | SuiErrorKind::UnknownSigner { .. }
2185        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2186        _ => Weight::zero(),
2187    }
2188}
2189
2190/// Implements generic pre- and post-processing. Since this is on the critical
2191/// path, any heavy lifting should be done in a separate non-blocking task
2192/// unless it is necessary to override the return value.
2193#[macro_export]
2194macro_rules! handle_with_decoration {
2195    ($self:ident, $func_name:ident, $request:ident) => {{
2196        if $self.client_id_source.is_none() {
2197            return $self.$func_name($request).await.map(|(result, _)| result);
2198        }
2199
2200        let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
2201
2202        // check if either IP is blocked, in which case return early
2203        $self.handle_traffic_req(client.clone()).await?;
2204
2205        // handle traffic tallying
2206        let wrapped_response = $self.$func_name($request).await;
2207        $self.handle_traffic_resp(client, wrapped_response)
2208    }};
2209}
2210
2211#[async_trait]
2212impl Validator for ValidatorService {
2213    async fn submit_transaction(
2214        &self,
2215        request: tonic::Request<RawSubmitTxRequest>,
2216    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2217        let validator_service = self.clone();
2218
2219        // Spawns a task which handles the transaction. The task will unconditionally continue
2220        // processing in the event that the client connection is dropped.
2221        spawn_monitored_task!(async move {
2222            // NB: traffic tally wrapping handled within the task rather than on task exit
2223            // to prevent an attacker from subverting traffic control by severing the connection
2224            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2225        })
2226        .await
2227        .unwrap()
2228    }
2229
2230    async fn transaction(
2231        &self,
2232        request: tonic::Request<Transaction>,
2233    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2234        let validator_service = self.clone();
2235
2236        // Spawns a task which handles the transaction. The task will unconditionally continue
2237        // processing in the event that the client connection is dropped.
2238        spawn_monitored_task!(async move {
2239            // NB: traffic tally wrapping handled within the task rather than on task exit
2240            // to prevent an attacker from subverting traffic control by severing the connection
2241            handle_with_decoration!(validator_service, transaction_impl, request)
2242        })
2243        .await
2244        .unwrap()
2245    }
2246
2247    async fn submit_certificate(
2248        &self,
2249        request: tonic::Request<CertifiedTransaction>,
2250    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2251        let validator_service = self.clone();
2252
2253        // Spawns a task which handles the certificate. The task will unconditionally continue
2254        // processing in the event that the client connection is dropped.
2255        spawn_monitored_task!(async move {
2256            // NB: traffic tally wrapping handled within the task rather than on task exit
2257            // to prevent an attacker from subverting traffic control by severing the connection.
2258            handle_with_decoration!(validator_service, submit_certificate_impl, request)
2259        })
2260        .await
2261        .unwrap()
2262    }
2263
2264    async fn handle_certificate_v2(
2265        &self,
2266        request: tonic::Request<CertifiedTransaction>,
2267    ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2268        handle_with_decoration!(self, handle_certificate_v2_impl, request)
2269    }
2270
2271    async fn handle_certificate_v3(
2272        &self,
2273        request: tonic::Request<HandleCertificateRequestV3>,
2274    ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2275        handle_with_decoration!(self, handle_certificate_v3_impl, request)
2276    }
2277
2278    async fn wait_for_effects(
2279        &self,
2280        request: tonic::Request<RawWaitForEffectsRequest>,
2281    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2282        handle_with_decoration!(self, wait_for_effects_impl, request)
2283    }
2284
2285    async fn handle_soft_bundle_certificates_v3(
2286        &self,
2287        request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2288    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2289        handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2290    }
2291
2292    async fn object_info(
2293        &self,
2294        request: tonic::Request<ObjectInfoRequest>,
2295    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2296        handle_with_decoration!(self, object_info_impl, request)
2297    }
2298
2299    async fn transaction_info(
2300        &self,
2301        request: tonic::Request<TransactionInfoRequest>,
2302    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2303        handle_with_decoration!(self, transaction_info_impl, request)
2304    }
2305
2306    async fn checkpoint(
2307        &self,
2308        request: tonic::Request<CheckpointRequest>,
2309    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2310        handle_with_decoration!(self, checkpoint_impl, request)
2311    }
2312
2313    async fn checkpoint_v2(
2314        &self,
2315        request: tonic::Request<CheckpointRequestV2>,
2316    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2317        handle_with_decoration!(self, checkpoint_v2_impl, request)
2318    }
2319
2320    async fn get_system_state_object(
2321        &self,
2322        request: tonic::Request<SystemStateRequest>,
2323    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2324        handle_with_decoration!(self, get_system_state_object_impl, request)
2325    }
2326
2327    async fn validator_health(
2328        &self,
2329        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2330    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2331    {
2332        handle_with_decoration!(self, validator_health_impl, request)
2333    }
2334}