sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14    register_gauge_with_registry, register_histogram_vec_with_registry,
15    register_histogram_with_registry, register_int_counter_vec_with_registry,
16    register_int_counter_with_registry,
17};
18use std::{
19    cmp::Ordering,
20    future::Future,
21    io,
22    net::{IpAddr, SocketAddr},
23    pin::Pin,
24    sync::Arc,
25    time::{Duration, SystemTime},
26};
27use sui_network::{
28    api::{Validator, ValidatorServer},
29    tonic,
30    validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::message_envelope::Message;
33use sui_types::messages_consensus::ConsensusPosition;
34use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
35use sui_types::messages_grpc::{
36    HandleCertificateRequestV3, HandleCertificateResponseV3, RawSubmitTxResponse,
37};
38use sui_types::messages_grpc::{
39    HandleCertificateResponseV2, HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse,
40    SubmitCertificateResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
41};
42use sui_types::messages_grpc::{
43    HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
44};
45use sui_types::multiaddr::Multiaddr;
46use sui_types::object::Object;
47use sui_types::sui_system_state::SuiSystemState;
48use sui_types::traffic_control::{ClientIdSource, Weight};
49use sui_types::{
50    digests::{TransactionDigest, TransactionEffectsDigest},
51    error::{SuiErrorKind, UserInputError},
52};
53use sui_types::{
54    effects::TransactionEffects,
55    messages_grpc::{
56        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
57        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
58    },
59};
60use sui_types::{
61    effects::TransactionEffectsAPI, executable_transaction::VerifiedExecutableTransaction,
62};
63use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
64use sui_types::{error::*, transaction::*};
65use sui_types::{
66    fp_ensure,
67    messages_checkpoint::{
68        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
69    },
70};
71use tap::TapFallible;
72use tokio::sync::oneshot;
73use tokio::time::timeout;
74use tonic::metadata::{Ascii, MetadataValue};
75use tracing::{Instrument, debug, error, error_span, info, instrument};
76
77use crate::consensus_adapter::ConnectionMonitorStatusForTests;
78use crate::{
79    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
80    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
81    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
82};
83use crate::{
84    authority::{
85        ExecutionEnv, authority_per_epoch_store::AuthorityPerEpochStore,
86        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
87        shared_object_version_manager::Schedulable,
88    },
89    checkpoints::CheckpointStore,
90    execution_scheduler::SchedulingSource,
91    mysticeti_adapter::LazyMysticetiClient,
92    transaction_outputs::TransactionOutputs,
93};
94use nonempty::{NonEmpty, nonempty};
95use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
96use sui_types::messages_grpc::PingType;
97use tonic::transport::server::TcpConnectInfo;
98
99#[cfg(test)]
100#[path = "unit_tests/server_tests.rs"]
101mod server_tests;
102
103#[cfg(test)]
104#[path = "unit_tests/wait_for_effects_tests.rs"]
105mod wait_for_effects_tests;
106
107#[cfg(test)]
108#[path = "unit_tests/submit_transaction_tests.rs"]
109mod submit_transaction_tests;
110
111pub struct AuthorityServerHandle {
112    server_handle: sui_network::validator::server::Server,
113}
114
115impl AuthorityServerHandle {
116    pub async fn join(self) -> Result<(), io::Error> {
117        self.server_handle.handle().wait_for_shutdown().await;
118        Ok(())
119    }
120
121    pub async fn kill(self) -> Result<(), io::Error> {
122        self.server_handle.handle().shutdown().await;
123        Ok(())
124    }
125
126    pub fn address(&self) -> &Multiaddr {
127        self.server_handle.local_addr()
128    }
129}
130
131pub struct AuthorityServer {
132    address: Multiaddr,
133    pub state: Arc<AuthorityState>,
134    consensus_adapter: Arc<ConsensusAdapter>,
135    pub metrics: Arc<ValidatorServiceMetrics>,
136}
137
138impl AuthorityServer {
139    pub fn new_for_test_with_consensus_adapter(
140        state: Arc<AuthorityState>,
141        consensus_adapter: Arc<ConsensusAdapter>,
142    ) -> Self {
143        let address = new_local_tcp_address_for_testing();
144        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
145
146        Self {
147            address,
148            state,
149            consensus_adapter,
150            metrics,
151        }
152    }
153
154    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
155        let consensus_adapter = Arc::new(ConsensusAdapter::new(
156            Arc::new(LazyMysticetiClient::new()),
157            CheckpointStore::new_for_tests(),
158            state.name,
159            Arc::new(ConnectionMonitorStatusForTests {}),
160            100_000,
161            100_000,
162            None,
163            None,
164            ConsensusAdapterMetrics::new_test(),
165            state.epoch_store_for_testing().protocol_config().clone(),
166        ));
167        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
168    }
169
170    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
171        let address = self.address.clone();
172        self.spawn_with_bind_address_for_test(address).await
173    }
174
175    pub async fn spawn_with_bind_address_for_test(
176        self,
177        address: Multiaddr,
178    ) -> Result<AuthorityServerHandle, io::Error> {
179        let tls_config = sui_tls::create_rustls_server_config(
180            self.state.config.network_key_pair().copy().private(),
181            SUI_TLS_SERVER_NAME.to_string(),
182        );
183        let config = mysten_network::config::Config::new();
184        let server = sui_network::validator::server::ServerBuilder::from_config(
185            &config,
186            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
187        )
188        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
189            self.state,
190            self.consensus_adapter,
191            self.metrics,
192        )))
193        .bind(&address, Some(tls_config))
194        .await
195        .unwrap();
196        let local_addr = server.local_addr().to_owned();
197        info!("Listening to traffic on {local_addr}");
198        let handle = AuthorityServerHandle {
199            server_handle: server,
200        };
201        Ok(handle)
202    }
203}
204
205pub struct ValidatorServiceMetrics {
206    pub signature_errors: IntCounter,
207    pub tx_verification_latency: Histogram,
208    pub cert_verification_latency: Histogram,
209    pub consensus_latency: Histogram,
210    pub handle_transaction_latency: Histogram,
211    pub submit_certificate_consensus_latency: Histogram,
212    pub handle_certificate_consensus_latency: Histogram,
213    pub handle_certificate_non_consensus_latency: Histogram,
214    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
215    pub handle_soft_bundle_certificates_count: Histogram,
216    pub handle_soft_bundle_certificates_size_bytes: Histogram,
217    pub handle_transaction_consensus_latency: Histogram,
218    pub handle_submit_transaction_consensus_latency: HistogramVec,
219    pub handle_wait_for_effects_ping_latency: HistogramVec,
220
221    handle_submit_transaction_latency: HistogramVec,
222    handle_submit_transaction_bytes: HistogramVec,
223    handle_submit_transaction_batch_size: HistogramVec,
224
225    num_rejected_tx_in_epoch_boundary: IntCounter,
226    num_rejected_cert_in_epoch_boundary: IntCounter,
227    num_rejected_tx_during_overload: IntCounterVec,
228    num_rejected_cert_during_overload: IntCounterVec,
229    submission_rejected_transactions: IntCounterVec,
230    connection_ip_not_found: IntCounter,
231    forwarded_header_parse_error: IntCounter,
232    forwarded_header_invalid: IntCounter,
233    forwarded_header_not_included: IntCounter,
234    client_id_source_config_mismatch: IntCounter,
235    x_forwarded_for_num_hops: Gauge,
236}
237
238impl ValidatorServiceMetrics {
239    pub fn new(registry: &Registry) -> Self {
240        Self {
241            signature_errors: register_int_counter_with_registry!(
242                "total_signature_errors",
243                "Number of transaction signature errors",
244                registry,
245            )
246            .unwrap(),
247            tx_verification_latency: register_histogram_with_registry!(
248                "validator_service_tx_verification_latency",
249                "Latency of verifying a transaction",
250                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
251                registry,
252            )
253            .unwrap(),
254            cert_verification_latency: register_histogram_with_registry!(
255                "validator_service_cert_verification_latency",
256                "Latency of verifying a certificate",
257                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
258                registry,
259            )
260            .unwrap(),
261            consensus_latency: register_histogram_with_registry!(
262                "validator_service_consensus_latency",
263                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
264                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
265                registry,
266            )
267            .unwrap(),
268            handle_transaction_latency: register_histogram_with_registry!(
269                "validator_service_handle_transaction_latency",
270                "Latency of handling a transaction",
271                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
272                registry,
273            )
274            .unwrap(),
275            handle_certificate_consensus_latency: register_histogram_with_registry!(
276                "validator_service_handle_certificate_consensus_latency",
277                "Latency of handling a consensus transaction certificate",
278                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
279                registry,
280            )
281            .unwrap(),
282            submit_certificate_consensus_latency: register_histogram_with_registry!(
283                "validator_service_submit_certificate_consensus_latency",
284                "Latency of submit_certificate RPC handler",
285                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
286                registry,
287            )
288            .unwrap(),
289            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
290                "validator_service_handle_certificate_non_consensus_latency",
291                "Latency of handling a non-consensus transaction certificate",
292                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
293                registry,
294            )
295            .unwrap(),
296            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
297                "validator_service_handle_soft_bundle_certificates_consensus_latency",
298                "Latency of handling a consensus soft bundle",
299                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
300                registry,
301            )
302            .unwrap(),
303            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
304                "handle_soft_bundle_certificates_count",
305                "The number of certificates included in a soft bundle",
306                mysten_metrics::COUNT_BUCKETS.to_vec(),
307                registry,
308            )
309            .unwrap(),
310            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
311                "handle_soft_bundle_certificates_size_bytes",
312                "The size of soft bundle in bytes",
313                mysten_metrics::BYTES_BUCKETS.to_vec(),
314                registry,
315            )
316            .unwrap(),
317            handle_transaction_consensus_latency: register_histogram_with_registry!(
318                "validator_service_handle_transaction_consensus_latency",
319                "Latency of handling a user transaction sent through consensus",
320                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
321                registry,
322            )
323            .unwrap(),
324            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
325                "validator_service_submit_transaction_consensus_latency",
326                "Latency of submitting a user transaction sent through consensus",
327                &["req_type"],
328                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
329                registry,
330            )
331            .unwrap(),
332            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
333                "validator_service_submit_transaction_latency",
334                "Latency of submit transaction handler",
335                &["req_type"],
336                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
337                registry,
338            )
339            .unwrap(),
340            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
341                "validator_service_handle_wait_for_effects_ping_latency",
342                "Latency of handling a ping request for wait_for_effects",
343                &["req_type"],
344                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
345                registry,
346            )
347            .unwrap(),
348            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
349                "validator_service_submit_transaction_bytes",
350                "The size of transactions in the submit transaction request",
351                &["req_type"],
352                mysten_metrics::BYTES_BUCKETS.to_vec(),
353                registry,
354            )
355            .unwrap(),
356            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
357                "validator_service_submit_transaction_batch_size",
358                "The number of transactions in the submit transaction request",
359                &["req_type"],
360                mysten_metrics::COUNT_BUCKETS.to_vec(),
361                registry,
362            )
363            .unwrap(),
364            num_rejected_tx_in_epoch_boundary: register_int_counter_with_registry!(
365                "validator_service_num_rejected_tx_in_epoch_boundary",
366                "Number of rejected transaction during epoch transitioning",
367                registry,
368            )
369            .unwrap(),
370            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
371                "validator_service_num_rejected_cert_in_epoch_boundary",
372                "Number of rejected transaction certificate during epoch transitioning",
373                registry,
374            )
375            .unwrap(),
376            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
377                "validator_service_num_rejected_tx_during_overload",
378                "Number of rejected transaction due to system overload",
379                &["error_type"],
380                registry,
381            )
382            .unwrap(),
383            num_rejected_cert_during_overload: register_int_counter_vec_with_registry!(
384                "validator_service_num_rejected_cert_during_overload",
385                "Number of rejected transaction certificate due to system overload",
386                &["error_type"],
387                registry,
388            )
389            .unwrap(),
390            submission_rejected_transactions: register_int_counter_vec_with_registry!(
391                "validator_service_submission_rejected_transactions",
392                "Number of transactions rejected during submission",
393                &["reason"],
394                registry,
395            )
396            .unwrap(),
397            connection_ip_not_found: register_int_counter_with_registry!(
398                "validator_service_connection_ip_not_found",
399                "Number of times connection IP was not extractable from request",
400                registry,
401            )
402            .unwrap(),
403            forwarded_header_parse_error: register_int_counter_with_registry!(
404                "validator_service_forwarded_header_parse_error",
405                "Number of times x-forwarded-for header could not be parsed",
406                registry,
407            )
408            .unwrap(),
409            forwarded_header_invalid: register_int_counter_with_registry!(
410                "validator_service_forwarded_header_invalid",
411                "Number of times x-forwarded-for header was invalid",
412                registry,
413            )
414            .unwrap(),
415            forwarded_header_not_included: register_int_counter_with_registry!(
416                "validator_service_forwarded_header_not_included",
417                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
418                registry,
419            )
420            .unwrap(),
421            client_id_source_config_mismatch: register_int_counter_with_registry!(
422                "validator_service_client_id_source_config_mismatch",
423                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
424                registry,
425            )
426            .unwrap(),
427            x_forwarded_for_num_hops: register_gauge_with_registry!(
428                "validator_service_x_forwarded_for_num_hops",
429                "Number of hops in x-forwarded-for header",
430                registry,
431            )
432            .unwrap(),
433        }
434    }
435
436    pub fn new_for_tests() -> Self {
437        let registry = Registry::new();
438        Self::new(&registry)
439    }
440}
441
442#[derive(Clone)]
443pub struct ValidatorService {
444    state: Arc<AuthorityState>,
445    consensus_adapter: Arc<ConsensusAdapter>,
446    metrics: Arc<ValidatorServiceMetrics>,
447    traffic_controller: Option<Arc<TrafficController>>,
448    client_id_source: Option<ClientIdSource>,
449}
450
451impl ValidatorService {
452    pub fn new(
453        state: Arc<AuthorityState>,
454        consensus_adapter: Arc<ConsensusAdapter>,
455        validator_metrics: Arc<ValidatorServiceMetrics>,
456        client_id_source: Option<ClientIdSource>,
457    ) -> Self {
458        let traffic_controller = state.traffic_controller.clone();
459        Self {
460            state,
461            consensus_adapter,
462            metrics: validator_metrics,
463            traffic_controller,
464            client_id_source,
465        }
466    }
467
468    pub fn new_for_tests(
469        state: Arc<AuthorityState>,
470        consensus_adapter: Arc<ConsensusAdapter>,
471        metrics: Arc<ValidatorServiceMetrics>,
472    ) -> Self {
473        Self {
474            state,
475            consensus_adapter,
476            metrics,
477            traffic_controller: None,
478            client_id_source: None,
479        }
480    }
481
482    pub fn validator_state(&self) -> &Arc<AuthorityState> {
483        &self.state
484    }
485
486    pub async fn execute_certificate_for_testing(
487        &self,
488        cert: CertifiedTransaction,
489    ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
490        let request = make_tonic_request_for_testing(cert);
491        self.handle_certificate_v2(request).await
492    }
493
494    pub async fn handle_transaction_for_benchmarking(
495        &self,
496        transaction: Transaction,
497    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
498        let request = make_tonic_request_for_testing(transaction);
499        self.transaction(request).await
500    }
501
502    // When making changes to this function, see if the changes should be applied to
503    // `Self::handle_submit_transaction()` and `SuiTxValidator::vote_transaction()` as well.
504    async fn handle_transaction(
505        &self,
506        request: tonic::Request<Transaction>,
507    ) -> WrappedServiceResponse<HandleTransactionResponse> {
508        let Self {
509            state,
510            consensus_adapter,
511            metrics,
512            traffic_controller: _,
513            client_id_source: _,
514        } = self.clone();
515        let transaction = request.into_inner();
516        let epoch_store = state.load_epoch_store_one_call_per_task();
517
518        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
519
520        // When authority is overloaded and decide to reject this tx, we still lock the object
521        // and ask the client to retry in the future. This is because without locking, the
522        // input objects can be locked by a different tx in the future, however, the input objects
523        // may already be locked by this tx in other validators. This can cause non of the txes
524        // to have enough quorum to form a certificate, causing the objects to be locked for
525        // the entire epoch. By doing locking but pushback, retrying transaction will have
526        // higher chance to succeed.
527        let mut validator_pushback_error = None;
528        let overload_check_res = state.check_system_overload(
529            &*consensus_adapter,
530            transaction.data(),
531            state.check_system_overload_at_signing(),
532        );
533        if let Err(error) = overload_check_res {
534            metrics
535                .num_rejected_tx_during_overload
536                .with_label_values(&[error.as_ref()])
537                .inc();
538            // TODO: consider change the behavior for other types of overload errors.
539            match error.as_inner() {
540                SuiErrorKind::ValidatorOverloadedRetryAfter { .. } => {
541                    validator_pushback_error = Some(error)
542                }
543                _ => return Err(error.into()),
544            }
545        }
546
547        let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
548
549        let tx_verif_metrics_guard = metrics.tx_verification_latency.start_timer();
550        let transaction = epoch_store
551            // Aliases are not supported outside of MFP.
552            .verify_transaction_require_no_aliases(transaction)
553            .tap_err(|_| {
554                metrics.signature_errors.inc();
555            })?
556            .into_tx();
557        drop(tx_verif_metrics_guard);
558
559        let tx_digest = transaction.digest();
560
561        // Enable Trace Propagation across spans/processes using tx_digest
562        let span = error_span!("ValidatorService::validator_state_process_tx", ?tx_digest);
563
564        let info = state
565            .handle_transaction(&epoch_store, transaction.clone())
566            .instrument(span)
567            .await
568            .tap_err(|e| {
569                if let SuiErrorKind::ValidatorHaltedAtEpochEnd = e.as_inner() {
570                    metrics.num_rejected_tx_in_epoch_boundary.inc();
571                }
572            })?;
573
574        if let Some(error) = validator_pushback_error {
575            // TODO: right now, we still sign the txn, but just don't return it. We can also skip signing
576            // to save more CPU.
577            return Err(error.into());
578        }
579
580        Ok((tonic::Response::new(info), Weight::zero()))
581    }
582
583    #[instrument(
584        name = "ValidatorService::handle_submit_transaction",
585        level = "error",
586        skip_all,
587        err(level = "debug")
588    )]
589    async fn handle_submit_transaction(
590        &self,
591        request: tonic::Request<RawSubmitTxRequest>,
592    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
593        let Self {
594            state,
595            consensus_adapter,
596            metrics,
597            traffic_controller: _,
598            client_id_source,
599        } = self.clone();
600
601        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
602            self.get_client_ip_addr(&request, client_id_source)
603        } else {
604            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
605        };
606
607        let inner = request.into_inner();
608        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
609
610        let next_epoch = start_epoch + 1;
611        let mut max_retries = 1;
612
613        loop {
614            let res = self
615                .handle_submit_transaction_inner(
616                    &state,
617                    &consensus_adapter,
618                    &metrics,
619                    &inner,
620                    submitter_client_addr,
621                )
622                .await;
623            match res {
624                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
625                Err(err) => {
626                    if max_retries > 0
627                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
628                    {
629                        max_retries -= 1;
630
631                        debug!(
632                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
633                        );
634
635                        if let Ok(Ok(new_epoch)) =
636                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
637                        {
638                            assert_reachable!("retry submission at epoch end");
639                            if new_epoch == next_epoch {
640                                continue;
641                            }
642
643                            debug_fatal!(
644                                "expected epoch {} after reconfiguration. got {}",
645                                next_epoch,
646                                new_epoch
647                            );
648                        }
649                    }
650                    return Err(err.into());
651                }
652            }
653        }
654    }
655
656    async fn handle_submit_transaction_inner(
657        &self,
658        state: &AuthorityState,
659        consensus_adapter: &ConsensusAdapter,
660        metrics: &ValidatorServiceMetrics,
661        request: &RawSubmitTxRequest,
662        submitter_client_addr: Option<IpAddr>,
663    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
664        let epoch_store = state.load_epoch_store_one_call_per_task();
665        if !epoch_store.protocol_config().mysticeti_fastpath() {
666            return Err(SuiErrorKind::UnsupportedFeatureError {
667                error: "Mysticeti fastpath".to_string(),
668            }
669            .into());
670        }
671
672        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
673            SuiErrorKind::GrpcMessageDeserializeError {
674                type_info: "RawSubmitTxRequest.submit_type".to_string(),
675                error: e.to_string(),
676            }
677        })?;
678
679        let is_ping_request = submit_type == SubmitTxType::Ping;
680        if is_ping_request {
681            fp_ensure!(
682                request.transactions.is_empty(),
683                SuiErrorKind::InvalidRequest(format!(
684                    "Ping request cannot contain {} transactions",
685                    request.transactions.len()
686                ))
687                .into()
688            );
689        } else {
690            // Ensure default and soft bundle requests contain at least one transaction.
691            fp_ensure!(
692                !request.transactions.is_empty(),
693                SuiErrorKind::InvalidRequest(
694                    "At least one transaction needs to be submitted".to_string(),
695                )
696                .into()
697            );
698        }
699
700        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
701        // if they use the same gas price. But this is only done with best effort.
702        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
703        // other transactions in the same bundle.
704        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
705
706        let max_num_transactions = if is_soft_bundle_request {
707            // Soft bundle cannot contain too many transactions.
708            // Otherwise it is hard to include all of them in a single block.
709            epoch_store.protocol_config().max_soft_bundle_size()
710        } else {
711            // Still enforce a limit even when transactions do not need to be in the same block.
712            epoch_store
713                .protocol_config()
714                .max_num_transactions_in_block()
715        };
716        fp_ensure!(
717            request.transactions.len() <= max_num_transactions as usize,
718            SuiErrorKind::InvalidRequest(format!(
719                "Too many transactions in request: {} vs {}",
720                request.transactions.len(),
721                max_num_transactions
722            ))
723            .into()
724        );
725
726        // Transaction digests.
727        let mut tx_digests = Vec::with_capacity(request.transactions.len());
728        // Transactions to submit to consensus.
729        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
730        // Indexes of transactions above in the request transactions.
731        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
732        // Results corresponding to each transaction in the request.
733        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
734        // Total size of all transactions in the request.
735        let mut total_size_bytes = 0;
736
737        let req_type = if is_ping_request {
738            "ping"
739        } else if request.transactions.len() == 1 {
740            "single_transaction"
741        } else if is_soft_bundle_request {
742            "soft_bundle"
743        } else {
744            "batch"
745        };
746
747        let _handle_tx_metrics_guard = metrics
748            .handle_submit_transaction_latency
749            .with_label_values(&[req_type])
750            .start_timer();
751
752        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
753            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
754                Ok(txn) => txn,
755                Err(e) => {
756                    // Ok to fail the request when any transaction is invalid.
757                    return Err(SuiErrorKind::TransactionDeserializationError {
758                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
759                    }
760                    .into());
761                }
762            };
763
764            // Ok to fail the request when any transaction is invalid.
765            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
766
767            let overload_check_res = state.check_system_overload(
768                consensus_adapter,
769                transaction.data(),
770                state.check_system_overload_at_signing(),
771            );
772            if let Err(error) = overload_check_res {
773                metrics
774                    .num_rejected_tx_during_overload
775                    .with_label_values(&[error.as_ref()])
776                    .inc();
777                results[idx] = Some(SubmitTxResult::Rejected { error });
778                continue;
779            }
780
781            // Ok to fail the request when any signature is invalid.
782            let verified_transaction = {
783                let _metrics_guard = metrics.tx_verification_latency.start_timer();
784                if epoch_store.protocol_config().address_aliases() {
785                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
786                        Ok(tx) => tx,
787                        Err(e) => {
788                            metrics.signature_errors.inc();
789                            return Err(e);
790                        }
791                    }
792                } else {
793                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
794                        Ok(tx) => tx,
795                        Err(e) => {
796                            metrics.signature_errors.inc();
797                            return Err(e);
798                        }
799                    }
800                }
801            };
802
803            let tx_digest = verified_transaction.tx().digest();
804            tx_digests.push(*tx_digest);
805
806            debug!(
807                ?tx_digest,
808                "handle_submit_transaction: verified transaction"
809            );
810
811            // Check if the transaction has executed, before checking input objects
812            // which could have been consumed.
813            if let Some(effects) = state
814                .get_transaction_cache_reader()
815                .get_executed_effects(tx_digest)
816            {
817                let effects_digest = effects.digest();
818                if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
819                    let executed_result = SubmitTxResult::Executed {
820                        effects_digest,
821                        details: Some(executed_data),
822                        fast_path: false,
823                    };
824                    results[idx] = Some(executed_result);
825                    debug!(?tx_digest, "handle_submit_transaction: already executed");
826                    continue;
827                }
828            }
829
830            if self
831                .state
832                .get_transaction_cache_reader()
833                .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
834            {
835                results[idx] = Some(SubmitTxResult::Rejected {
836                    error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
837                });
838                debug!(
839                    ?tx_digest,
840                    "handle_submit_transaction: transaction already executed in previous epoch"
841                );
842                continue;
843            }
844
845            debug!(
846                ?tx_digest,
847                "handle_submit_transaction: waiting for fastpath dependency objects"
848            );
849            if !state
850                .wait_for_fastpath_dependency_objects(
851                    verified_transaction.tx(),
852                    epoch_store.epoch(),
853                )
854                .await?
855            {
856                debug!(
857                    ?tx_digest,
858                    "fastpath input objects are still unavailable after waiting"
859                );
860            }
861
862            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
863                Ok(_) => { /* continue processing */ }
864                Err(e) => {
865                    // Check if transaction has been executed while being validated.
866                    // This is an edge case so checking executed effects twice is acceptable.
867                    if let Some(effects) = state
868                        .get_transaction_cache_reader()
869                        .get_executed_effects(tx_digest)
870                    {
871                        let effects_digest = effects.digest();
872                        if let Ok(executed_data) = self.complete_executed_data(effects, None).await
873                        {
874                            let executed_result = SubmitTxResult::Executed {
875                                effects_digest,
876                                details: Some(executed_data),
877                                fast_path: false,
878                            };
879                            results[idx] = Some(executed_result);
880                            continue;
881                        }
882                    }
883
884                    // When the transaction has not been executed, record the error for the transaction.
885                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
886                    metrics
887                        .submission_rejected_transactions
888                        .with_label_values(&[e.to_variant_name()])
889                        .inc();
890                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
891                    continue;
892                }
893            }
894
895            if epoch_store.protocol_config().address_aliases() {
896                consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
897                    &state.name,
898                    verified_transaction.into(),
899                ));
900            } else {
901                consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
902                    &state.name,
903                    verified_transaction.into_tx().into(),
904                ));
905            }
906            transaction_indexes.push(idx);
907            total_size_bytes += tx_size;
908        }
909
910        if consensus_transactions.is_empty() && !is_ping_request {
911            return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
912        }
913
        // Set the max byte size of the soft bundle to half of the consensus max transactions in block size.
        // We do this to account for serialization overhead and to ensure that the soft bundle is not too large
        // when it is submitted to consensus.
917        let max_transaction_bytes = if is_soft_bundle_request {
918            epoch_store
919                .protocol_config()
920                .consensus_max_transactions_in_block_bytes()
921                / 2
922        } else {
923            epoch_store
924                .protocol_config()
925                .consensus_max_transactions_in_block_bytes()
926        };
927        fp_ensure!(
928            total_size_bytes <= max_transaction_bytes as usize,
929            SuiErrorKind::UserInputError {
930                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
931                    size: total_size_bytes,
932                    limit: max_transaction_bytes,
933                },
934            }
935            .into()
936        );
937
938        metrics
939            .handle_submit_transaction_bytes
940            .with_label_values(&[req_type])
941            .observe(total_size_bytes as f64);
942        metrics
943            .handle_submit_transaction_batch_size
944            .with_label_values(&[req_type])
945            .observe(consensus_transactions.len() as f64);
946
947        let _latency_metric_guard = metrics
948            .handle_submit_transaction_consensus_latency
949            .with_label_values(&[req_type])
950            .start_timer();
951
952        let consensus_positions = if is_soft_bundle_request || is_ping_request {
            // We only allow `consensus_transactions` to be empty for ping requests. This is how it should be, and is, treated by the downstream components.
            // In any other case, an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
955            assert!(
956                is_ping_request || !consensus_transactions.is_empty(),
957                "A valid soft bundle must have at least one transaction"
958            );
959            debug!(
960                "handle_submit_transaction: submitting consensus transactions ({}): {}",
961                req_type,
962                consensus_transactions
963                    .iter()
964                    .map(|t| t.local_display())
965                    .join(", ")
966            );
967            self.handle_submit_to_consensus_for_position(
968                consensus_transactions,
969                &epoch_store,
970                submitter_client_addr,
971            )
972            .await?
973        } else {
974            let futures = consensus_transactions.into_iter().map(|t| {
975                debug!(
976                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
977                    req_type,
978                    t.local_display(),
979                );
980                self.handle_submit_to_consensus_for_position(
981                    vec![t],
982                    &epoch_store,
983                    submitter_client_addr,
984                )
985            });
986            future::try_join_all(futures)
987                .await?
988                .into_iter()
989                .flatten()
990                .collect()
991        };
992
993        if is_ping_request {
994            // For ping requests, return the special consensus position.
995            assert_eq!(consensus_positions.len(), 1);
996            results.push(Some(SubmitTxResult::Submitted {
997                consensus_position: consensus_positions[0],
998            }));
999        } else {
1000            // Otherwise, return the consensus position for each transaction.
1001            for ((idx, tx_digest), consensus_position) in transaction_indexes
1002                .into_iter()
1003                .zip(tx_digests)
1004                .zip(consensus_positions)
1005            {
1006                debug!(
1007                    ?tx_digest,
1008                    "handle_submit_transaction: submitted consensus transaction at {}",
1009                    consensus_position,
1010                );
1011                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1012            }
1013        }
1014
1015        Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1016    }
1017
1018    fn try_from_submit_tx_response(
1019        results: Vec<Option<SubmitTxResult>>,
1020    ) -> Result<RawSubmitTxResponse, SuiError> {
1021        let mut raw_results = Vec::new();
1022        for (i, result) in results.into_iter().enumerate() {
1023            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1024                error: format!("Missing transaction result at {}", i),
1025            })?;
1026            let raw_result = result.try_into()?;
1027            raw_results.push(raw_result);
1028        }
1029        Ok(RawSubmitTxResponse {
1030            results: raw_results,
1031        })
1032    }
1033
1034    // In addition to the response from handling the certificates,
1035    // returns a bool indicating whether the request should be tallied
1036    // toward spam count. In general, this should be set to true for
1037    // requests that are read-only and thus do not consume gas, such
1038    // as when the transaction is already executed.
1039    async fn handle_certificates(
1040        &self,
1041        certificates: NonEmpty<CertifiedTransaction>,
1042        include_events: bool,
1043        include_input_objects: bool,
1044        include_output_objects: bool,
1045        include_auxiliary_data: bool,
1046        epoch_store: &Arc<AuthorityPerEpochStore>,
1047        wait_for_effects: bool,
1048    ) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
1049        // Validate if cert can be executed
1050        // Fullnode does not serve handle_certificate call.
1051        fp_ensure!(
1052            !self.state.is_fullnode(epoch_store),
1053            SuiErrorKind::FullNodeCantHandleCertificate.into()
1054        );
1055
1056        let is_consensus_tx = certificates.iter().any(|cert| cert.is_consensus_tx());
1057
1058        let metrics = if certificates.len() == 1 {
1059            if wait_for_effects {
1060                if is_consensus_tx {
1061                    &self.metrics.handle_certificate_consensus_latency
1062                } else {
1063                    &self.metrics.handle_certificate_non_consensus_latency
1064                }
1065            } else {
1066                &self.metrics.submit_certificate_consensus_latency
1067            }
1068        } else {
1069            // `soft_bundle_validity_check` ensured that all certificates contain shared objects.
1070            &self
1071                .metrics
1072                .handle_soft_bundle_certificates_consensus_latency
1073        };
1074
1075        let _metrics_guard = metrics.start_timer();
1076
1077        // 1) Check if the certificate is already executed.
1078        //    This is only needed when we have only one certificate (not a soft bundle).
1079        //    When multiple certificates are provided, we will either submit all of them or none of them to consensus.
1080        if certificates.len() == 1 {
1081            let tx_digest = *certificates[0].digest();
1082            debug!(tx_digest=?tx_digest, "Checking if certificate is already executed");
1083
1084            if let Some(signed_effects) = self
1085                .state
1086                .get_signed_effects_and_maybe_resign(&tx_digest, epoch_store)?
1087            {
1088                let events = if include_events && signed_effects.events_digest().is_some() {
1089                    Some(
1090                        self.state
1091                            .get_transaction_events(signed_effects.transaction_digest())?,
1092                    )
1093                } else {
1094                    None
1095                };
1096
1097                return Ok((
1098                    Some(vec![HandleCertificateResponseV3 {
1099                        effects: signed_effects.into_inner(),
1100                        events,
1101                        input_objects: None,
1102                        output_objects: None,
1103                        auxiliary_data: None,
1104                    }]),
1105                    Weight::one(),
1106                ));
1107            };
1108        }
1109
1110        // 2) Verify the certificates.
1111        // Check system overload
1112        for certificate in &certificates {
1113            let overload_check_res = self.state.check_system_overload(
1114                &*self.consensus_adapter,
1115                certificate.data(),
1116                self.state.check_system_overload_at_execution(),
1117            );
1118            if let Err(error) = overload_check_res {
1119                self.metrics
1120                    .num_rejected_cert_during_overload
1121                    .with_label_values(&[error.as_ref()])
1122                    .inc();
1123                return Err(error.into());
1124            }
1125        }
1126
1127        let verified_certificates = {
1128            let _timer = self.metrics.cert_verification_latency.start_timer();
1129            epoch_store
1130                .signature_verifier
1131                .multi_verify_certs(certificates.into())
1132                .await
1133                .into_iter()
1134                .collect::<Result<Vec<_>, _>>()?
1135        };
1136        let consensus_transactions =
1137            NonEmpty::collect(verified_certificates.iter().map(|certificate| {
1138                ConsensusTransaction::new_certificate_message(
1139                    &self.state.name,
1140                    certificate.clone().into(),
1141                )
1142            }))
1143            .unwrap();
1144
1145        let (responses, weight) = self
1146            .handle_submit_to_consensus(
1147                consensus_transactions,
1148                include_events,
1149                include_input_objects,
1150                include_output_objects,
1151                include_auxiliary_data,
1152                epoch_store,
1153                wait_for_effects,
1154            )
1155            .await?;
1156        // Sign the returned TransactionEffects.
1157        let responses = if let Some(responses) = responses {
1158            Some(
1159                responses
1160                    .into_iter()
1161                    .map(|response| {
1162                        let signed_effects =
1163                            self.state.sign_effects(response.effects, epoch_store)?;
1164                        Ok(HandleCertificateResponseV3 {
1165                            effects: signed_effects.into_inner(),
1166                            events: response.events,
1167                            input_objects: if response.input_objects.is_empty() {
1168                                None
1169                            } else {
1170                                Some(response.input_objects)
1171                            },
1172                            output_objects: if response.output_objects.is_empty() {
1173                                None
1174                            } else {
1175                                Some(response.output_objects)
1176                            },
1177                            auxiliary_data: None,
1178                        })
1179                    })
1180                    .collect::<Result<Vec<HandleCertificateResponseV3>, tonic::Status>>()?,
1181            )
1182        } else {
1183            None
1184        };
1185
1186        Ok((responses, weight))
1187    }
1188
1189    #[instrument(
1190        name = "ValidatorService::handle_submit_to_consensus_for_position",
1191        level = "debug",
1192        skip_all,
1193        err(level = "debug")
1194    )]
1195    async fn handle_submit_to_consensus_for_position(
1196        &self,
1197        // Empty when this is a ping request.
1198        consensus_transactions: Vec<ConsensusTransaction>,
1199        epoch_store: &Arc<AuthorityPerEpochStore>,
1200        submitter_client_addr: Option<IpAddr>,
1201    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1202        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1203
1204        {
1205            // code block within reconfiguration lock
1206            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1207            if !reconfiguration_lock.should_accept_user_certs() {
1208                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1209                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1210            }
1211
1212            // Submit to consensus and wait for position, we do not check if tx
1213            // has been processed by consensus already as this method is called
1214            // to get back a consensus position.
1215            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1216
1217            self.consensus_adapter.submit_batch(
1218                &consensus_transactions,
1219                Some(&reconfiguration_lock),
1220                epoch_store,
1221                Some(tx_consensus_positions),
1222                submitter_client_addr,
1223            )?;
1224        }
1225
1226        Ok(rx_consensus_positions.await.map_err(|e| {
1227            SuiErrorKind::FailedToSubmitToConsensus(format!(
1228                "Failed to get consensus position: {e}"
1229            ))
1230        })?)
1231    }
1232
1233    async fn handle_submit_to_consensus(
1234        &self,
1235        consensus_transactions: NonEmpty<ConsensusTransaction>,
1236        include_events: bool,
1237        include_input_objects: bool,
1238        include_output_objects: bool,
1239        _include_auxiliary_data: bool,
1240        epoch_store: &Arc<AuthorityPerEpochStore>,
1241        wait_for_effects: bool,
1242    ) -> Result<(Option<Vec<ExecutedData>>, Weight), tonic::Status> {
1243        let consensus_transactions: Vec<_> = consensus_transactions.into();
1244        {
1245            // code block within reconfiguration lock
1246            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1247            if !reconfiguration_lock.should_accept_user_certs() {
1248                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1249                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1250            }
1251
1252            // 3) All transactions are sent to consensus (at least by some authorities)
1253            // For certs with shared objects this will wait until either timeout or we have heard back from consensus.
1254            // For certs with owned objects this will return without waiting for certificate to be sequenced.
1255            // For uncertified transactions this will wait for fast path processing.
1256            // First do quick dirty non-async check.
1257            if !epoch_store.all_external_consensus_messages_processed(
1258                consensus_transactions.iter().map(|tx| tx.key()),
1259            )? {
1260                let _metrics_guard = self.metrics.consensus_latency.start_timer();
1261                self.consensus_adapter.submit_batch(
1262                    &consensus_transactions,
1263                    Some(&reconfiguration_lock),
1264                    epoch_store,
1265                    None,
1266                    None, // not tracking submitter client addr for quorum driver path
1267                )?;
1268                // Do not wait for the result, because the transaction might have already executed.
1269                // Instead, check or wait for the existence of certificate effects below.
1270            }
1271        }
1272
1273        if !wait_for_effects {
1274            // It is useful to enqueue owned object transaction for execution locally,
1275            // even when we are not returning effects to user
1276            let fast_path_certificates = consensus_transactions
1277                .iter()
1278                .filter_map(|tx| {
1279                    if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1280                        (!certificate.is_consensus_tx())
1281                            // Certificates already verified by callers of this function.
1282                            .then_some((
1283                                VerifiedExecutableTransaction::new_from_certificate(
1284                                    VerifiedCertificate::new_unchecked(*(certificate.clone())),
1285                                ),
1286                                ExecutionEnv::new()
1287                                    .with_scheduling_source(SchedulingSource::NonFastPath),
1288                            ))
1289                    } else {
1290                        None
1291                    }
1292                })
1293                .map(|(tx, env)| (Schedulable::Transaction(tx), env))
1294                .collect::<Vec<_>>();
1295            if !fast_path_certificates.is_empty() {
1296                self.state
1297                    .execution_scheduler()
1298                    .enqueue(fast_path_certificates, epoch_store);
1299            }
1300            return Ok((None, Weight::zero()));
1301        }
1302
1303        // 4) Execute the certificates immediately if they contain only owned object transactions,
1304        // or wait for the execution results if it contains shared objects.
1305        let responses = futures::future::try_join_all(consensus_transactions.into_iter().map(
1306            |tx| async move {
1307                let effects = match &tx.kind {
1308                    ConsensusTransactionKind::CertifiedTransaction(certificate) => {
1309                        // Certificates already verified by callers of this function.
1310                        let certificate = VerifiedCertificate::new_unchecked(*(certificate.clone()));
1311                        self.state
1312                            .wait_for_certificate_execution(&certificate, epoch_store)
1313                            .await?
1314                    }
1315                    ConsensusTransactionKind::UserTransaction(tx) => {
1316                        self.state.await_transaction_effects(*tx.digest(), epoch_store).await?
1317                    }
1318                    ConsensusTransactionKind::UserTransactionV2(tx) => {
1319                        self.state.await_transaction_effects(*tx.tx().digest(), epoch_store).await?
1320                    }
1321                    _ => panic!("`handle_submit_to_consensus` received transaction that is not a CertifiedTransaction, UserTransaction, or UserTransactionV2"),
1322                };
1323                let events = if include_events && effects.events_digest().is_some() {
1324                    Some(self.state.get_transaction_events(effects.transaction_digest())?)
1325                } else {
1326                    None
1327                };
1328
1329                let input_objects = if include_input_objects {
1330                    self.state.get_transaction_input_objects(&effects)?
1331                } else {
1332                    vec![]
1333                };
1334
1335                let output_objects = if include_output_objects {
1336                    self.state.get_transaction_output_objects(&effects)?
1337                } else {
1338                    vec![]
1339                };
1340
1341                if let ConsensusTransactionKind::CertifiedTransaction(certificate) = &tx.kind {
1342                    epoch_store.insert_tx_cert_sig(certificate.digest(), certificate.auth_sig())?;
1343                }
1344
1345                Ok::<_, SuiError>(ExecutedData {
1346                    effects,
1347                    events,
1348                    input_objects,
1349                    output_objects,
1350                })
1351            },
1352        ))
1353        .await?;
1354
1355        Ok((Some(responses), Weight::zero()))
1356    }
1357
1358    async fn collect_effects_data(
1359        &self,
1360        effects: &TransactionEffects,
1361        include_events: bool,
1362        include_input_objects: bool,
1363        include_output_objects: bool,
1364        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1365    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1366        let events = if include_events && effects.events_digest().is_some() {
1367            if let Some(fastpath_outputs) = &fastpath_outputs {
1368                Some(fastpath_outputs.events.clone())
1369            } else {
1370                Some(
1371                    self.state
1372                        .get_transaction_events(effects.transaction_digest())?,
1373                )
1374            }
1375        } else {
1376            None
1377        };
1378
1379        let input_objects = if include_input_objects {
1380            self.state.get_transaction_input_objects(effects)?
1381        } else {
1382            vec![]
1383        };
1384
1385        let output_objects = if include_output_objects {
1386            if let Some(fastpath_outputs) = &fastpath_outputs {
1387                fastpath_outputs.written.values().cloned().collect()
1388            } else {
1389                self.state.get_transaction_output_objects(effects)?
1390            }
1391        } else {
1392            vec![]
1393        };
1394
1395        Ok((events, input_objects, output_objects))
1396    }
1397}
1398
1399type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1400
1401impl ValidatorService {
1402    async fn transaction_impl(
1403        &self,
1404        request: tonic::Request<Transaction>,
1405    ) -> WrappedServiceResponse<HandleTransactionResponse> {
1406        self.handle_transaction(request).await
1407    }
1408
1409    async fn handle_submit_transaction_impl(
1410        &self,
1411        request: tonic::Request<RawSubmitTxRequest>,
1412    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1413        self.handle_submit_transaction(request).await
1414    }
1415
1416    async fn submit_certificate_impl(
1417        &self,
1418        request: tonic::Request<CertifiedTransaction>,
1419    ) -> WrappedServiceResponse<SubmitCertificateResponse> {
1420        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1421        let certificate = request.into_inner();
1422        certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1423
1424        let span =
1425            error_span!("ValidatorService::submit_certificate", tx_digest = ?certificate.digest());
1426        self.handle_certificates(
1427            nonempty![certificate],
1428            true,
1429            false,
1430            false,
1431            false,
1432            &epoch_store,
1433            false,
1434        )
1435        .instrument(span)
1436        .await
1437        .map(|(executed, spam_weight)| {
1438            (
1439                tonic::Response::new(SubmitCertificateResponse {
1440                    executed: executed.map(|mut x| x.remove(0)).map(Into::into),
1441                }),
1442                spam_weight,
1443            )
1444        })
1445    }
1446
1447    async fn handle_certificate_v2_impl(
1448        &self,
1449        request: tonic::Request<CertifiedTransaction>,
1450    ) -> WrappedServiceResponse<HandleCertificateResponseV2> {
1451        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1452        let certificate = request.into_inner();
1453        certificate.validity_check(&epoch_store.tx_validity_check_context())?;
1454
1455        let span = error_span!("ValidatorService::handle_certificate_v2", tx_digest = ?certificate.digest());
1456        self.handle_certificates(
1457            nonempty![certificate],
1458            true,
1459            false,
1460            false,
1461            false,
1462            &epoch_store,
1463            true,
1464        )
1465        .instrument(span)
1466        .await
1467        .map(|(resp, spam_weight)| {
1468            (
1469                tonic::Response::new(
1470                    resp.expect(
1471                        "handle_certificate should not return none with wait_for_effects=true",
1472                    )
1473                    .remove(0)
1474                    .into(),
1475                ),
1476                spam_weight,
1477            )
1478        })
1479    }
1480
1481    async fn handle_certificate_v3_impl(
1482        &self,
1483        request: tonic::Request<HandleCertificateRequestV3>,
1484    ) -> WrappedServiceResponse<HandleCertificateResponseV3> {
1485        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1486        let request = request.into_inner();
1487        request
1488            .certificate
1489            .validity_check(&epoch_store.tx_validity_check_context())?;
1490
1491        let span = error_span!("ValidatorService::handle_certificate_v3", tx_digest = ?request.certificate.digest());
1492        self.handle_certificates(
1493            nonempty![request.certificate],
1494            request.include_events,
1495            request.include_input_objects,
1496            request.include_output_objects,
1497            request.include_auxiliary_data,
1498            &epoch_store,
1499            true,
1500        )
1501        .instrument(span)
1502        .await
1503        .map(|(resp, spam_weight)| {
1504            (
1505                tonic::Response::new(
1506                    resp.expect(
1507                        "handle_certificate should not return none with wait_for_effects=true",
1508                    )
1509                    .remove(0),
1510                ),
1511                spam_weight,
1512            )
1513        })
1514    }
1515
1516    async fn wait_for_effects_impl(
1517        &self,
1518        request: tonic::Request<RawWaitForEffectsRequest>,
1519    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1520        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1521        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1522        let response = timeout(
1523            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1524            Duration::from_secs(20),
1525            epoch_store
1526                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1527                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1528        )
1529        .await
1530        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1531        .try_into()?;
1532        Ok((tonic::Response::new(response), Weight::zero()))
1533    }
1534
1535    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1536    async fn wait_for_effects_response(
1537        &self,
1538        request: WaitForEffectsRequest,
1539        epoch_store: &Arc<AuthorityPerEpochStore>,
1540    ) -> SuiResult<WaitForEffectsResponse> {
1541        if request.ping_type.is_some() {
1542            return timeout(
1543                Duration::from_secs(10),
1544                self.ping_response(request, epoch_store),
1545            )
1546            .await
1547            .map_err(|_| SuiErrorKind::TimeoutError)?;
1548        }
1549
1550        let Some(tx_digest) = request.transaction_digest else {
1551            return Err(SuiErrorKind::InvalidRequest(
1552                "Transaction digest is required for wait for effects requests".to_string(),
1553            )
1554            .into());
1555        };
1556        let tx_digests = [tx_digest];
1557
1558        let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1559            if let Some(consensus_position) = request.consensus_position {
1560                Box::pin(self.wait_for_fastpath_effects(
1561                    consensus_position,
1562                    &tx_digests,
1563                    request.include_details,
1564                    epoch_store,
1565                ))
1566            } else {
1567                Box::pin(futures::future::pending())
1568            };
1569
1570        tokio::select! {
1571            // Ensure that finalized effects are always prioritized.
1572            biased;
1573            // We always wait for effects regardless of consensus position via
1574            // notify_read_executed_effects. This is safe because we have separated
1575            // mysticeti fastpath outputs to a separate dirty cache
1576            // UncommittedData::fastpath_transaction_outputs that will only get flushed
1577            // once finalized. So the output of notify_read_executed_effects is
1578            // guaranteed to be finalized effects or effects from QD execution.
1579            mut effects = self.state
1580                .get_transaction_cache_reader()
1581                .notify_read_executed_effects(
1582                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1583                    &tx_digests,
1584                ) => {
1585                tracing::Span::current().record("fast_path_effects", false);
1586                let effects = effects.pop().unwrap();
1587                let details = if request.include_details {
1588                    Some(self.complete_executed_data(effects.clone(), None).await?)
1589                } else {
1590                    None
1591                };
1592
1593                Ok(WaitForEffectsResponse::Executed {
1594                    effects_digest: effects.digest(),
1595                    details,
1596                    fast_path: false,
1597                })
1598            }
1599
1600            fastpath_response = fastpath_effects_future => {
1601                tracing::Span::current().record("fast_path_effects", true);
1602                fastpath_response
1603            }
1604        }
1605    }
1606
1607    #[instrument(level = "error", skip_all, err(level = "debug"))]
1608    async fn ping_response(
1609        &self,
1610        request: WaitForEffectsRequest,
1611        epoch_store: &Arc<AuthorityPerEpochStore>,
1612    ) -> SuiResult<WaitForEffectsResponse> {
1613        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1614            return Err(SuiErrorKind::UnsupportedFeatureError {
1615                error: "Mysticeti fastpath".to_string(),
1616            }
1617            .into());
1618        };
1619
1620        let Some(consensus_position) = request.consensus_position else {
1621            return Err(SuiErrorKind::InvalidRequest(
1622                "Consensus position is required for Ping requests".to_string(),
1623            )
1624            .into());
1625        };
1626
1627        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1628        let Some(ping) = request.ping_type else {
1629            return Err(SuiErrorKind::InvalidRequest(
1630                "Ping type is required for ping requests".to_string(),
1631            )
1632            .into());
1633        };
1634
1635        let _metrics_guard = self
1636            .metrics
1637            .handle_wait_for_effects_ping_latency
1638            .with_label_values(&[ping.as_str()])
1639            .start_timer();
1640
1641        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1642
1643        let mut last_status = None;
1644        let details = if request.include_details {
1645            Some(Box::new(ExecutedData::default()))
1646        } else {
1647            None
1648        };
1649
1650        loop {
1651            let status = consensus_tx_status_cache
1652                .notify_read_transaction_status_change(consensus_position, last_status)
1653                .await;
1654            match status {
1655                NotifyReadConsensusTxStatusResult::Status(status) => match status {
1656                    ConsensusTxStatus::FastpathCertified => {
1657                        // If the request is for consensus, we need to wait for the transaction to be finalised via Consensus.
1658                        if ping == PingType::Consensus {
1659                            last_status = Some(status);
1660                            continue;
1661                        }
1662                        return Ok(WaitForEffectsResponse::Executed {
1663                            effects_digest: TransactionEffectsDigest::ZERO,
1664                            details,
1665                            fast_path: true,
1666                        });
1667                    }
1668                    ConsensusTxStatus::Rejected => {
1669                        return Ok(WaitForEffectsResponse::Rejected { error: None });
1670                    }
1671                    ConsensusTxStatus::Finalized => {
1672                        return Ok(WaitForEffectsResponse::Executed {
1673                            effects_digest: TransactionEffectsDigest::ZERO,
1674                            details,
1675                            fast_path: false,
1676                        });
1677                    }
1678                },
1679                NotifyReadConsensusTxStatusResult::Expired(round) => {
1680                    return Ok(WaitForEffectsResponse::Expired {
1681                        epoch: epoch_store.epoch(),
1682                        round: Some(round),
1683                    });
1684                }
1685            }
1686        }
1687    }
1688
1689    #[instrument(level = "error", skip_all, err(level = "debug"))]
1690    async fn wait_for_fastpath_effects(
1691        &self,
1692        consensus_position: ConsensusPosition,
1693        tx_digests: &[TransactionDigest],
1694        include_details: bool,
1695        epoch_store: &Arc<AuthorityPerEpochStore>,
1696    ) -> SuiResult<WaitForEffectsResponse> {
1697        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1698            return Err(SuiErrorKind::UnsupportedFeatureError {
1699                error: "Mysticeti fastpath".to_string(),
1700            }
1701            .into());
1702        };
1703
1704        let local_epoch = epoch_store.epoch();
1705        match consensus_position.epoch.cmp(&local_epoch) {
1706            Ordering::Less => {
1707                // Ask TransactionDriver to retry submitting the transaction and get a new ConsensusPosition,
1708                // if response from this validator is desired.
1709                let response = WaitForEffectsResponse::Expired {
1710                    epoch: local_epoch,
1711                    round: None,
1712                };
1713                return Ok(response);
1714            }
1715            Ordering::Greater => {
1716                // Ask TransactionDriver to retry this RPC until the validator's epoch catches up.
1717                return Err(SuiErrorKind::WrongEpoch {
1718                    expected_epoch: local_epoch,
1719                    actual_epoch: consensus_position.epoch,
1720                }
1721                .into());
1722            }
1723            Ordering::Equal => {
1724                // The validator's epoch is the same as the epoch of the transaction.
1725                // We can proceed with the normal flow.
1726            }
1727        };
1728
1729        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1730
1731        let mut current_status = None;
1732        loop {
1733            tokio::select! {
1734                status_result = consensus_tx_status_cache
1735                    .notify_read_transaction_status_change(consensus_position, current_status) => {
1736                    match status_result {
1737                        NotifyReadConsensusTxStatusResult::Status(new_status) => {
1738                            match new_status {
1739                                ConsensusTxStatus::Rejected => {
1740                                    return Ok(WaitForEffectsResponse::Rejected {
1741                                        error: epoch_store.get_rejection_vote_reason(
1742                                            consensus_position
1743                                        )
1744                                    });
1745                                }
1746                                ConsensusTxStatus::FastpathCertified => {
1747                                    current_status = Some(new_status);
1748                                    continue;
1749                                }
1750                                ConsensusTxStatus::Finalized => {
1751                                    current_status = Some(new_status);
1752                                    continue;
1753                                }
1754                            }
1755                        }
1756                        NotifyReadConsensusTxStatusResult::Expired(round) => {
1757                            return Ok(WaitForEffectsResponse::Expired {
1758                                epoch: epoch_store.epoch(),
1759                                round: Some(round),
1760                            });
1761                        }
1762                    }
1763                }
1764
1765                mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1766                    if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1767                    let outputs = outputs.pop().unwrap();
1768                    let effects = outputs.effects.clone();
1769
1770                    let details = if include_details {
1771                        Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1772                    } else {
1773                        None
1774                    };
1775
1776                    return Ok(WaitForEffectsResponse::Executed {
1777                        effects_digest: effects.digest(),
1778                        details,
1779                        fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1780                    });
1781                }
1782            }
1783        }
1784    }
1785
1786    async fn complete_executed_data(
1787        &self,
1788        effects: TransactionEffects,
1789        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1790    ) -> SuiResult<Box<ExecutedData>> {
1791        let (events, input_objects, output_objects) = self
1792            .collect_effects_data(
1793                &effects,
1794                /* include_events */ true,
1795                /* include_input_objects */ true,
1796                /* include_output_objects */ true,
1797                fastpath_outputs,
1798            )
1799            .await?;
1800        Ok(Box::new(ExecutedData {
1801            effects,
1802            events,
1803            input_objects,
1804            output_objects,
1805        }))
1806    }
1807
1808    async fn soft_bundle_validity_check(
1809        &self,
1810        certificates: &NonEmpty<CertifiedTransaction>,
1811        epoch_store: &Arc<AuthorityPerEpochStore>,
1812        total_size_bytes: u64,
1813    ) -> Result<(), tonic::Status> {
1814        let protocol_config = epoch_store.protocol_config();
1815        let node_config = &self.state.config;
1816
1817        // Soft Bundle MUST be enabled both in protocol config and local node config.
1818        //
1819        // The local node config is by default enabled, but can be turned off by the node operator.
1820        // This acts an extra safety measure where a validator node have the choice to turn this feature off,
1821        // without having to upgrade the entire network.
1822        fp_ensure!(
1823            protocol_config.soft_bundle() && node_config.enable_soft_bundle,
1824            SuiErrorKind::UnsupportedFeatureError {
1825                error: "Soft Bundle".to_string()
1826            }
1827            .into()
1828        );
1829
1830        // Enforce these checks per [SIP-19](https://github.com/sui-foundation/sips/blob/main/sips/sip-19.md):
1831        // - All certs must access at least one shared object.
1832        // - All certs must not be already executed.
1833        // - All certs must have the same gas price.
1834        // - Number of certs must not exceed the max allowed.
1835        // - Total size of all certs must not exceed the max allowed.
1836        fp_ensure!(
1837            certificates.len() as u64 <= protocol_config.max_soft_bundle_size(),
1838            SuiErrorKind::UserInputError {
1839                error: UserInputError::TooManyTransactionsInBatch {
1840                    size: certificates.len(),
1841                    limit: protocol_config.max_soft_bundle_size()
1842                }
1843            }
1844            .into()
1845        );
1846
1847        // We set the soft bundle max size to be half of the consensus max transactions in block size. We do this to account for
1848        // serialization overheads and to ensure that the soft bundle is not too large when is attempted to be posted via consensus.
1849        // Although half the block size is on the extreme side, it's should be good enough for now.
1850        let soft_bundle_max_size_bytes =
1851            protocol_config.consensus_max_transactions_in_block_bytes() / 2;
1852        fp_ensure!(
1853            total_size_bytes <= soft_bundle_max_size_bytes,
1854            SuiErrorKind::UserInputError {
1855                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1856                    size: total_size_bytes as usize,
1857                    limit: soft_bundle_max_size_bytes,
1858                },
1859            }
1860            .into()
1861        );
1862
1863        let mut gas_price = None;
1864        for certificate in certificates {
1865            let tx_digest = *certificate.digest();
1866            fp_ensure!(
1867                certificate.is_consensus_tx(),
1868                SuiErrorKind::UserInputError {
1869                    error: UserInputError::NoSharedObjectError { digest: tx_digest }
1870                }
1871                .into()
1872            );
1873            fp_ensure!(
1874                !self.state.is_tx_already_executed(&tx_digest),
1875                SuiErrorKind::UserInputError {
1876                    error: UserInputError::AlreadyExecutedInSoftBundleError { digest: tx_digest }
1877                }
1878                .into()
1879            );
1880            if let Some(gas) = gas_price {
1881                fp_ensure!(
1882                    gas == certificate.gas_price(),
1883                    SuiErrorKind::UserInputError {
1884                        error: UserInputError::GasPriceMismatchError {
1885                            digest: tx_digest,
1886                            expected: gas,
1887                            actual: certificate.gas_price()
1888                        }
1889                    }
1890                    .into()
1891                );
1892            } else {
1893                gas_price = Some(certificate.gas_price());
1894            }
1895        }
1896
1897        // For Soft Bundle, if at this point we know at least one certificate has already been processed,
1898        // reject the entire bundle.  Otherwise, submit all certificates in one request.
1899        // This is not a strict check as there may be race conditions where one or more certificates are
1900        // already being processed by another actor, and we could not know it.
1901        fp_ensure!(
1902            !epoch_store.is_any_tx_certs_consensus_message_processed(certificates.iter())?,
1903            SuiErrorKind::UserInputError {
1904                error: UserInputError::CertificateAlreadyProcessed
1905            }
1906            .into()
1907        );
1908
1909        Ok(())
1910    }
1911
1912    async fn handle_soft_bundle_certificates_v3_impl(
1913        &self,
1914        request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
1915    ) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
1916        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1917        let client_addr = if self.client_id_source.is_none() {
1918            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
1919        } else {
1920            self.get_client_ip_addr(&request, self.client_id_source.as_ref().unwrap())
1921        };
1922        let request = request.into_inner();
1923
1924        let certificates = NonEmpty::from_vec(request.certificates)
1925            .ok_or(SuiErrorKind::NoCertificateProvidedError)?;
1926        let mut total_size_bytes = 0;
1927        for certificate in &certificates {
1928            // We need to check this first because we haven't verified the cert signature.
1929            total_size_bytes +=
1930                certificate.validity_check(&epoch_store.tx_validity_check_context())? as u64;
1931        }
1932
1933        self.metrics
1934            .handle_soft_bundle_certificates_count
1935            .observe(certificates.len() as f64);
1936
1937        self.metrics
1938            .handle_soft_bundle_certificates_size_bytes
1939            .observe(total_size_bytes as f64);
1940
1941        // Now that individual certificates are valid, we check if the bundle is valid.
1942        self.soft_bundle_validity_check(&certificates, &epoch_store, total_size_bytes)
1943            .await?;
1944
1945        info!(
1946            "Received Soft Bundle with {} certificates, from {}, tx digests are [{}], total size [{}]bytes",
1947            certificates.len(),
1948            client_addr
1949                .map(|x| x.to_string())
1950                .unwrap_or_else(|| "unknown".to_string()),
1951            certificates
1952                .iter()
1953                .map(|x| x.digest().to_string())
1954                .collect::<Vec<_>>()
1955                .join(", "),
1956            total_size_bytes
1957        );
1958
1959        let span = error_span!("ValidatorService::handle_soft_bundle_certificates_v3");
1960        self.handle_certificates(
1961            certificates,
1962            request.include_events,
1963            request.include_input_objects,
1964            request.include_output_objects,
1965            request.include_auxiliary_data,
1966            &epoch_store,
1967            request.wait_for_effects,
1968        )
1969        .instrument(span)
1970        .await
1971        .map(|(resp, spam_weight)| {
1972            (
1973                tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
1974                    responses: resp.unwrap_or_default(),
1975                }),
1976                spam_weight,
1977            )
1978        })
1979    }
1980
1981    async fn object_info_impl(
1982        &self,
1983        request: tonic::Request<ObjectInfoRequest>,
1984    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1985        let request = request.into_inner();
1986        let response = self.state.handle_object_info_request(request).await?;
1987        Ok((tonic::Response::new(response), Weight::one()))
1988    }
1989
1990    async fn transaction_info_impl(
1991        &self,
1992        request: tonic::Request<TransactionInfoRequest>,
1993    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1994        let request = request.into_inner();
1995        let response = self.state.handle_transaction_info_request(request).await?;
1996        Ok((tonic::Response::new(response), Weight::one()))
1997    }
1998
1999    async fn checkpoint_impl(
2000        &self,
2001        request: tonic::Request<CheckpointRequest>,
2002    ) -> WrappedServiceResponse<CheckpointResponse> {
2003        let request = request.into_inner();
2004        let response = self.state.handle_checkpoint_request(&request)?;
2005        Ok((tonic::Response::new(response), Weight::one()))
2006    }
2007
2008    async fn checkpoint_v2_impl(
2009        &self,
2010        request: tonic::Request<CheckpointRequestV2>,
2011    ) -> WrappedServiceResponse<CheckpointResponseV2> {
2012        let request = request.into_inner();
2013        let response = self.state.handle_checkpoint_request_v2(&request)?;
2014        Ok((tonic::Response::new(response), Weight::one()))
2015    }
2016
2017    async fn get_system_state_object_impl(
2018        &self,
2019        _request: tonic::Request<SystemStateRequest>,
2020    ) -> WrappedServiceResponse<SuiSystemState> {
2021        let response = self
2022            .state
2023            .get_object_cache_reader()
2024            .get_sui_system_state_object_unsafe()?;
2025        Ok((tonic::Response::new(response), Weight::one()))
2026    }
2027
2028    async fn validator_health_impl(
2029        &self,
2030        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2031    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
2032        let state = &self.state;
2033
2034        // Get epoch store once for both metrics
2035        let epoch_store = state.load_epoch_store_one_call_per_task();
2036
2037        // Get in-flight execution transactions from execution scheduler
2038        let num_inflight_execution_transactions =
2039            state.execution_scheduler().num_pending_certificates() as u64;
2040
2041        // Get in-flight consensus transactions from consensus adapter
2042        let num_inflight_consensus_transactions =
2043            self.consensus_adapter.num_inflight_transactions();
2044
2045        // Get last committed leader round from epoch store
2046        let last_committed_leader_round = epoch_store
2047            .consensus_tx_status_cache
2048            .as_ref()
2049            .and_then(|cache| cache.get_last_committed_leader_round())
2050            .unwrap_or(0);
2051
2052        // Get last locally built checkpoint sequence
2053        let last_locally_built_checkpoint = epoch_store
2054            .last_built_checkpoint_summary()
2055            .ok()
2056            .flatten()
2057            .map(|(_, summary)| summary.sequence_number)
2058            .unwrap_or(0);
2059
2060        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
2061            num_inflight_consensus_transactions,
2062            num_inflight_execution_transactions,
2063            last_locally_built_checkpoint,
2064            last_committed_leader_round,
2065        };
2066
2067        let raw_response = typed_response
2068            .try_into()
2069            .map_err(|e: sui_types::error::SuiError| {
2070                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
2071            })?;
2072
2073        Ok((tonic::Response::new(raw_response), Weight::one()))
2074    }
2075
2076    fn get_client_ip_addr<T>(
2077        &self,
2078        request: &tonic::Request<T>,
2079        source: &ClientIdSource,
2080    ) -> Option<IpAddr> {
2081        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
2082
2083        if let Some(header) = forwarded_header {
2084            let num_hops = header
2085                .to_str()
2086                .map(|h| h.split(',').count().saturating_sub(1))
2087                .unwrap_or(0);
2088
2089            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
2090        }
2091
2092        match source {
2093            ClientIdSource::SocketAddr => {
2094                let socket_addr: Option<SocketAddr> = request.remote_addr();
2095
2096                // We will hit this case if the IO type used does not
2097                // implement Connected or when using a unix domain socket.
2098                // TODO: once we have confirmed that no legitimate traffic
2099                // is hitting this case, we should reject such requests that
2100                // hit this case.
2101                if let Some(socket_addr) = socket_addr {
2102                    Some(socket_addr.ip())
2103                } else {
2104                    if cfg!(msim) {
2105                        // Ignore the error from simtests.
2106                    } else if cfg!(test) {
2107                        panic!("Failed to get remote address from request");
2108                    } else {
2109                        self.metrics.connection_ip_not_found.inc();
2110                        error!("Failed to get remote address from request");
2111                    }
2112                    None
2113                }
2114            }
2115            ClientIdSource::XForwardedFor(num_hops) => {
2116                let do_header_parse = |op: &MetadataValue<Ascii>| {
2117                    match op.to_str() {
2118                        Ok(header_val) => {
2119                            let header_contents =
2120                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
2121                            if *num_hops == 0 {
2122                                error!(
2123                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
2124                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
2125                                    to this node. Skipping traffic controller request handling.",
2126                                    header_contents,
2127                                );
2128                                return None;
2129                            }
2130                            let contents_len = header_contents.len();
2131                            if contents_len < *num_hops {
2132                                error!(
2133                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2134                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
2135                                    `client-id-source` in the node config.",
2136                                    header_contents, contents_len, num_hops, contents_len,
2137                                );
2138                                self.metrics.client_id_source_config_mismatch.inc();
2139                                return None;
2140                            }
2141                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
2142                            else {
2143                                error!(
2144                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
2145                                    Expected at least {} values. Skipping traffic controller request handling.",
2146                                    header_contents, contents_len, num_hops, contents_len,
2147                                );
2148                                return None;
2149                            };
2150                            parse_ip(client_ip).or_else(|| {
2151                                self.metrics.forwarded_header_parse_error.inc();
2152                                None
2153                            })
2154                        }
2155                        Err(e) => {
2156                            // TODO: once we have confirmed that no legitimate traffic
2157                            // is hitting this case, we should reject such requests that
2158                            // hit this case.
2159                            self.metrics.forwarded_header_invalid.inc();
2160                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
2161                            None
2162                        }
2163                    }
2164                };
2165                if let Some(op) = request.metadata().get("x-forwarded-for") {
2166                    do_header_parse(op)
2167                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
2168                    do_header_parse(op)
2169                } else {
2170                    self.metrics.forwarded_header_not_included.inc();
2171                    error!(
2172                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
2173                    );
2174                    None
2175                }
2176            }
2177        }
2178    }
2179
2180    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
2181        if let Some(traffic_controller) = &self.traffic_controller {
2182            if !traffic_controller.check(&client, &None).await {
2183                // Entity in blocklist
2184                Err(tonic::Status::from_error(
2185                    SuiErrorKind::TooManyRequests.into(),
2186                ))
2187            } else {
2188                Ok(())
2189            }
2190        } else {
2191            Ok(())
2192        }
2193    }
2194
2195    fn handle_traffic_resp<T>(
2196        &self,
2197        client: Option<IpAddr>,
2198        wrapped_response: WrappedServiceResponse<T>,
2199    ) -> Result<tonic::Response<T>, tonic::Status> {
2200        let (error, spam_weight, unwrapped_response) = match wrapped_response {
2201            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
2202            Err(status) => (
2203                Some(SuiError::from(status.clone())),
2204                Weight::zero(),
2205                Err(status.clone()),
2206            ),
2207        };
2208
2209        if let Some(traffic_controller) = self.traffic_controller.clone() {
2210            traffic_controller.tally(TrafficTally {
2211                direct: client,
2212                through_fullnode: None,
2213                error_info: error.map(|e| {
2214                    let error_type = String::from(e.clone().as_ref());
2215                    let error_weight = normalize(e);
2216                    (error_weight, error_type)
2217                }),
2218                spam_weight,
2219                timestamp: SystemTime::now(),
2220            })
2221        }
2222        unwrapped_response
2223    }
2224}
2225
2226fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {
2227    // simulate a TCP connection, which would have added extensions to
2228    // the request object that would be used downstream
2229    let mut request = tonic::Request::new(message);
2230    let tcp_connect_info = TcpConnectInfo {
2231        local_addr: None,
2232        remote_addr: Some(SocketAddr::new([127, 0, 0, 1].into(), 0)),
2233    };
2234    request.extensions_mut().insert(tcp_connect_info);
2235    request
2236}
2237
2238// TODO: refine error matching here
2239fn normalize(err: SuiError) -> Weight {
2240    match err.as_inner() {
2241        SuiErrorKind::UserInputError {
2242            error: UserInputError::IncorrectUserSignature { .. },
2243        } => Weight::one(),
2244        SuiErrorKind::InvalidSignature { .. }
2245        | SuiErrorKind::SignerSignatureAbsent { .. }
2246        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
2247        | SuiErrorKind::IncorrectSigner { .. }
2248        | SuiErrorKind::UnknownSigner { .. }
2249        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
2250        _ => Weight::zero(),
2251    }
2252}
2253
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments:
/// * `$self` - the service value; it must expose `client_id_source`,
///   `get_client_ip_addr`, `handle_traffic_req` and `handle_traffic_resp`.
/// * `$func_name` - the async handler method on `$self`; it returns the
///   response paired with a spam weight, or a status on failure.
/// * `$request` - the incoming request, consumed by the handler.
///
/// The expansion uses `return` and `?`, so it must be the body (or tail) of an
/// async function or block whose output is the handler's unwrapped result.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // No client-id source configured: skip traffic control entirely, run
        // the handler and drop the spam weight from its result.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if the client is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // run the handler, then tally its outcome for traffic control
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
2274
2275#[async_trait]
2276impl Validator for ValidatorService {
2277    async fn submit_transaction(
2278        &self,
2279        request: tonic::Request<RawSubmitTxRequest>,
2280    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
2281        let validator_service = self.clone();
2282
2283        // Spawns a task which handles the transaction. The task will unconditionally continue
2284        // processing in the event that the client connection is dropped.
2285        spawn_monitored_task!(async move {
2286            // NB: traffic tally wrapping handled within the task rather than on task exit
2287            // to prevent an attacker from subverting traffic control by severing the connection
2288            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
2289        })
2290        .await
2291        .unwrap()
2292    }
2293
2294    async fn transaction(
2295        &self,
2296        request: tonic::Request<Transaction>,
2297    ) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
2298        let validator_service = self.clone();
2299
2300        // Spawns a task which handles the transaction. The task will unconditionally continue
2301        // processing in the event that the client connection is dropped.
2302        spawn_monitored_task!(async move {
2303            // NB: traffic tally wrapping handled within the task rather than on task exit
2304            // to prevent an attacker from subverting traffic control by severing the connection
2305            handle_with_decoration!(validator_service, transaction_impl, request)
2306        })
2307        .await
2308        .unwrap()
2309    }
2310
2311    async fn submit_certificate(
2312        &self,
2313        request: tonic::Request<CertifiedTransaction>,
2314    ) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
2315        let validator_service = self.clone();
2316
2317        // Spawns a task which handles the certificate. The task will unconditionally continue
2318        // processing in the event that the client connection is dropped.
2319        spawn_monitored_task!(async move {
2320            // NB: traffic tally wrapping handled within the task rather than on task exit
2321            // to prevent an attacker from subverting traffic control by severing the connection.
2322            handle_with_decoration!(validator_service, submit_certificate_impl, request)
2323        })
2324        .await
2325        .unwrap()
2326    }
2327
2328    async fn handle_certificate_v2(
2329        &self,
2330        request: tonic::Request<CertifiedTransaction>,
2331    ) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
2332        handle_with_decoration!(self, handle_certificate_v2_impl, request)
2333    }
2334
2335    async fn handle_certificate_v3(
2336        &self,
2337        request: tonic::Request<HandleCertificateRequestV3>,
2338    ) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
2339        handle_with_decoration!(self, handle_certificate_v3_impl, request)
2340    }
2341
2342    async fn wait_for_effects(
2343        &self,
2344        request: tonic::Request<RawWaitForEffectsRequest>,
2345    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
2346        handle_with_decoration!(self, wait_for_effects_impl, request)
2347    }
2348
2349    async fn handle_soft_bundle_certificates_v3(
2350        &self,
2351        request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
2352    ) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
2353        handle_with_decoration!(self, handle_soft_bundle_certificates_v3_impl, request)
2354    }
2355
2356    async fn object_info(
2357        &self,
2358        request: tonic::Request<ObjectInfoRequest>,
2359    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
2360        handle_with_decoration!(self, object_info_impl, request)
2361    }
2362
2363    async fn transaction_info(
2364        &self,
2365        request: tonic::Request<TransactionInfoRequest>,
2366    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
2367        handle_with_decoration!(self, transaction_info_impl, request)
2368    }
2369
2370    async fn checkpoint(
2371        &self,
2372        request: tonic::Request<CheckpointRequest>,
2373    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
2374        handle_with_decoration!(self, checkpoint_impl, request)
2375    }
2376
2377    async fn checkpoint_v2(
2378        &self,
2379        request: tonic::Request<CheckpointRequestV2>,
2380    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
2381        handle_with_decoration!(self, checkpoint_v2_impl, request)
2382    }
2383
2384    async fn get_system_state_object(
2385        &self,
2386        request: tonic::Request<SystemStateRequest>,
2387    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
2388        handle_with_decoration!(self, get_system_state_object_impl, request)
2389    }
2390
2391    async fn validator_health(
2392        &self,
2393        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
2394    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
2395    {
2396        handle_with_decoration!(self, validator_health_impl, request)
2397    }
2398}