sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_metrics::spawn_monitored_task;
11use prometheus::{
12    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
13    register_gauge_with_registry, register_histogram_vec_with_registry,
14    register_histogram_with_registry, register_int_counter_vec_with_registry,
15    register_int_counter_with_registry,
16};
17use std::{
18    cmp::Ordering,
19    future::Future,
20    io,
21    net::{IpAddr, SocketAddr},
22    pin::Pin,
23    sync::Arc,
24    time::{Duration, SystemTime},
25};
26use sui_network::{
27    api::{Validator, ValidatorServer},
28    tonic,
29    validator::server::SUI_TLS_SERVER_NAME,
30};
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
34use sui_types::messages_grpc::{
35    HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
36};
37use sui_types::messages_grpc::{
38    HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
39    SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::messages_grpc::{
42    HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
43};
44use sui_types::multiaddr::Multiaddr;
45use sui_types::object::Object;
46use sui_types::sui_system_state::SuiSystemState;
47use sui_types::traffic_control::{ClientIdSource, Weight};
48use sui_types::{
49    digests::{TransactionDigest, TransactionEffectsDigest},
50    error::{SuiErrorKind, UserInputError},
51};
52use sui_types::{
53    effects::TransactionEffects,
54    messages_grpc::{
55        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
56        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
57    },
58};
59use sui_types::{
60    effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
61};
62use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
63use sui_types::{error::*, transaction::*};
64use sui_types::{
65    fp_ensure,
66    messages_checkpoint::{
67        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
68    },
69};
70use tap::TapFallible;
71use tokio::sync::oneshot;
72use tokio::time::timeout;
73use tonic::metadata::{Ascii, MetadataValue};
74use tracing::{Instrument, debug, error, error_span, info, instrument};
75
76use crate::consensus_adapter::ConnectionMonitorStatusForTests;
77use crate::{
78    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
79    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
80    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
81};
82use crate::{
83    authority::{
84        ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
85        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
86        shared_object_version_manager::Schedulable,
87    },
88    checkpoints::CheckpointStore,
89    execution_scheduler::SchedulingSource,
90    mysticeti_adapter::LazyMysticetiClient,
91    transaction_outputs::TransactionOutputs,
92};
93use nonempty::{NonEmpty, nonempty};
94use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
95use sui_types::messages_grpc::PingType;
96use tonic::transport::server::TcpConnectInfo;
97
98#[cfg(test)]
99#[path = "unit_tests/server_tests.rs"]
100mod server_tests;
101
102#[cfg(test)]
103#[path = "unit_tests/wait_for_effects_tests.rs"]
104mod wait_for_effects_tests;
105
106#[cfg(test)]
107#[path = "unit_tests/submit_transaction_tests.rs"]
108mod submit_transaction_tests;
109
/// Handle to a spawned validator network server.
///
/// Wraps the underlying `sui_network` server so callers can wait for it to
/// shut down, request a shutdown, or query the address it is bound to.
pub struct AuthorityServerHandle {
    /// The server instance this handle controls.
    server_handle: sui_network::validator::server::Server,
}
113
114impl AuthorityServerHandle {
115    pub async fn join(self) -> Result<(), io::Error> {
116        self.server_handle.handle().wait_for_shutdown().await;
117        Ok(())
118    }
119
120    pub async fn kill(self) -> Result<(), io::Error> {
121        self.server_handle.handle().shutdown().await;
122        Ok(())
123    }
124
125    pub fn address(&self) -> &Multiaddr {
126        self.server_handle.local_addr()
127    }
128}
129
/// Test-oriented wrapper that bundles everything needed to spawn a validator
/// gRPC server: a bind address, the authority state, a consensus adapter and
/// the service metrics.
pub struct AuthorityServer {
    /// Address used as the bind address by `spawn_for_test`.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService` when the server is spawned.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService` when the server is spawned.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` when the server is spawned.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
136
137impl AuthorityServer {
138    pub fn new_for_test_with_consensus_adapter(
139        state: Arc<AuthorityState>,
140        consensus_adapter: Arc<ConsensusAdapter>,
141    ) -> Self {
142        let address = new_local_tcp_address_for_testing();
143        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
144
145        Self {
146            address,
147            state,
148            consensus_adapter,
149            metrics,
150        }
151    }
152
153    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
154        let consensus_adapter = Arc::new(ConsensusAdapter::new(
155            Arc::new(LazyMysticetiClient::new()),
156            CheckpointStore::new_for_tests(),
157            state.name,
158            Arc::new(ConnectionMonitorStatusForTests {}),
159            100_000,
160            100_000,
161            None,
162            None,
163            ConsensusAdapterMetrics::new_test(),
164            state.epoch_store_for_testing().protocol_config().clone(),
165        ));
166        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
167    }
168
169    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
170        let address = self.address.clone();
171        self.spawn_with_bind_address_for_test(address).await
172    }
173
174    pub async fn spawn_with_bind_address_for_test(
175        self,
176        address: Multiaddr,
177    ) -> Result<AuthorityServerHandle, io::Error> {
178        let tls_config = sui_tls::create_rustls_server_config(
179            self.state.config.network_key_pair().copy().private(),
180            SUI_TLS_SERVER_NAME.to_string(),
181        );
182        let config = mysten_network::config::Config::new();
183        let server = sui_network::validator::server::ServerBuilder::from_config(
184            &config,
185            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
186        )
187        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
188            self.state,
189            self.consensus_adapter,
190            self.metrics,
191        )))
192        .bind(&address, Some(tls_config))
193        .await
194        .unwrap();
195        let local_addr = server.local_addr().to_owned();
196        info!("Listening to traffic on {local_addr}");
197        let handle = AuthorityServerHandle {
198            server_handle: server,
199        };
200        Ok(handle)
201    }
202}
203
/// Prometheus metrics recorded by the validator gRPC service.
///
/// All metrics are created and registered in `ValidatorServiceMetrics::new`.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back the
    /// local acknowledgement; execution and finalization time are not included.
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus,
    /// labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    // Submit-transaction handler metrics, all labelled by `req_type`.
    /// Latency of the submit transaction handler.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request.
    handle_submit_transaction_batch_size: HistogramVec,

    // Rejection counters. The overload variants are labelled by `error_type`.
    /// Transactions rejected during epoch transition.
    num_rejected_tx_in_epoch_boundary: IntCounter,
    /// Transaction certificates rejected during epoch transition.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Transactions rejected due to system overload.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Transaction certificates rejected due to system overload.
    num_rejected_cert_during_overload: IntCounterVec,

    // Client identification (connection IP / x-forwarded-for) diagnostics.
    /// Times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Times the x-forwarded-for header was unexpectedly missing from a request.
    forwarded_header_not_included: IntCounter,
    /// Times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
}
235
236impl ValidatorServiceMetrics {
237    pub fn new(registry: &Registry) -> Self {
238        Self {
239            signature_errors: register_int_counter_with_registry!(
240                "total_signature_errors",
241                "Number of transaction signature errors",
242                registry,
243            )
244            .unwrap(),
245            tx_verification_latency: register_histogram_with_registry!(
246                "validator_service_tx_verification_latency",
247                "Latency of verifying a transaction",
248                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249                registry,
250            )
251            .unwrap(),
252            cert_verification_latency: register_histogram_with_registry!(
253                "validator_service_cert_verification_latency",
254                "Latency of verifying a certificate",
255                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
256                registry,
257            )
258            .unwrap(),
259            consensus_latency: register_histogram_with_registry!(
260                "validator_service_consensus_latency",
261                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
262                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
263                registry,
264            )
265            .unwrap(),
266            handle_transaction_latency: register_histogram_with_registry!(
267                "validator_service_handle_transaction_latency",
268                "Latency of handling a transaction",
269                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
270                registry,
271            )
272            .unwrap(),
273            handle_certificate_consensus_latency: register_histogram_with_registry!(
274                "validator_service_handle_certificate_consensus_latency",
275                "Latency of handling a consensus transaction certificate",
276                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
277                registry,
278            )
279            .unwrap(),
280            submit_certificate_consensus_latency: register_histogram_with_registry!(
281                "validator_service_submit_certificate_consensus_latency",
282                "Latency of submit_certificate RPC handler",
283                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
284                registry,
285            )
286            .unwrap(),
287            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
288                "validator_service_handle_certificate_non_consensus_latency",
289                "Latency of handling a non-consensus transaction certificate",
290                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
291                registry,
292            )
293            .unwrap(),
294            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
295                "validator_service_handle_soft_bundle_certificates_consensus_latency",
296                "Latency of handling a consensus soft bundle",
297                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
298                registry,
299            )
300            .unwrap(),
301            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
302                "handle_soft_bundle_certificates_count",
303                "The number of certificates included in a soft bundle",
304                mysten_metrics::COUNT_BUCKETS.to_vec(),
305                registry,
306            )
307            .unwrap(),
308            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
309                "handle_soft_bundle_certificates_size_bytes",
310                "The size of soft bundle in bytes",
311                mysten_metrics::BYTES_BUCKETS.to_vec(),
312                registry,
313            )
314            .unwrap(),
315            handle_transaction_consensus_latency: register_histogram_with_registry!(
316                "validator_service_handle_transaction_consensus_latency",
317                "Latency of handling a user transaction sent through consensus",
318                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
319                registry,
320            )
321            .unwrap(),
322            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
323                "validator_service_submit_transaction_consensus_latency",
324                "Latency of submitting a user transaction sent through consensus",
325                &["req_type"],
326                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
327                registry,
328            )
329            .unwrap(),
330            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
331                "validator_service_submit_transaction_latency",
332                "Latency of submit transaction handler",
333                &["req_type"],
334                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
335                registry,
336            )
337            .unwrap(),
338            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
339                "validator_service_handle_wait_for_effects_ping_latency",
340                "Latency of handling a ping request for wait_for_effects",
341                &["req_type"],
342                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
343                registry,
344            )
345            .unwrap(),
346            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
347                "validator_service_submit_transaction_bytes",
348                "The size of transactions in the submit transaction request",
349                &["req_type"],
350                mysten_metrics::BYTES_BUCKETS.to_vec(),
351                registry,
352            )
353            .unwrap(),
354            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
355                "validator_service_submit_transaction_batch_size",
356                "The number of transactions in the submit transaction request",
357                &["req_type"],
358                mysten_metrics::COUNT_BUCKETS.to_vec(),
359                registry,
360            )
361            .unwrap(),
362            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
363                "validator_service_num_rejected_tx_in_epoch_boundary",
364                "Number of rejected transaction during epoch transitioning",
365                registry,
366            )
367            .unwrap(),
368            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
369                "validator_service_num_rejected_cert_in_epoch_boundary",
370                "Number of rejected transaction certificate during epoch transitioning",
371                registry,
372            )
373            .unwrap(),
374            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
375                "validator_service_num_rejected_tx_during_overload",
376                "Number of rejected transaction due to system overload",
377                &["error_type"],
378                registry,
379            )
380            .unwrap(),
381            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
382                "validator_service_num_rejected_cert_during_overload",
383                "Number of rejected transaction certificate due to system overload",
384                &["error_type"],
385                registry,
386            )
387            .unwrap(),
388            connection_ip_not_found: register_int_counter_with_registry!(
389                "validator_service_connection_ip_not_found",
390                "Number of times connection IP was not extractable from request",
391                registry,
392            )
393            .unwrap(),
394            forwarded_header_parse_error: register_int_counter_with_registry!(
395                "validator_service_forwarded_header_parse_error",
396                "Number of times x-forwarded-for header could not be parsed",
397                registry,
398            )
399            .unwrap(),
400            forwarded_header_invalid: register_int_counter_with_registry!(
401                "validator_service_forwarded_header_invalid",
402                "Number of times x-forwarded-for header was invalid",
403                registry,
404            )
405            .unwrap(),
406            forwarded_header_not_included: register_int_counter_with_registry!(
407                "validator_service_forwarded_header_not_included",
408                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
409                registry,
410            )
411            .unwrap(),
412            client_id_source_config_mismatch: register_int_counter_with_registry!(
413                "validator_service_client_id_source_config_mismatch",
414                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
415                registry,
416            )
417            .unwrap(),
418            x_forwarded_for_num_hops: register_gauge_with_registry!(
419                "validator_service_x_forwarded_for_num_hops",
420                "Number of hops in x-forwarded-for header",
421                registry,
422            )
423            .unwrap(),
424        }
425    }
426
427    pub fn new_for_tests() -> Self {
428        let registry = Registry::new();
429        Self::new(&registry)
430    }
431}
432
/// The validator's request-handling service.
///
/// Cloning is derived; the authority state, consensus adapter, metrics and
/// traffic controller are all held behind `Arc`.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state used to validate and process incoming requests.
    state: Arc<AuthorityState>,
    /// Adapter passed to the system overload checks when handling transactions.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while handling requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; copied from the authority state by `new`
    /// and left unset by `new_for_tests`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client IP address is determined for a request. When unset,
    /// `handle_submit_transaction` falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
}
441
442impl ValidatorService {
443    pub fn new(
444        state: Arc<AuthorityState>,
445        consensus_adapter: Arc<ConsensusAdapter>,
446        validator_metrics: Arc<ValidatorServiceMetrics>,
447        client_id_source: Option<ClientIdSource>,
448    ) -> Self {
449        let traffic_controller = state.traffic_controller.clone();
450        Self {
451            state,
452            consensus_adapter,
453            metrics: validator_metrics,
454            traffic_controller,
455            client_id_source,
456        }
457    }
458
459    pub fn new_for_tests(
460        state: Arc<AuthorityState>,
461        consensus_adapter: Arc<ConsensusAdapter>,
462        metrics: Arc<ValidatorServiceMetrics>,
463    ) -> Self {
464        Self {
465            state,
466            consensus_adapter,
467            metrics,
468            traffic_controller: None,
469            client_id_source: None,
470        }
471    }
472
473    pub fn validator_state(&self) -> &Arc<AuthorityState> {
474        &self.state
475    }
476
477    pub async fn execute_certificate_for_testing(
478        &self,
479        cert: CertifiedTransaction,
480    ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
481        let request = make_tonic_request_for_testing(cert);
482        self.handle_certificate_v2(request).await
483    }
484
485    pub async fn handle_transaction_for_benchmarking(
486        &self,
487        transaction: Transaction,
488    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
489        let request = make_tonic_request_for_testing(transaction);
490        self.transaction(request).await
491    }
492
493    // When making changes to this function, see if the changes should be applied to
494    // `Self::handle_submit_transaction()` and `SuiTxValidator::vote_transaction()` as well.
495    async fn handle_transaction(
496        &self,
497        request: tonic::Request<Transaction>,
498    ) -> WrappedServiceResponse<HandleTransactionResponse> {
499        let Self {
500            state,
501            consensus_adapter,
502            metrics,
503            traffic_controller: _,
504            client_id_source: _,
505        } = self.clone();
506        let transaction = request.into_inner();
507        let epoch_store = state.load_epoch_store_one_call_per_task();
508
509        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
510
511        // When authority is overloaded and decide to reject this tx, we still lock the object
512        // and ask the client to retry in the future. This is because without locking, the
513        // input objects can be locked by a different tx in the future, however, the input objects
514        // may already be locked by this tx in other validators. This can cause non of the txes
515        // to have enough quorum to form a certificate, causing the objects to be locked for
516        // the entire epoch. By doing locking but pushback, retrying transaction will have
517        // higher chance to succeed.
518        let mut validator_pushback_error = None;
519        let overload_check_res = state.check_system_overload(
520            &*consensus_adapter,
521            transaction.data(),
522            state.check_system_overload_at_signing(),
523        );
524        if let Err(error) = overload_check_res {
525            metrics
526                .num_rejected_tx_during_overload
527                .with_label_values(&[error.as_ref()])
528                .inc();
529            // TODO: consider change the behavior for other types of overload errors.
530            match error.as_inner() {
531                SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
532                    validator_pushback_error = Some(error)
533                }
534                _ => return Err(error.into()),
535            }
536        }
537
538        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
539
540        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
541        let transaction = epoch_store.verify_transaction(transaction).tap_err(|_| {
542            metrics.signature_errors.inc();
543        })?;
544        drop(tx_verif_metrics_guard);
545
546        let tx_digest = transaction.digest();
547
548        // Enable Trace Propagation across spans/processes using tx_digest
549        let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
550
551        let info = state
552            .handle_transaction(&epoch_store, transaction.clone())
553            .instrument(span)
554            .await
555            .tap_err(|e| {
556                if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
557                    metrics.num_rejected_tx_in_epoch_boundary.inc();
558                }
559            })?;
560
561        if let Some(error) = validator_pushback_error {
562            // TODO: right now, we still sign the txn, but just don't return it. We can also skip signing
563            // to save more CPU.
564            return Err(error.into());
565        }
566
567        Ok((tonic::Response::new(info), Weight::zero()))
568    }
569
570    #[instrument(
571        name = "ValidatorService::handle_submit_transaction",
572        level = "error",
573        skip_all,
574        err(level = "debug")
575    )]
576    async fn handle_submit_transaction(
577        &self,
578        request: tonic::Request<RawSubmitTxRequest>,
579    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
580        let Self {
581            state,
582            consensus_adapter,
583            metrics,
584            traffic_controller: _,
585            client_id_source,
586        } = self.clone();
587
588        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
589            self.get_client_ip_addr(&request, client_id_source)
590        } else {
591            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
592        };
593
594        let epoch_store = state.load_epoch_store_one_call_per_task();
595        if !epoch_store.protocol_config().mysticeti_fastpath() {
596            return Err(SuiErrorKind::UnsupportedFeatureError {
597                error: "Mysticeti fastpath".to_string(),
598            }
599            .into());
600        }
601
602        let request = request.into_inner();
603        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
604            SuiErrorKind::GrpcMessageDeserializeError {
605                type_info: "RawSubmitTxRequest.submit_type".to_string(),
606                error: e.to_string(),
607            }
608        })?;
609
610        let is_ping_request = submit_type == SubmitTxType::Ping;
611        if is_ping_request {
612            fp_ensure!(
613                request.transactions.is_empty(),
614                SuiErrorKind::InvalidRequest(format!(
615                    "Ping request cannot contain {} transactions",
616                    request.transactions.len()
617                ))
618                .into()
619            );
620        } else {
621            // Ensure default and soft bundle requests contain at least one transaction.
622            fp_ensure!(
623                !request.transactions.is_empty(),
624                SuiErrorKind::InvalidRequest(
625                    "At least one transaction needs to be submitted".to_string(),
626                )
627                .into()
628            );
629        }
630
631        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
632        // if they use the same gas price. But this is only done with best effort.
633        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
634        // other transactions in the same bundle.
635        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
636
637        let max_num_transactions = if is_soft_bundle_request {
638            // Soft bundle cannot contain too many transactions.
639            // Otherwise it is hard to include all of them in a single block.
640            epoch_store.protocol_config().max_soft_bundle_size()
641        } else {
642            // Still enforce a limit even when transactions do not need to be in the same block.
643            epoch_store
644                .protocol_config()
645                .max_num_transactions_in_block()
646        };
647        fp_ensure!(
648            request.transactions.len() <= max_num_transactions as usize,
649            SuiErrorKind::InvalidRequest(format!(
650                "Too many transactions in request: {} vs {}",
651                request.transactions.len(),
652                max_num_transactions
653            ))
654            .into()
655        );
656
657        // Transaction digests.
658        let mut tx_digests = Vec::with_capacity(request.transactions.len());
659        // Transactions to submit to consensus.
660        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
661        // Indexes of transactions above in the request transactions.
662        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
663        // Results corresponding to each transaction in the request.
664        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
665        // Total size of all transactions in the request.
666        let mut total_size_bytes = 0;
667
668        let req_type = if is_ping_request {
669            "ping"
670        } else if request.transactions.len() == 1 {
671            "single_transaction"
672        } else if is_soft_bundle_request {
673            "soft_bundle"
674        } else {
675            "batch"
676        };
677
678        let _handle_tx_metrics_guard = metrics
679            .handle_submit_transaction_latency
680            .with_label_values(&[req_type])
681            .start_timer();
682
683        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
684            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
685                Ok(txn) => txn,
686                Err(e) => {
687                    // Ok to fail the request when any transaction is invalid.
688                    return Err(SuiErrorKind::TransactionDeserializationError {
689                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
690                    }
691                    .into());
692                }
693            };
694
695            // Ok to fail the request when any transaction is invalid.
696            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
697
698            let overload_check_res = self.state.check_system_overload(
699                &*consensus_adapter,
700                transaction.data(),
701                state.check_system_overload_at_signing(),
702            );
703            if let Err(error) = overload_check_res {
704                metrics
705                    .num_rejected_tx_during_overload
706                    .with_label_values(&[error.as_ref()])
707                    .inc();
708                results[idx] = Some(SubmitTxResult::Rejected { error });
709                continue;
710            }
711
712            // Ok to fail the request when any signature is invalid.
713            let verified_transaction = {
714                let _metrics_guard = metrics.tx_verification_latency.start_timer();
715                match epoch_store.verify_transaction(transaction) {
716                    Ok(txn) => txn,
717                    Err(e) => {
718                        metrics.signature_errors.inc();
719                        return Err(e.into());
720                    }
721                }
722            };
723
724            let tx_digest = verified_transaction.digest();
725            tx_digests.push(*tx_digest);
726
727            debug!(
728                ?tx_digest,
729                "handle_submit_transaction: verified transaction"
730            );
731
732            // Check if the transaction has executed, before checking input objects
733            // which could have been consumed.
734            if let Some(effects) = self
735                .state
736                .get_transaction_cache_reader()
737                .get_executed_effects(tx_digest)
738            {
739                let effects_digest = effects.digest();
740                if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
741                    let executed_result = SubmitTxResult::Executed {
742                        effects_digest,
743                        details: Some(executed_data),
744                        fast_path: false,
745                    };
746                    results[idx] = Some(executed_result);
747                    debug!(?tx_digest, "handle_submit_transaction: already executed");
748                    continue;
749                }
750            }
751
752            debug!(
753                ?tx_digest,
754                "handle_submit_transaction: waiting for fastpath dependency objects"
755            );
756            if !state
757                .wait_for_fastpath_dependency_objects(&verified_transaction, epoch_store.epoch())
758                .await?
759            {
760                debug!(
761                    ?tx_digest,
762                    "fastpath input objects are still unavailable after waiting"
763                );
764            }
765
766            match state.handle_vote_transaction(&epoch_store, verified_transaction.clone()) {
767                Ok(_) => { /* continue processing */ }
768                Err(e) => {
769                    // Check if transaction has been executed while being validated.
770                    // This is an edge case so checking executed effects twice is acceptable.
771                    if let Some(effects) = self
772                        .state
773                        .get_transaction_cache_reader()
774                        .get_executed_effects(tx_digest)
775                    {
776                        let effects_digest = effects.digest();
777                        if let Ok(executed_data) = self.complete_executed_data(effects, None).await
778                        {
779                            let executed_result = SubmitTxResult::Executed {
780                                effects_digest,
781                                details: Some(executed_data),
782                                fast_path: false,
783                            };
784                            results[idx] = Some(executed_result);
785                            continue;
786                        }
787                    }
788                    // If the transaction has not been executed, record the error for the transaction.
789                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
790                    continue;
791                }
792            }
793
794            consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
795                &self.state.name,
796                verified_transaction.into(),
797            ));
798            transaction_indexes.push(idx);
799            total_size_bytes += tx_size;
800        }
801
802        if consensus_transactions.is_empty() && !is_ping_request {
803            return Ok((
804                tonic::Response::new(Self::try_from_submit_tx_response(results)?),
805                Weight::zero(),
806            ));
807        }
808
        // Set the max byte size of the soft bundle to half of the consensus max transactions-in-block size.
        // We do this to account for serialization overhead and to ensure that the soft bundle is not too large
        // when it is submitted to consensus.
812        let max_transaction_bytes = if is_soft_bundle_request {
813            epoch_store
814                .protocol_config()
815                .consensus_max_transactions_in_block_bytes()
816                / 2
817        } else {
818            epoch_store
819                .protocol_config()
820                .consensus_max_transactions_in_block_bytes()
821        };
822        fp_ensure!(
823            total_size_bytes <= max_transaction_bytes as usize,
824            SuiErrorKind::UserInputError {
825                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
826                    size: total_size_bytes,
827                    limit: max_transaction_bytes,
828                },
829            }
830            .into()
831        );
832
833        metrics
834            .handle_submit_transaction_bytes
835            .with_label_values(&[req_type])
836            .observe(total_size_bytes as f64);
837        metrics
838            .handle_submit_transaction_batch_size
839            .with_label_values(&[req_type])
840            .observe(consensus_transactions.len() as f64);
841
842        let _latency_metric_guard = metrics
843            .handle_submit_transaction_consensus_latency
844            .with_label_values(&[req_type])
845            .start_timer();
846
847        let consensus_positions = if is_soft_bundle_request || is_ping_request {
            // We only allow `consensus_transactions` to be empty for ping requests. This is how it should be, and is, treated by the downstream components.
            // In any other case, an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
850            assert!(
851                is_ping_request || !consensus_transactions.is_empty(),
852                "A valid soft bundle must have at least one transaction"
853            );
854            debug!(
855                "handle_submit_transaction: submitting consensus transactions ({}): {}",
856                req_type,
857                consensus_transactions
858                    .iter()
859                    .map(|t| t.local_display())
860                    .join(", ")
861            );
862            self.handle_submit_to_consensus_for_position(
863                consensus_transactions,
864                &epoch_store,
865                submitter_client_addr,
866            )
867            .await?
868        } else {
869            let futures = consensus_transactions.into_iter().map(|t| {
870                debug!(
871                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
872                    req_type,
873                    t.local_display(),
874                );
875                self.handle_submit_to_consensus_for_position(
876                    vec![t],
877                    &epoch_store,
878                    submitter_client_addr,
879                )
880            });
881            future::try_join_all(futures)
882                .await?
883                .into_iter()
884                .flatten()
885                .collect()
886        };
887
888        if is_ping_request {
889            // For ping requests, return the special consensus position.
890            assert_eq!(consensus_positions.len(), 1);
891            results.push(Some(SubmitTxResult::Submitted {
892                consensus_position: consensus_positions[0],
893            }));
894        } else {
895            // Otherwise, return the consensus position for each transaction.
896            for ((idx, tx_digest), consensus_position) in transaction_indexes
897                .into_iter()
898                .zip(tx_digests)
899                .zip(consensus_positions)
900            {
901                debug!(
902                    ?tx_digest,
903                    "handle_submit_transaction: submitted consensus transaction at {}",
904                    consensus_position,
905                );
906                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
907            }
908        }
909
910        Ok((
911            tonic::Response::new(Self::try_from_submit_tx_response(results)?),
912            Weight::zero(),
913        ))
914    }
915
916    fn try_from_submit_tx_response(
917        results: Vec<Option<SubmitTxResult>>,
918    ) -> Result<RawSubmitTxResponse, SuiError> {
919        let mut raw_results = Vec::new();
920        for (i, result) in results.into_iter().enumerate() {
921            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
922                error: format!("Missing transaction result at {}", i),
923            })?;
924            let raw_result = result.try_into()?;
925            raw_results.push(raw_result);
926        }
927        Ok(RawSubmitTxResponse {
928            results: raw_results,
929        })
930    }
931
932    // In addition to the response from handling the certificates,
933    // returns a bool indicating whether the request should be tallied
934    // toward spam count. In general, this should be set to true for
935    // requests that are read-only and thus do not consume gas, such
936    // as when the transaction is already executed.
937    async fn handle_certificates(
938        &self,
939        certificates: NonEmpty<CertifiedTransaction>,
940        include_events: bool,
941        include_input_objects: bool,
942        include_output_objects: bool,
943        include_auxiliary_data: bool,
944        epoch_store: &Arc<AuthorityPerEpochStore>,
945        wait_for_effects: bool,
946    ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
947        // Validate if cert can be executed
948        // Fullnode does not serve handle_certificate call.
949        fp_ensure!(
950            !self.state.is_fullnode(epoch_store),
951            SuiErrorKind::FullNodeCantHandleCertificate.into()
952        );
953
954        let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
955
956        let metrics = if certificates.len() == 1 {
957            if wait_for_effects {
958                if is_consensus_tx {
959                    &self.metrics.handle_certificate_consensus_latency
960                } else {
961                    &self.metrics.handle_certificate_non_consensus_latency
962                }
963            } else {
964                &self.metrics.submit_certificate_consensus_latency
965            }
966        } else {
967            // `soft_bundle_validity_check` ensured that all certificates contain shared objects.
968            &self
969                .metrics
970                .handle_soft_bundle_certificates_consensus_latency
971        };
972
973        let _metrics_guard = metrics.start_timer();
974
975        // 1) Check if the certificate is already executed.
976        //    This is only needed when we have only one certificate (not a soft bundle).
977        //    When multiple certificates are provided, we will either submit all of them or none of them to consensus.
978        if certificates.len() == 1 {
979            let tx_digest = *certificates[0].digest();
980            debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
981
982            if let Some(signed_effects) = self
983                .state
984                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
985            {
986                let events = if include_events && signed_effects.events_digest().is_some() {
987                    Some(
988                        self.state
989                            .get_transaction_events(signed_effects.transaction_digest())?,
990                    )
991                } else {
992                    None
993                };
994
995                return Ok((
996                    Some(vec![HandleCertificateResponseV3 {
997                        effects: signed_effects.into_inner(),
998                        events,
999                        input_objects: None,
1000                        output_objects: None,
1001                        auxiliary_data: None,
1002                    }]),
1003                    Weight::one(),
1004                ));
1005            };
1006        }
1007
1008        // 2) Verify the certificates.
1009        // Check system overload
1010        for certificate in &certificates {
1011            let overload_check_res = self.state.check_system_overload(
1012                &*self.consensus_adapter,
1013                certificate.data(),
1014                self.state.check_system_overload_at_execution(),
1015            );
1016            if let Err(error) = overload_check_res {
1017                self.metrics
1018                    .num_rejected_cert_during_overload
1019                    .with_label_values(&[error.as_ref()])
1020                    .inc();
1021                return Err(error.into());
1022            }
1023        }
1024
1025        let verified_certificates = {
1026            let _timer = self.metrics.cert_verification_latency.start_timer();
1027            epoch_store
1028                .signature_verifier
1029                .multi_verify_certs(certificates.into())
1030                .await
1031                .into_iter()
1032                .collect::<Result<Vec<_>, _>>()?
1033        };
1034        let consensus_transactions =
1035            NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1036                ConsensusTransaction::new_certificate_message(
1037                    &self.state.name,
1038                    certificate.clone().into(),
1039                )
1040            }))
1041            .unwrap();
1042
1043        let (responses, weight) = self
1044            .handle_submit_to_consensus(
1045                consensus_transactions,
1046                include_events,
1047                include_input_objects,
1048                include_output_objects,
1049                include_auxiliary_data,
1050                epoch_store,
1051                wait_for_effects,
1052            )
1053            .await?;
1054        // Sign the returned TransactionEffects.
1055        let responses = if let Some(responses) = responses {
1056            Some(
1057                responses
1058                    .into_iter()
1059                    .map(|response| {
1060                        let signed_effects =
1061                            self.state.sign_effects(response.effects, epoch_store)?;
1062                        Ok(HandleCertificateResponseV3 {
1063                            effects: signed_effects.into_inner(),
1064                            events: response.events,
1065                            input_objects: if response.input_objects.is_empty() {
1066                                None
1067                            } else {
1068                                Some(response.input_objects)
1069                            },
1070                            output_objects: if response.output_objects.is_empty() {
1071                                None
1072                            } else {
1073                                Some(response.output_objects)
1074                            },
1075                            auxiliary_data: None,
1076                        })
1077                    })
1078                    .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1079            )
1080        } else {
1081            None
1082        };
1083
1084        Ok((responses, weight))
1085    }
1086
1087    #[instrument(
1088        name = "ValidatorService::handle_submit_to_consensus_for_position",
1089        level = "debug",
1090        skip_all,
1091        err(level = "debug")
1092    )]
1093    async fn handle_submit_to_consensus_for_position(
1094        &self,
1095        // Empty when this is a ping request.
1096        consensus_transactions: Vec<ConsensusTransaction>,
1097        epoch_store: &Arc<AuthorityPerEpochStore>,
1098        submitter_client_addr: Option<IpAddr>,
1099    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1100        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1101
1102        {
1103            // code block within reconfiguration lock
1104            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1105            if !reconfiguration_lock.should_accept_user_certs() {
1106                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1107                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1108            }
1109
1110            // Submit to consensus and wait for position, we do not check if tx
1111            // has been processed by consensus already as this method is called
1112            // to get back a consensus position.
1113            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1114
1115            self.consensus_adapter.submit_batch(
1116                &consensus_transactions,
1117                Some(&reconfiguration_lock),
1118                epoch_store,
1119                Some(tx_consensus_positions),
1120                submitter_client_addr,
1121            )?;
1122        }
1123
1124        Ok(rx_consensus_positions.await.map_err(|e| {
1125            SuiErrorKind::FailedToSubmitToConsensus(format!(
1126                "Failed to get consensus position: {e}"
1127            ))
1128        })?)
1129    }
1130
1131    async fn handle_submit_to_consensus(
1132        &self,
1133        consensus_transactions: NonEmpty<ConsensusTransaction>,
1134        include_events: bool,
1135        include_input_objects: bool,
1136        include_output_objects: bool,
1137        _include_auxiliary_data: bool,
1138        epoch_store: &Arc<AuthorityPerEpochStore>,
1139        wait_for_effects: bool,
1140    ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1141        let consensus_transactions: Vec<_> = consensus_transactions.into();
1142        {
1143            // code block within reconfiguration lock
1144            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1145            if !reconfiguration_lock.should_accept_user_certs() {
1146                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1147                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1148            }
1149
1150            // 3) All transactions are sent to consensus (at least by some authorities)
1151            // For certs with shared objects this will wait until either timeout or we have heard back from consensus.
1152            // For certs with owned objects this will return without waiting for certificate to be sequenced.
1153            // For uncertified transactions this will wait for fast path processing.
1154            // First do quick dirty non-async check.
1155            if !epoch_store.all_external_consensus_messages_processed(
1156                consensus_transactions.iter().map(|tx| tx.key()),
1157            )? {
1158                let _metrics_guard = self.metrics.consensus_latency.start_timer();
1159                self.consensus_adapter.submit_batch(
1160                    &consensus_transactions,
1161                    Some(&reconfiguration_lock),
1162                    epoch_store,
1163                    None,
1164                    None, // not tracking submitter client addr for quorum driver path
1165                )?;
1166                // Do not wait for the result, because the transaction might have already executed.
1167                // Instead, check or wait for the existence of certificate effects below.
1168            }
1169        }
1170
1171        if !wait_for_effects {
1172            // It is useful to enqueue owned object transaction for execution locally,
1173            // even when we are not returning effects to user
1174            let fast_path_certificates = consensus_transactions
1175                .iter()
1176                .filter_map(|tx| {
1177                    if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1178                        (!certificate.is_consensus_tx())
1179                            // Certificates already verified by callers of this function.
1180                            .then_some((
1181                                VerifiedExecutableTransaction::new_from_certificate(
1182                                    VerifiedCertificate::new_unchecked(*(certificate.clone())),
1183                                ),
1184                                ExecutionEnv::new()
1185                                    .with_scheduling_source(SchedulingSource::NonFastPath),
1186                            ))
1187                    } else {
1188                        None
1189                    }
1190                })
1191                .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1192                .collect::<Vec<_>>();
1193            if !fast_path_certificates.is_empty() {
1194                self.state
1195                    .execution_scheduler()
1196                    .enqueue(fast_path_certificates, epoch_store);
1197            }
1198            return Ok((None, Weight::zero()));
1199        }
1200
1201        // 4) Execute the certificates immediately if they contain only owned object transactions,
1202        // or wait for the execution results if it contains shared objects.
1203        let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1204            |tx| async move {
1205                let effects = match &tx.kind {
1206                    ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1207                        // Certificates already verified by callers of this function.
1208                        let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1209                        self.state
1210                            .wait_for_certificate_execution(&certificate, epoch_store)
1211                            .await?
1212                    }
1213                    ConsensusTransactionKind::UserTransaction(tx) => {
1214                        self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1215                    }
1216                    _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction or UserTransaction"),
1217                };
1218                let events = if include_events && effects.events_digest().is_some() {
1219                    Some(self.state.get_transaction_events(effects.transaction_digest())?)
1220                } else {
1221                    None
1222                };
1223
1224                let input_objects = if include_input_objects {
1225                    self.state.get_transaction_input_objects(&effects)?
1226                } else {
1227                    vec![]
1228                };
1229
1230                let output_objects = if include_output_objects {
1231                    self.state.get_transaction_output_objects(&effects)?
1232                } else {
1233                    vec![]
1234                };
1235
1236                if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1237                    epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1238                }
1239
1240                Ok::<_, SuiError>(ExecutedData {
1241                    effects,
1242                    events,
1243                    input_objects,
1244                    output_objects,
1245                })
1246            },
1247        ))
1248        .await?;
1249
1250        Ok((Some(responses), Weight::zero()))
1251    }
1252
1253    async fn collect_effects_data(
1254        &self,
1255        effects: &TransactionEffects,
1256        include_events: bool,
1257        include_input_objects: bool,
1258        include_output_objects: bool,
1259        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1260    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1261        let events = if include_events && effects.events_digest().is_some() {
1262            if let Some(fastpath_outputs) = &fastpath_outputs {
1263                Some(fastpath_outputs.events.clone())
1264            } else {
1265                Some(
1266                    self.state
1267                        .get_transaction_events(effects.transaction_digest())?,
1268                )
1269            }
1270        } else {
1271            None
1272        };
1273
1274        let input_objects = if include_input_objects {
1275            self.state.get_transaction_input_objects(effects)?
1276        } else {
1277            vec![]
1278        };
1279
1280        let output_objects = if include_output_objects {
1281            if let Some(fastpath_outputs) = &fastpath_outputs {
1282                fastpath_outputs.written.values().cloned().collect()
1283            } else {
1284                self.state.get_transaction_output_objects(effects)?
1285            }
1286        } else {
1287            vec![]
1288        };
1289
1290        Ok((events, input_objects, output_objects))
1291    }
1292}
1293
/// Result type returned by the `*_impl` handlers below: on success, the tonic response
/// paired with a `Weight` for the request; on failure, a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1295
1296impl ValidatorService {
    /// Handler body for transaction requests: forwards `request` to
    /// `handle_transaction` and returns its result unchanged.
    async fn transaction_impl(
        &self,
        request: tonic::Request<Transaction>,
    ) -> WrappedServiceResponse<HandleTransactionResponse> {
        self.handle_transaction(request).await
    }
1303
    /// Handler body for raw submit-transaction requests: forwards `request` to
    /// `handle_submit_transaction` and returns its result unchanged.
    async fn handle_submit_transaction_impl(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
        self.handle_submit_transaction(request).await
    }
1310
1311    async fn submit_certificate_impl(
1312        &self,
1313        request: tonic::Request<CertifiedTransaction>,
1314    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1315        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1316        let certificate = request.into_inner();
1317        certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1318
1319        let span =
1320            error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1321        self.handle_certificates(
1322            nonempty![certificate],
1323            true,
1324            false,
1325            false,
1326            false,
1327            &epoch_store,
1328            false,
1329        )
1330        .instrument(span)
1331        .await
1332        .map(|(executed, spam_weight)| {
1333            (
1334                tonic::Response::new(SubmitCertificateResponse {
1335                    executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1336                }),
1337                spam_weight,
1338            )
1339        })
1340    }
1341
1342    async fn handle_certificate_v2_impl(
1343        &self,
1344        request: tonic::Request<CertifiedTransaction>,
1345    ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1346        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1347        let certificate = request.into_inner();
1348        certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1349
1350        let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1351        self.handle_certificates(
1352            nonempty![certificate],
1353            true,
1354            false,
1355            false,
1356            false,
1357            &epoch_store,
1358            true,
1359        )
1360        .instrument(span)
1361        .await
1362        .map(|(resp, spam_weight)| {
1363            (
1364                tonic::Response::new(
1365                    resp.expect(
1366                        "handle_certificate should not return none with wait_for_effects=true",
1367                    )
1368                    .remove(0)
1369                    .into(),
1370                ),
1371                spam_weight,
1372            )
1373        })
1374    }
1375
1376    async fn handle_certificate_v3_impl(
1377        &self,
1378        request: tonic::Request<HandleCertificateRequestV3>,
1379    ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1380        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1381        let request = request.into_inner();
1382        request
1383            .certificate
1384            .validity_check(&epoch_store.tx_validity_check_context())?;
1385
1386        let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1387        self.handle_certificates(
1388            nonempty![request.certificate],
1389            request.include_events,
1390            request.include_input_objects,
1391            request.include_output_objects,
1392            request.include_auxiliary_data,
1393            &epoch_store,
1394            true,
1395        )
1396        .instrument(span)
1397        .await
1398        .map(|(resp, spam_weight)| {
1399            (
1400                tonic::Response::new(
1401                    resp.expect(
1402                        "handle_certificate should not return none with wait_for_effects=true",
1403                    )
1404                    .remove(0),
1405                ),
1406                spam_weight,
1407            )
1408        })
1409    }
1410
1411    async fn wait_for_effects_impl(
1412        &self,
1413        request: tonic::Request<RawWaitForEffectsRequest>,
1414    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1415        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1416        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1417        let response = timeout(
1418            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1419            Duration::from_secs(20),
1420            epoch_store
1421                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1422                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1423        )
1424        .await
1425        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1426        .try_into()?;
1427        Ok((tonic::Response::new(response), Weight::zero()))
1428    }
1429
1430    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, err(level = "debug"), fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1431    async fn wait_for_effects_response(
1432        &self,
1433        request: WaitForEffectsRequest,
1434        epoch_store: &Arc<AuthorityPerEpochStore>,
1435    ) -> SuiResult<WaitForEffectsResponse> {
1436        if request.ping_type.is_some() {
1437            return timeout(
1438                Duration::from_secs(10),
1439                self.ping_response(request, epoch_store),
1440            )
1441            .await
1442            .map_err(|_| SuiErrorKind::TimeoutError)?;
1443        }
1444
1445        let Some(tx_digest) = request.transaction_digest else {
1446            return Err(SuiErrorKind::InvalidRequest(
1447                "Transaction digest is required for wait for effects requests".to_string(),
1448            )
1449            .into());
1450        };
1451        let tx_digests = [tx_digest];
1452
1453        let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1454            if let Some(consensus_position) = request.consensus_position {
1455                Box::pin(self.wait_for_fastpath_effects(
1456                    consensus_position,
1457                    &tx_digests,
1458                    request.include_details,
1459                    epoch_store,
1460                ))
1461            } else {
1462                Box::pin(futures::future::pending())
1463            };
1464
1465        tokio::select! {
1466            // Ensure that finalized effects are always prioritized.
1467            biased;
1468            // We always wait for effects regardless of consensus position via
1469            // notify_read_executed_effects. This is safe because we have separated
1470            // mysticeti fastpath outputs to a separate dirty cache
1471            // UncommittedData::fastpath_transaction_outputs that will only get flushed
1472            // once finalized. So the output of notify_read_executed_effects is
1473            // guaranteed to be finalized effects or effects from QD execution.
1474            mut effects = self.state
1475                .get_transaction_cache_reader()
1476                .notify_read_executed_effects(
1477                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1478                    &tx_digests,
1479                ) => {
1480                tracing::Span::current().record("fast_path_effects", false);
1481                let effects = effects.pop().unwrap();
1482                let details = if request.include_details {
1483                    Some(self.complete_executed_data(effects.clone(), None).await?)
1484                } else {
1485                    None
1486                };
1487
1488                Ok(WaitForEffectsResponse::Executed {
1489                    effects_digest: effects.digest(),
1490                    details,
1491                    fast_path: false,
1492                })
1493            }
1494
1495            fastpath_response = fastpath_effects_future => {
1496                tracing::Span::current().record("fast_path_effects", true);
1497                fastpath_response
1498            }
1499        }
1500    }
1501
1502    #[instrument(level = "error", skip_all, err(level = "debug"))]
1503    async fn ping_response(
1504        &self,
1505        request: WaitForEffectsRequest,
1506        epoch_store: &Arc<AuthorityPerEpochStore>,
1507    ) -> SuiResult<WaitForEffectsResponse> {
1508        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1509            return Err(SuiErrorKind::UnsupportedFeatureError {
1510                error: "Mysticeti fastpath".to_string(),
1511            }
1512            .into());
1513        };
1514
1515        let Some(consensus_position) = request.consensus_position else {
1516            return Err(SuiErrorKind::InvalidRequest(
1517                "Consensus position is required for Ping requests".to_string(),
1518            )
1519            .into());
1520        };
1521
1522        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1523        let Some(ping) = request.ping_type else {
1524            return Err(SuiErrorKind::InvalidRequest(
1525                "Ping type is required for ping requests".to_string(),
1526            )
1527            .into());
1528        };
1529
1530        let _metrics_guard = self
1531            .metrics
1532            .handle_wait_for_effects_ping_latency
1533            .with_label_values(&[ping.as_str()])
1534            .start_timer();
1535
1536        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1537
1538        let mut last_status = None;
1539        let details = if request.include_details {
1540            Some(Box::new(ExecutedData::default()))
1541        } else {
1542            None
1543        };
1544
1545        loop {
1546            let status = consensus_tx_status_cache
1547                .notify_read_transaction_status_change(consensus_position, last_status)
1548                .await;
1549            match status {
1550                NotifyReadConsensusTxStatusResult::Status(status) => match status {
1551                    ConsensusTxStatus::FastpathCertified => {
1552                        // If the request is for consensus, we need to wait for the transaction to be finalised via Consensus.
1553                        if ping == PingType::Consensus {
1554                            last_status = Some(status);
1555                            continue;
1556                        }
1557                        return Ok(WaitForEffectsResponse::Executed {
1558                            effects_digest: TransactionEffectsDigest::ZERO,
1559                            details,
1560                            fast_path: true,
1561                        });
1562                    }
1563                    ConsensusTxStatus::Rejected => {
1564                        return Ok(WaitForEffectsResponse::Rejected { error: None });
1565                    }
1566                    ConsensusTxStatus::Finalized => {
1567                        return Ok(WaitForEffectsResponse::Executed {
1568                            effects_digest: TransactionEffectsDigest::ZERO,
1569                            details,
1570                            fast_path: false,
1571                        });
1572                    }
1573                },
1574                NotifyReadConsensusTxStatusResult::Expired(round) => {
1575                    return Ok(WaitForEffectsResponse::Expired {
1576                        epoch: epoch_store.epoch(),
1577                        round: Some(round),
1578                    });
1579                }
1580            }
1581        }
1582    }
1583
1584    #[instrument(level = "error", skip_all, err(level = "debug"))]
1585    async fn wait_for_fastpath_effects(
1586        &self,
1587        consensus_position: ConsensusPosition,
1588        tx_digests: &[TransactionDigest],
1589        include_details: bool,
1590        epoch_store: &Arc<AuthorityPerEpochStore>,
1591    ) -> SuiResult<WaitForEffectsResponse> {
1592        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1593            return Err(SuiErrorKind::UnsupportedFeatureError {
1594                error: "Mysticeti fastpath".to_string(),
1595            }
1596            .into());
1597        };
1598
1599        let local_epoch = epoch_store.epoch();
1600        match consensus_position.epoch.cmp(&local_epoch) {
1601            Ordering::Less => {
1602                // Ask TransactionDriver to retry submitting the transaction and get a new ConsensusPosition,
1603                // if response from this validator is desired.
1604                let response = WaitForEffectsResponse::Expired {
1605                    epoch: local_epoch,
1606                    round: None,
1607                };
1608                return Ok(response);
1609            }
1610            Ordering::Greater => {
1611                // Ask TransactionDriver to retry this RPC until the validator's epoch catches up.
1612                return Err(SuiErrorKind::WrongEpoch {
1613                    expected_epoch: local_epoch,
1614                    actual_epoch: consensus_position.epoch,
1615                }
1616                .into());
1617            }
1618            Ordering::Equal => {
1619                // The validator's epoch is the same as the epoch of the transaction.
1620                // We can proceed with the normal flow.
1621            }
1622        };
1623
1624        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1625
1626        let mut current_status = None;
1627        loop {
1628            tokio::select! {
1629                status_result = consensus_tx_status_cache
1630                    .notify_read_transaction_status_change(consensus_position, current_status) => {
1631                    match status_result {
1632                        NotifyReadConsensusTxStatusResult::Status(new_status) => {
1633                            match new_status {
1634                                ConsensusTxStatus::Rejected => {
1635                                    return Ok(WaitForEffectsResponse::Rejected {
1636                                        error: epoch_store.get_rejection_vote_reason(
1637                                            consensus_position
1638                                        )
1639                                    });
1640                                }
1641                                ConsensusTxStatus::FastpathCertified => {
1642                                    current_status = Some(new_status);
1643                                    continue;
1644                                }
1645                                ConsensusTxStatus::Finalized => {
1646                                    current_status = Some(new_status);
1647                                    continue;
1648                                }
1649                            }
1650                        }
1651                        NotifyReadConsensusTxStatusResult::Expired(round) => {
1652                            return Ok(WaitForEffectsResponse::Expired {
1653                                epoch: epoch_store.epoch(),
1654                                round: Some(round),
1655                            });
1656                        }
1657                    }
1658                }
1659
1660                mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1661                    if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1662                    let outputs = outputs.pop().unwrap();
1663                    let effects = outputs.effects.clone();
1664
1665                    let details = if include_details {
1666                        Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1667                    } else {
1668                        None
1669                    };
1670
1671                    return Ok(WaitForEffectsResponse::Executed {
1672                        effects_digest: effects.digest(),
1673                        details,
1674                        fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1675                    });
1676                }
1677            }
1678        }
1679    }
1680
1681    async fn complete_executed_data(
1682        &self,
1683        effects: TransactionEffects,
1684        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1685    ) -> SuiResult<Box<ExecutedData>> {
1686        let (events, input_objects, output_objects) = self
1687            .collect_effects_data(
1688                &effects,
1689                /* include_events */ true,
1690                /* include_input_objects */ true,
1691                /* include_output_objects */ true,
1692                fastpath_outputs,
1693            )
1694            .await?;
1695        Ok(Box::new(ExecutedData {
1696            effects,
1697            events,
1698            input_objects,
1699            output_objects,
1700        }))
1701    }
1702
1703    async fn soft_bundle_validity_check(
1704        &self,
1705        certificates: &NonEmpty<CertifiedTransaction>,
1706        epoch_store: &Arc<AuthorityPerEpochStore>,
1707        total_size_bytes: u64,
1708    ) -> Result<(), tonic::Status> {
1709        let protocol_config = epoch_store.protocol_config();
1710        let node_config = &self.state.config;
1711
1712        // Soft Bundle MUST be enabled both in protocol config and local node config.
1713        //
1714        // The local node config is by default enabled, but can be turned off by the node operator.
1715        // This acts an extra safety measure where a validator node have the choice to turn this feature off,
1716        // without having to upgrade the entire network.
1717        fp_ensure!(
1718            protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1719            SuiErrorKind::UnsupportedFeatureError {
1720                error: "Soft Bundle".to_string()
1721            }
1722            .into()
1723        );
1724
1725        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
1726        // - All certs must access at least one shared object.
1727        // - All certs must not be already executed.
1728        // - All certs must have the same gas price.
1729        // - Number of certs must not exceed the max allowed.
1730        // - Total size of all certs must not exceed the max allowed.
1731        fp_ensure!(
1732            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1733            SuiErrorKind::UserInputError {
1734                error: UserInputError::TooManyTransactionsInBatch {
1735                    size: certificates.len(),
1736                    limit: protocol_config.max_soft_bundle_size()
1737                }
1738            }
1739            .into()
1740        );
1741
1742        // We set the soft bundle max size to be half of the consensus max transactions in block size. We do this to account for
1743        // serialization overheads and to ensure that the soft bundle is not too large when is attempted to be posted via consensus.
1744        // Although half the block size is on the extreme side, it's should be good enough for now.
1745        let soft_bundle_max_size_bytes =
1746            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1747        fp_ensure!(
1748            total_size_bytes <= soft_bundle_max_size_bytes,
1749            SuiErrorKind::UserInputError {
1750                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1751                    size: total_size_bytes as usize,
1752                    limit: soft_bundle_max_size_bytes,
1753                },
1754            }
1755            .into()
1756        );
1757
1758        let mut gas_price = None;
1759        for certificate in certificates {
1760            let tx_digest = *certificate.digest();
1761            fp_ensure!(
1762                certificate.is_consensus_tx(),
1763                SuiErrorKind::UserInputError {
1764                    error: UserInputError::NoSharedObjectError { digest: tx_digest }
1765                }
1766                .into()
1767            );
1768            fp_ensure!(
1769                !self.state.is_tx_already_executed(&tx_digest),
1770                SuiErrorKind::UserInputError {
1771                    error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1772                }
1773                .into()
1774            );
1775            if let Some(gas) = gas_price {
1776                fp_ensure!(
1777                    gas == certificate.gas_price(),
1778                    SuiErrorKind::UserInputError {
1779                        error: UserInputError::GasPriceMismatchError {
1780                            digest: tx_digest,
1781                            expected: gas,
1782                            actual: certificate.gas_price()
1783                        }
1784                    }
1785                    .into()
1786                );
1787            } else {
1788                gas_price = Some(certificate.gas_price());
1789            }
1790        }
1791
1792        // For Soft Bundle, if at this point we know at least one certificate has already been processed,
1793        // reject the entire bundle.  Otherwise, submit all certificates in one request.
1794        // This is not a strict check as there may be race conditions where one or more certificates are
1795        // already being processed by another actor, and we could not know it.
1796        fp_ensure!(
1797            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1798            SuiErrorKind::UserInputError {
1799                error: UserInputError::CertificateAlreadyProcessed
1800            }
1801            .into()
1802        );
1803
1804        Ok(())
1805    }
1806
1807    async fn handle_soft_bundle_certificates_v3_impl(
1808        &self,
1809        request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1810    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1811        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1812        let client_addr = if self.client_id_source.is_none() {
1813            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1814        } else {
1815            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1816        };
1817        let request = request.into_inner();
1818
1819        let certificates = NonEmpty::from_vec(request.certificates)
1820            .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1821        let mut total_size_bytes = 0;
1822        for certificate in &certificates {
1823            // We need to check this first because we haven't verified the cert signature.
1824            total_size_bytes +=
1825                certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1826        }
1827
1828        self.metrics
1829            .handle_soft_bundle_certificates_count
1830            .observe(certificates.len() as f64);
1831
1832        self.metrics
1833            .handle_soft_bundle_certificates_size_bytes
1834            .observe(total_size_bytes as f64);
1835
1836        // Now that individual certificates are valid, we check if the bundle is valid.
1837        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1838            .await?;
1839
1840        info!(
1841            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1842            certificates.len(),
1843            client_addr
1844                .map(|x| x.to_string())
1845                .unwrap_or_else(|| "unknown".to_string()),
1846            certificates
1847                .iter()
1848                .map(|x| x.digest().to_string())
1849                .collect::<Vec<_>>()
1850                .join(", "),
1851            total_size_bytes
1852        );
1853
1854        let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1855        self.handle_certificates(
1856            certificates,
1857            request.include_events,
1858            request.include_input_objects,
1859            request.include_output_objects,
1860            request.include_auxiliary_data,
1861            &epoch_store,
1862            request.wait_for_effects,
1863        )
1864        .instrument(span)
1865        .await
1866        .map(|(resp, spam_weight)| {
1867            (
1868                tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1869                    responses: resp.unwrap_or_default(),
1870                }),
1871                spam_weight,
1872            )
1873        })
1874    }
1875
1876    async fn object_info_impl(
1877        &self,
1878        request: tonic::Request<ObjectInfoRequest>,
1879    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1880        let request = request.into_inner();
1881        let response = self.state.handle_object_info_request(request).await?;
1882        Ok((tonic::Response::new(response), Weight::one()))
1883    }
1884
1885    async fn transaction_info_impl(
1886        &self,
1887        request: tonic::Request<TransactionInfoRequest>,
1888    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1889        let request = request.into_inner();
1890        let response = self.state.handle_transaction_info_request(request).await?;
1891        Ok((tonic::Response::new(response), Weight::one()))
1892    }
1893
1894    async fn checkpoint_impl(
1895        &self,
1896        request: tonic::Request<CheckpointRequest>,
1897    ) -> WrappedServiceResponse<CheckpointResponse> {
1898        let request = request.into_inner();
1899        let response = self.state.handle_checkpoint_request(&request)?;
1900        Ok((tonic::Response::new(response), Weight::one()))
1901    }
1902
1903    async fn checkpoint_v2_impl(
1904        &self,
1905        request: tonic::Request<CheckpointRequestV2>,
1906    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1907        let request = request.into_inner();
1908        let response = self.state.handle_checkpoint_request_v2(&request)?;
1909        Ok((tonic::Response::new(response), Weight::one()))
1910    }
1911
1912    async fn get_system_state_object_impl(
1913        &self,
1914        _request: tonic::Request<SystemStateRequest>,
1915    ) -> WrappedServiceResponse<SuiSystemState> {
1916        let response = self
1917            .state
1918            .get_object_cache_reader()
1919            .get_sui_system_state_object_unsafe()?;
1920        Ok((tonic::Response::new(response), Weight::one()))
1921    }
1922
1923    async fn validator_health_impl(
1924        &self,
1925        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1926    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1927        let state = &self.state;
1928
1929        // Get epoch store once for both metrics
1930        let epoch_store = state.load_epoch_store_one_call_per_task();
1931
1932        // Get in-flight execution transactions from execution scheduler
1933        let num_inflight_execution_transactions =
1934            state.execution_scheduler().num_pending_certificates() as u64;
1935
1936        // Get in-flight consensus transactions from consensus adapter
1937        let num_inflight_consensus_transactions =
1938            self.consensus_adapter.num_inflight_transactions();
1939
1940        // Get last committed leader round from epoch store
1941        let last_committed_leader_round = epoch_store
1942            .consensus_tx_status_cache
1943            .as_ref()
1944            .and_then(|cache| cache.get_last_committed_leader_round())
1945            .unwrap_or(0);
1946
1947        // Get last locally built checkpoint sequence
1948        let last_locally_built_checkpoint = epoch_store
1949            .last_built_checkpoint_summary()
1950            .ok()
1951            .flatten()
1952            .map(|(_, summary)| summary.sequence_number)
1953            .unwrap_or(0);
1954
1955        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1956            num_inflight_consensus_transactions,
1957            num_inflight_execution_transactions,
1958            last_locally_built_checkpoint,
1959            last_committed_leader_round,
1960        };
1961
1962        let raw_response = typed_response
1963            .try_into()
1964            .map_err(|e: sui_types::error::SuiError| {
1965                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1966            })?;
1967
1968        Ok((tonic::Response::new(raw_response), Weight::one()))
1969    }
1970
1971    fn get_client_ip_addr<T>(
1972        &self,
1973        request: &tonic::Request<T>,
1974        source: &ClientIdSource,
1975    ) -> Option<IpAddr> {
1976        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1977
1978        if let Some(header) = forwarded_header {
1979            let num_hops = header
1980                .to_str()
1981                .map(|h| h.split(',').count().saturating_sub(1))
1982                .unwrap_or(0);
1983
1984            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1985        }
1986
1987        match source {
1988            ClientIdSource::SocketAddr => {
1989                let socket_addr: Option<SocketAddr> = request.remote_addr();
1990
1991                // We will hit this case if the IO type used does not
1992                // implement Connected or when using a unix domain socket.
1993                // TODO: once we have confirmed that no legitimate traffic
1994                // is hitting this case, we should reject such requests that
1995                // hit this case.
1996                if let Some(socket_addr) = socket_addr {
1997                    Some(socket_addr.ip())
1998                } else {
1999                    if cfg!(msim) {
2000                        // Ignore the error from simtests.
2001                    } else if cfg!(test) {
2002                        panic!("Failed to get remote address from request");
2003                    } else {
2004                        self.metrics.connection_ip_not_found.inc();
2005                        error!("Failed to get remote address from request");
2006                    }
2007                    None
2008                }
2009            }
2010            ClientIdSource::XForwardedFor(num_hops) => {
2011                let do_header_parse = |op: &MetadataValue<Ascii>| {
2012                    match op.to_str() {
2013                        Ok(header_val) => {
2014                            let header_contents =
2015                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
2016                            if *num_hops == 0 {
2017                                error!(
2018                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2019                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2020                                    to this node. Skipping traffic controller request handling.",
2021                                    header_contents,
2022                                );
2023                                return None;
2024                            }
2025                            let contents_len = header_contents.len();
2026                            if contents_len < *num_hops {
2027                                error!(
2028                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2029                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2030                                    `client-id-source` in the node config.",
2031                                    header_contents, contents_len, num_hops, contents_len,
2032                                );
2033                                self.metrics.client_id_source_config_mismatch.inc();
2034                                return None;
2035                            }
2036                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
2037                            else {
2038                                error!(
2039                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2040                                    Expected at least {} values. Skipping traffic controller request handling.",
2041                                    header_contents, contents_len, num_hops, contents_len,
2042                                );
2043                                return None;
2044                            };
2045                            parse_ip(client_ip).or_else(|| {
2046                                self.metrics.forwarded_header_parse_error.inc();
2047                                None
2048                            })
2049                        }
2050                        Err(e) => {
2051                            // TODO: once we have confirmed that no legitimate traffic
2052                            // is hitting this case, we should reject such requests that
2053                            // hit this case.
2054                            self.metrics.forwarded_header_invalid.inc();
2055                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2056                            None
2057                        }
2058                    }
2059                };
2060                if let Some(op) = request.metadata().get("x-forwarded-for") {
2061                    do_header_parse(op)
2062                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2063                    do_header_parse(op)
2064                } else {
2065                    self.metrics.forwarded_header_not_included.inc();
2066                    error!(
2067                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2068                    );
2069                    None
2070                }
2071            }
2072        }
2073    }
2074
2075    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2076        if let Some(traffic_controller) = &self.traffic_controller {
2077            if !traffic_controller.check(&client, &None).await {
2078                // Entity in blocklist
2079                Err(tonic::Status::from_error(
2080                    SuiErrorKind::TooManyRequests.into(),
2081                ))
2082            } else {
2083                Ok(())
2084            }
2085        } else {
2086            Ok(())
2087        }
2088    }
2089
2090    fn handle_traffic_resp<T>(
2091        &self,
2092        client: Option<IpAddr>,
2093        wrapped_response: WrappedServiceResponse<T>,
2094    ) -> Result<tonic::Response<T>, tonic::Status> {
2095        let (error, spam_weight, unwrapped_response) = match wrapped_response {
2096            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2097            Err(status) => (
2098                Some(SuiError::from(status.clone())),
2099                Weight::zero(),
2100                Err(status.clone()),
2101            ),
2102        };
2103
2104        if let Some(traffic_controller) = self.traffic_controller.clone() {
2105            traffic_controller.tally(TrafficTally {
2106                direct: client,
2107                through_fullnode: None,
2108                error_info: error.map(|e| {
2109                    let error_type = String::from(e.clone().as_ref());
2110                    let error_weight = normalize(e);
2111                    (error_weight, error_type)
2112                }),
2113                spam_weight,
2114                timestamp: SystemTime::now(),
2115            })
2116        }
2117        unwrapped_response
2118    }
2119}
2120
2121fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2122    // simulate a TCP connection, which would have added extensions to
2123    // the request object that would be used downstream
2124    let mut request = tonic::Request::new(message);
2125    let tcp_connect_info = TcpConnectInfo {
2126        local_addr: None,
2127        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2128    };
2129    request.extensions_mut().insert(tcp_connect_info);
2130    request
2131}
2132
2133// TODO: refine error matching here
2134fn normalize(err: SuiError) -> Weight {
2135    match err.as_inner() {
2136        SuiErrorKind::UserInputError {
2137            error: UserInputError::IncorrectUserSignature { .. },
2138        } => Weight::one(),
2139        SuiErrorKind::InvalidSignature { .. }
2140        | SuiErrorKind::SignerSignatureAbsent { .. }
2141        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2142        | SuiErrorKind::IncorrectSigner { .. }
2143        | SuiErrorKind::UnknownSigner { .. }
2144        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2145        _ => Weight::zero(),
2146    }
2147}
2148
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments: the service identifier, the name of the `*_impl` method to call on
/// it, and the request identifier.
///
/// When the service has no `client_id_source`, the handler is called directly and its
/// weight is discarded. Otherwise the client address is resolved, the request is
/// checked with `handle_traffic_req` (returning early on failure), the handler runs,
/// and its outcome is passed through `handle_traffic_resp`.
///
/// Note that the expansion uses `return` and `?`, so it must be placed in a function
/// or async block whose return type is the handler's unwrapped response type.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // Without a client id source there is nothing to check or tally.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if either IP is blocked, in which case return early
        $self.handle_traffic_req(client.clone()).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
2169
2170#[async_trait]
2171impl Validator for ValidatorService {
2172    async fn submit_transaction(
2173        &self,
2174        request: tonic::Request<RawSubmitTxRequest>,
2175    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2176        let validator_service = self.clone();
2177
2178        // Spawns a task which handles the transaction. The task will unconditionally continue
2179        // processing in the event that the client connection is dropped.
2180        spawn_monitored_task!(async move {
2181            // NB: traffic tally wrapping handled within the task rather than on task exit
2182            // to prevent an attacker from subverting traffic control by severing the connection
2183            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2184        })
2185        .await
2186        .unwrap()
2187    }
2188
2189    async fn transaction(
2190        &self,
2191        request: tonic::Request<Transaction>,
2192    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2193        let validator_service = self.clone();
2194
2195        // Spawns a task which handles the transaction. The task will unconditionally continue
2196        // processing in the event that the client connection is dropped.
2197        spawn_monitored_task!(async move {
2198            // NB: traffic tally wrapping handled within the task rather than on task exit
2199            // to prevent an attacker from subverting traffic control by severing the connection
2200            handle_with_decoration!(validator_service, transaction_impl, request)
2201        })
2202        .await
2203        .unwrap()
2204    }
2205
2206    async fn submit_certificate(
2207        &self,
2208        request: tonic::Request<CertifiedTransaction>,
2209    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2210        let validator_service = self.clone();
2211
2212        // Spawns a task which handles the certificate. The task will unconditionally continue
2213        // processing in the event that the client connection is dropped.
2214        spawn_monitored_task!(async move {
2215            // NB: traffic tally wrapping handled within the task rather than on task exit
2216            // to prevent an attacker from subverting traffic control by severing the connection.
2217            handle_with_decoration!(validator_service, submit_certificate_impl, request)
2218        })
2219        .await
2220        .unwrap()
2221    }
2222
2223    async fn handle_certificate_v2(
2224        &self,
2225        request: tonic::Request<CertifiedTransaction>,
2226    ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2227        handle_with_decoration!(self, handle_certificate_v2_impl, request)
2228    }
2229
2230    async fn handle_certificate_v3(
2231        &self,
2232        request: tonic::Request<HandleCertificateRequestV3>,
2233    ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2234        handle_with_decoration!(self, handle_certificate_v3_impl, request)
2235    }
2236
2237    async fn wait_for_effects(
2238        &self,
2239        request: tonic::Request<RawWaitForEffectsRequest>,
2240    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2241        handle_with_decoration!(self, wait_for_effects_impl, request)
2242    }
2243
2244    async fn handle_soft_bundle_certificates_v3(
2245        &self,
2246        request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2247    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2248        handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2249    }
2250
2251    async fn object_info(
2252        &self,
2253        request: tonic::Request<ObjectInfoRequest>,
2254    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2255        handle_with_decoration!(self, object_info_impl, request)
2256    }
2257
2258    async fn transaction_info(
2259        &self,
2260        request: tonic::Request<TransactionInfoRequest>,
2261    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2262        handle_with_decoration!(self, transaction_info_impl, request)
2263    }
2264
2265    async fn checkpoint(
2266        &self,
2267        request: tonic::Request<CheckpointRequest>,
2268    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2269        handle_with_decoration!(self, checkpoint_impl, request)
2270    }
2271
2272    async fn checkpoint_v2(
2273        &self,
2274        request: tonic::Request<CheckpointRequestV2>,
2275    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2276        handle_with_decoration!(self, checkpoint_v2_impl, request)
2277    }
2278
2279    async fn get_system_state_object(
2280        &self,
2281        request: tonic::Request<SystemStateRequest>,
2282    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2283        handle_with_decoration!(self, get_system_state_object_impl, request)
2284    }
2285
2286    async fn validator_health(
2287        &self,
2288        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2289    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2290    {
2291        handle_with_decoration!(self, validator_health_impl, request)
2292    }
2293}