sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use moka::sync::Cache;
11use mysten_common::ZipDebugEqIteratorExt;
12use mysten_common::{assert_reachable, debug_fatal};
13use mysten_metrics::spawn_monitored_task;
14use prometheus::{
15    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, Registry,
16    register_gauge_with_registry, register_histogram_vec_with_registry,
17    register_histogram_with_registry, register_int_counter_vec_with_registry,
18    register_int_counter_with_registry, register_int_gauge_with_registry,
19};
20use std::{
21    collections::HashSet,
22    io,
23    net::{IpAddr, SocketAddr},
24    sync::Arc,
25    time::{Duration, Instant, SystemTime},
26};
27use sui_network::{
28    api::{Validator, ValidatorServer},
29    tonic,
30    validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::message_envelope::Message;
34use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey};
35use sui_types::messages_grpc::{
36    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
37    TransactionInfoRequest, TransactionInfoResponse,
38};
39use sui_types::multiaddr::Multiaddr;
40use sui_types::object::Object;
41use sui_types::sui_system_state::SuiSystemState;
42use sui_types::traffic_control::{ClientIdSource, Weight};
43use sui_types::{
44    base_types::ObjectID,
45    digests::{TransactionDigest, TransactionEffectsDigest},
46    error::{SuiErrorKind, UserInputError},
47};
48use sui_types::{
49    effects::TransactionEffects,
50    messages_grpc::{
51        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
52        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
53    },
54};
55use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
56use sui_types::{error::*, transaction::*};
57use sui_types::{
58    fp_ensure,
59    messages_checkpoint::{
60        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
61    },
62};
63use tokio::time::timeout;
64use tonic::metadata::{Ascii, MetadataValue};
65use tracing::{debug, error, info, instrument};
66
67use crate::admission_queue::{AdmissionQueueContext, AdmissionQueueManager};
68use crate::gasless_rate_limiter::GaslessRateLimiter;
69use crate::{
70    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
71    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, ConsensusOverloadChecker},
72    consensus_handler::SequencedConsensusTransactionKey,
73    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
74};
75use crate::{
76    authority::{
77        authority_per_epoch_store::AuthorityPerEpochStore,
78        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
79    },
80    checkpoints::CheckpointStore,
81    mysticeti_adapter::LazyMysticetiClient,
82};
83use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
84
85#[cfg(test)]
86#[path = "unit_tests/server_tests.rs"]
87mod server_tests;
88
89#[cfg(test)]
90#[path = "unit_tests/wait_for_effects_tests.rs"]
91mod wait_for_effects_tests;
92
93#[cfg(test)]
94#[path = "unit_tests/submit_transaction_tests.rs"]
95mod submit_transaction_tests;
96
/// Handle to a validator server that has already been bound.
///
/// Produced by `AuthorityServer::spawn_with_bind_address_for_test`; wraps the underlying
/// `sui_network` server so callers can await its shutdown, trigger a shutdown, or read the
/// address it reports as its local address.
pub struct AuthorityServerHandle {
    /// The bound network server this handle controls.
    server_handle: sui_network::validator::server::Server,
}
100
impl AuthorityServerHandle {
    /// Consumes the handle and awaits `wait_for_shutdown()` on the underlying server handle.
    ///
    /// Always returns `Ok(())`; this method never constructs an `io::Error` itself.
    pub async fn join(self) -> Result<(), io::Error> {
        self.server_handle.handle().wait_for_shutdown().await;
        Ok(())
    }

    /// Consumes the handle and awaits `shutdown()` on the underlying server handle.
    ///
    /// Always returns `Ok(())`; this method never constructs an `io::Error` itself.
    pub async fn kill(self) -> Result<(), io::Error> {
        self.server_handle.handle().shutdown().await;
        Ok(())
    }

    /// Returns the address the underlying server reports as its local address.
    pub fn address(&self) -> &Multiaddr {
        self.server_handle.local_addr()
    }
}
116
/// Everything needed to spawn a validator server: a bind address, the authority state, a
/// consensus adapter and the service metrics.
///
/// NOTE(review): every constructor visible in this section is a `*_for_test` helper; confirm
/// whether production code builds this type elsewhere.
pub struct AuthorityServer {
    /// Default bind address, used by `spawn_for_test`.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService` created on spawn.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService` created on spawn.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` created on spawn.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
123
124impl AuthorityServer {
125    pub fn new_for_test_with_consensus_adapter(
126        state: Arc<AuthorityState>,
127        consensus_adapter: Arc<ConsensusAdapter>,
128    ) -> Self {
129        let address = new_local_tcp_address_for_testing();
130        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
131
132        Self {
133            address,
134            state,
135            consensus_adapter,
136            metrics,
137        }
138    }
139
140    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
141        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
142        let consensus_adapter = Arc::new(ConsensusAdapter::new(
143            Arc::new(LazyMysticetiClient::new()),
144            CheckpointStore::new_for_tests(),
145            state.name,
146            100_000,
147            100_000,
148            ConsensusAdapterMetrics::new_test(),
149            slot_freed_notify,
150        ));
151        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
152    }
153
154    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
155        let address = self.address.clone();
156        self.spawn_with_bind_address_for_test(address).await
157    }
158
159    pub async fn spawn_with_bind_address_for_test(
160        self,
161        address: Multiaddr,
162    ) -> Result<AuthorityServerHandle, io::Error> {
163        let tls_config = sui_tls::create_rustls_server_config(
164            self.state.config.network_key_pair().copy().private(),
165            SUI_TLS_SERVER_NAME.to_string(),
166        );
167        let config = mysten_network::config::Config::new();
168        let server = sui_network::validator::server::ServerBuilder::from_config(
169            &config,
170            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
171        )
172        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
173            self.state,
174            self.consensus_adapter,
175            self.metrics,
176        )))
177        .bind(&address, Some(tls_config))
178        .await
179        .unwrap();
180        let local_addr = server.local_addr().to_owned();
181        info!("Listening to traffic on {local_addr}");
182        let handle = AuthorityServerHandle {
183            server_handle: server,
184        };
185        Ok(handle)
186    }
187}
188
/// Prometheus metrics recorded by the validator service.
///
/// Field descriptions mirror the help strings registered in `ValidatorServiceMetrics::new`.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus, labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Submissions suppressed because consensus had already processed them this epoch,
    /// labelled by `req_type`.
    submission_suppressed_already_processed: IntCounterVec,
    /// Submissions suppressed because the same transaction was submitted within the
    /// recent-submission window, labelled by `req_type`.
    submission_suppressed_recently_submitted: IntCounterVec,
    /// Approximate number of digests held in the recent-submission duplicate-suppression cache.
    recently_submitted_cache_size: IntGauge,
    /// Time between a transaction being recorded and a duplicate resubmission being suppressed.
    recently_submitted_resubmission_interval: Histogram,
    /// Number of times the connection IP was not extractable from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was unexpectedly missing from a request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
    /// Number of gasless transactions rejected by the rate limiter.
    pub gasless_rate_limited_count: IntCounter,
    /// Number of valid gasless transaction submissions, labelled by `outcome`.
    pub gasless_submission_outcomes: IntCounterVec,
    /// Number of transactions that bypassed the admission queue (system not overloaded).
    admission_queue_direct_bypasses: IntCounter,
}
217
impl ValidatorServiceMetrics {
    /// Registers every validator-service metric with `registry` and returns the handles.
    ///
    /// # Panics
    ///
    /// Panics if any registration fails, since each registration result is unwrapped.
    /// NOTE(review): the usual cause would be a name already registered in `registry` — confirm
    /// against the `prometheus` registration semantics.
    pub fn new(registry: &Registry) -> Self {
        Self {
            signature_errors: register_int_counter_with_registry!(
                "total_signature_errors",
                "Number of transaction signature errors",
                registry,
            )
            .unwrap(),
            tx_verification_latency: register_histogram_with_registry!(
                "validator_service_tx_verification_latency",
                "Latency of verifying a transaction",
                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_transaction_latency: register_histogram_with_registry!(
                "validator_service_handle_transaction_latency",
                "Latency of handling a transaction",
                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_transaction_consensus_latency: register_histogram_with_registry!(
                "validator_service_handle_transaction_consensus_latency",
                "Latency of handling a user transaction sent through consensus",
                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
                "validator_service_submit_transaction_consensus_latency",
                "Latency of submitting a user transaction sent through consensus",
                &["req_type"],
                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
                "validator_service_submit_transaction_latency",
                "Latency of submit transaction handler",
                &["req_type"],
                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
                "validator_service_handle_wait_for_effects_ping_latency",
                "Latency of handling a ping request for wait_for_effects",
                &["req_type"],
                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
                "validator_service_submit_transaction_bytes",
                "The size of transactions in the submit transaction request",
                &["req_type"],
                mysten_metrics::BYTES_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
                "validator_service_submit_transaction_batch_size",
                "The number of transactions in the submit transaction request",
                &["req_type"],
                mysten_metrics::COUNT_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
                "validator_service_num_rejected_tx_during_overload",
                "Number of rejected transaction due to system overload",
                &["error_type"],
                registry,
            )
            .unwrap(),
            submission_rejected_transactions: register_int_counter_vec_with_registry!(
                "validator_service_submission_rejected_transactions",
                "Number of transactions rejected during submission",
                &["reason"],
                registry,
            )
            .unwrap(),
            submission_suppressed_already_processed: register_int_counter_vec_with_registry!(
                "validator_service_submission_suppressed_already_processed",
                "Number of submitted transactions suppressed because consensus had already \
                 processed them this epoch (re-submission of already-processed transactions)",
                &["req_type"],
                registry,
            )
            .unwrap(),
            submission_suppressed_recently_submitted: register_int_counter_vec_with_registry!(
                "validator_service_submission_suppressed_recently_submitted",
                "Number of submitted transactions suppressed because the same transaction was \
                 submitted within the recent-submission window",
                &["req_type"],
                registry,
            )
            .unwrap(),
            recently_submitted_cache_size: register_int_gauge_with_registry!(
                "validator_service_recently_submitted_cache_size",
                "Approximate number of transaction digests held in the recent-submission duplicate-suppression cache",
                registry,
            )
            .unwrap(),
            recently_submitted_resubmission_interval: register_histogram_with_registry!(
                "validator_service_recently_submitted_resubmission_interval_seconds",
                "Time between a transaction being recorded and a duplicate resubmission of it being suppressed",
                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
                registry,
            )
            .unwrap(),
            connection_ip_not_found: register_int_counter_with_registry!(
                "validator_service_connection_ip_not_found",
                "Number of times connection IP was not extractable from request",
                registry,
            )
            .unwrap(),
            forwarded_header_parse_error: register_int_counter_with_registry!(
                "validator_service_forwarded_header_parse_error",
                "Number of times x-forwarded-for header could not be parsed",
                registry,
            )
            .unwrap(),
            forwarded_header_invalid: register_int_counter_with_registry!(
                "validator_service_forwarded_header_invalid",
                "Number of times x-forwarded-for header was invalid",
                registry,
            )
            .unwrap(),
            forwarded_header_not_included: register_int_counter_with_registry!(
                "validator_service_forwarded_header_not_included",
                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
                registry,
            )
            .unwrap(),
            client_id_source_config_mismatch: register_int_counter_with_registry!(
                "validator_service_client_id_source_config_mismatch",
                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
                registry,
            )
            .unwrap(),
            x_forwarded_for_num_hops: register_gauge_with_registry!(
                "validator_service_x_forwarded_for_num_hops",
                "Number of hops in x-forwarded-for header",
                registry,
            )
            .unwrap(),
            gasless_rate_limited_count: register_int_counter_with_registry!(
                "validator_service_gasless_rate_limited_count",
                "Number of gasless transactions rejected by rate limiter",
                registry,
            )
            .unwrap(),
            gasless_submission_outcomes: register_int_counter_vec_with_registry!(
                "validator_service_gasless_submission_outcomes",
                "Number of valid gasless transaction submissions by outcome",
                &["outcome"],
                registry,
            )
            .unwrap(),
            admission_queue_direct_bypasses: register_int_counter_with_registry!(
                "validator_service_admission_queue_direct_bypasses",
                "Number of transactions that bypassed the queue (system not overloaded)",
                registry,
            )
            .unwrap(),
        }
    }

    /// Creates metrics registered against a fresh, private `Registry`, so repeated calls do
    /// not share a registry with each other or with any caller-provided one.
    pub fn new_for_tests() -> Self {
        let registry = Registry::new();
        Self::new(&registry)
    }
}
394
/// Where `handle_submit_transaction` routes a request.
///
/// NOTE(review): presumably produced by `classify_submit_mode`, which is called once per
/// request in `handle_submit_transaction_inner`; its definition is outside this section.
enum AdmissionQueueSubmitMode {
    /// System has capacity: submit directly to consensus, skipping the queue.
    Bypass,
    /// System is overloaded: admit via the priority queue.
    Queue,
    /// Queue is not available — either turned off by config or temporarily
    /// disabled by failover. Submit directly to consensus, but reject
    /// individual txs when consensus is saturated (pre-queue behavior).
    Disabled,
}
406
/// The validator's request-handling service.
///
/// Holds the shared authority state together with the collaborators used while handling
/// requests. Derives `Clone`.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state; also the source of the node config, traffic controller and
    /// gasless counter read at construction time.
    state: Arc<AuthorityState>,
    /// Consensus adapter supplied at construction.
    /// NOTE(review): its use on the submission path is outside this section.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Service metrics.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Cloned from `state.traffic_controller` in `new`; `None` in `new_for_tests`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How to derive the client IP of a request. When `None`, `handle_submit_transaction`
    /// falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
    /// Rate limiter for gasless transactions, built from `state.consensus_gasless_counter`.
    gasless_limiter: GaslessRateLimiter,
    /// Admission queue context, if any.
    /// NOTE(review): `None` presumably means the queue is turned off — confirm.
    admission_queue: Option<AdmissionQueueContext>,
    /// Digests submitted within the last `recent_submission_window` (value: when recorded), to
    /// drop duplicate resubmissions before they reach consensus.
    recently_submitted: Cache<TransactionDigest, Instant>,
    /// How long a transaction is suppressed after submission (from node config).
    recent_submission_window: Duration,
}
422
/// Assumed peak rate of distinct submissions, in transactions per second. Multiplied by the
/// dedup window length in whole seconds (at least 1) to size the dedup cache for one window.
const RECENT_SUBMISSION_PEAK_TPS: u64 = 50_000;
425
426impl ValidatorService {
427    pub fn new(
428        state: Arc<AuthorityState>,
429        consensus_adapter: Arc<ConsensusAdapter>,
430        validator_metrics: Arc<ValidatorServiceMetrics>,
431        client_id_source: Option<ClientIdSource>,
432        admission_queue: Option<AdmissionQueueContext>,
433    ) -> Self {
434        let traffic_controller = state.traffic_controller.clone();
435        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
436        let recent_submission_window = state.config.recent_submission_dedup_window();
437        Self {
438            state,
439            consensus_adapter,
440            metrics: validator_metrics,
441            traffic_controller,
442            client_id_source,
443            gasless_limiter,
444            admission_queue,
445            recently_submitted: Self::new_recently_submitted_cache(recent_submission_window),
446            recent_submission_window,
447        }
448    }
449
450    fn new_recently_submitted_cache(window: Duration) -> Cache<TransactionDigest, Instant> {
451        // Memory backstop only; the window bounds the cache, and amplified duplicates do not add
452        // entries (they share a digest). Sized for roughly one window at peak throughput.
453        let max_capacity = window.as_secs().max(1) * RECENT_SUBMISSION_PEAK_TPS;
454        Cache::builder()
455            .time_to_live(window)
456            .max_capacity(max_capacity)
457            .build()
458    }
459
460    pub fn new_for_tests(
461        state: Arc<AuthorityState>,
462        consensus_adapter: Arc<ConsensusAdapter>,
463        metrics: Arc<ValidatorServiceMetrics>,
464    ) -> Self {
465        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
466        let epoch_store = state.epoch_store_for_testing().clone();
467        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
468        let manager = Arc::new(AdmissionQueueManager::new_for_tests(
469            consensus_adapter.clone(),
470            slot_freed_notify,
471        ));
472        let admission_queue = Some(AdmissionQueueContext::spawn(manager, epoch_store));
473        let recent_submission_window = state.config.recent_submission_dedup_window();
474        Self {
475            state,
476            consensus_adapter,
477            metrics,
478            traffic_controller: None,
479            client_id_source: None,
480            gasless_limiter,
481            admission_queue,
482            recently_submitted: Self::new_recently_submitted_cache(recent_submission_window),
483            recent_submission_window,
484        }
485    }
486
    /// Returns a reference to the shared authority state held by this service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
490
491    /// Test method that performs transaction validation without going through gRPC.
492    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
493        let epoch_store = self.state.load_epoch_store_one_call_per_task();
494
495        // Validity check (basic structural validation)
496        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
497
498        // Signature verification
499        let transaction = epoch_store
500            .verify_transaction_require_no_aliases(transaction)?
501            .into_tx();
502
503        // Validate the transaction
504        self.state
505            .handle_vote_transaction(&epoch_store, transaction)?;
506
507        Ok(())
508    }
509
510    /// Test method that performs transaction validation with overload checking.
511    /// Used for testing validator overload behavior.
512    pub fn handle_transaction_for_testing_with_overload_check(
513        &self,
514        transaction: Transaction,
515    ) -> SuiResult<()> {
516        let epoch_store = self.state.load_epoch_store_one_call_per_task();
517
518        // Validity check (basic structural validation)
519        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
520
521        // Check system overload
522        self.state.check_system_overload(
523            transaction.data(),
524            self.state.check_system_overload_at_signing(),
525        )?;
526
527        // Signature verification
528        let transaction = epoch_store
529            .verify_transaction_require_no_aliases(transaction)?
530            .into_tx();
531
532        // Validate the transaction
533        self.state
534            .handle_vote_transaction(&epoch_store, transaction)?;
535
536        Ok(())
537    }
538
539    /// Collect the IDs of input objects that are immutable.
540    /// This is used to create the ImmutableInputObjects claim for consensus messages.
541    async fn collect_immutable_object_ids(
542        &self,
543        tx: &VerifiedTransaction,
544        state: &AuthorityState,
545    ) -> SuiResult<Vec<ObjectID>> {
546        let input_objects = tx.data().transaction_data().input_objects()?;
547
548        // Collect object IDs from ImmOrOwnedMoveObject inputs
549        let object_ids: Vec<ObjectID> = input_objects
550            .iter()
551            .filter_map(|obj| match obj {
552                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
553                _ => None,
554            })
555            .collect();
556        if object_ids.is_empty() {
557            return Ok(vec![]);
558        }
559
560        // Load objects from cache and filter to immutable ones
561        let objects = state.get_object_cache_reader().get_objects(&object_ids);
562
563        // All objects should be found, since owned input objects have been validated to exist.
564        objects
565            .into_iter()
566            .zip_debug_eq(object_ids.iter())
567            .filter_map(|(obj, id)| {
568                let Some(o) = obj else {
569                    return Some(Err::<ObjectID, SuiError>(
570                        SuiErrorKind::UserInputError {
571                            error: UserInputError::ObjectNotFound {
572                                object_id: *id,
573                                version: None,
574                            },
575                        }
576                        .into(),
577                    ));
578                };
579                if o.is_immutable() {
580                    Some(Ok(*id))
581                } else {
582                    None
583                }
584            })
585            .collect::<SuiResult<Vec<ObjectID>>>()
586    }
587
588    #[instrument(
589        name = "ValidatorService::handle_submit_transaction",
590        level = "error",
591        skip_all,
592        err(level = "debug")
593    )]
594    async fn handle_submit_transaction(
595        &self,
596        request: tonic::Request<RawSubmitTxRequest>,
597    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
598        let Self {
599            state,
600            consensus_adapter: _,
601            metrics,
602            traffic_controller: _,
603            client_id_source,
604            gasless_limiter: _,
605            admission_queue: _,
606            recently_submitted: _,
607            recent_submission_window: _,
608        } = self.clone();
609
610        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
611            self.get_client_ip_addr(&request, client_id_source)
612        } else {
613            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
614        };
615
616        let inner = request.into_inner();
617        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
618
619        let next_epoch = start_epoch + 1;
620        let mut max_retries = 1;
621
622        loop {
623            let res = self
624                .handle_submit_transaction_inner(&state, &metrics, &inner, submitter_client_addr)
625                .await;
626            match res {
627                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
628                Err(err) => {
629                    if max_retries > 0
630                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
631                    {
632                        max_retries -= 1;
633
634                        debug!(
635                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
636                        );
637
638                        if let Ok(Ok(new_epoch)) =
639                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
640                        {
641                            assert_reachable!("retry submission at epoch end");
642                            if new_epoch >= next_epoch {
643                                continue;
644                            }
645                            // wait_for_epoch guarantees >= target; < would indicate a bug there.
646                            debug_fatal!(
647                                "wait_for_epoch returned early: expected >= {}, got {}",
648                                next_epoch,
649                                new_epoch
650                            );
651                        }
652                    }
653                    return Err(err.into());
654                }
655            }
656        }
657    }
658
659    async fn handle_submit_transaction_inner(
660        &self,
661        state: &AuthorityState,
662        metrics: &ValidatorServiceMetrics,
663        request: &RawSubmitTxRequest,
664        submitter_client_addr: Option<IpAddr>,
665    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
666        let epoch_store = state.load_epoch_store_one_call_per_task();
667        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
668            SuiErrorKind::GrpcMessageDeserializeError {
669                type_info: "RawSubmitTxRequest.submit_type".to_string(),
670                error: e.to_string(),
671            }
672        })?;
673
674        let is_ping_request = submit_type == SubmitTxType::Ping;
675        if is_ping_request {
676            fp_ensure!(
677                request.transactions.is_empty(),
678                SuiErrorKind::InvalidRequest(format!(
679                    "Ping request cannot contain {} transactions",
680                    request.transactions.len()
681                ))
682                .into()
683            );
684        } else {
685            // Ensure default and soft bundle requests contain at least one transaction.
686            fp_ensure!(
687                !request.transactions.is_empty(),
688                SuiErrorKind::InvalidRequest(
689                    "At least one transaction needs to be submitted".to_string(),
690                )
691                .into()
692            );
693        }
694
695        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
696        // if they use the same gas price. But this is only done with best effort.
697        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
698        // other transactions in the same bundle.
699        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
700
701        let max_num_transactions = if is_soft_bundle_request {
702            // Soft bundle cannot contain too many transactions.
703            // Otherwise it is hard to include all of them in a single block.
704            epoch_store.protocol_config().max_soft_bundle_size()
705        } else {
706            // Still enforce a limit even when transactions do not need to be in the same block.
707            epoch_store
708                .protocol_config()
709                .max_num_transactions_in_block()
710        };
711        fp_ensure!(
712            request.transactions.len() <= max_num_transactions as usize,
713            SuiErrorKind::InvalidRequest(format!(
714                "Too many transactions in request: {} vs {}",
715                request.transactions.len(),
716                max_num_transactions
717            ))
718            .into()
719        );
720
721        // Transaction digests.
722        let mut tx_digests = Vec::with_capacity(request.transactions.len());
723        // Transactions to submit to consensus.
724        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
725        // Indexes of transactions above in the request transactions.
726        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
727        // Results corresponding to each transaction in the request.
728        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
729        // Total size of all transactions in the request.
730        let mut total_size_bytes = 0;
731        // Whether the request contains any gasless transaction.
732        let mut has_gasless = false;
733        // Set when a transaction duplicates an in-flight submission at admission. Tracked
734        // separately because it is detected after the per-tx results are finalized (those
735        // remain Submitted), so it cannot be derived from the results alone.
736        let mut duplicate_at_admission = false;
737        // First gas price seen in this soft bundle.
738        let mut expected_soft_bundle_gas_price = None;
739        // Transaction digests seen in this soft bundle, used to reject bundles that repeat a
740        // transaction. No legitimate client constructs such a bundle.
741        let mut soft_bundle_digests = HashSet::new();
742
743        let req_type = if is_ping_request {
744            "ping"
745        } else if request.transactions.len() == 1 {
746            "single_transaction"
747        } else if is_soft_bundle_request {
748            "soft_bundle"
749        } else {
750            "batch"
751        };
752
753        let _handle_tx_metrics_guard = metrics
754            .handle_submit_transaction_latency
755            .with_label_values(&[req_type])
756            .start_timer();
757
758        let submit_mode = self.classify_submit_mode(is_ping_request);
759
760        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
761            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
762                Ok(txn) => txn,
763                Err(e) => {
764                    // Ok to fail the request when any transaction is invalid.
765                    return Err(SuiErrorKind::TransactionDeserializationError {
766                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
767                    }
768                    .into());
769                }
770            };
771
772            // Ok to fail the request when any transaction is invalid.
773            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
774            let tx_digest = *transaction.digest();
775
776            // Soft bundles must not repeat a transaction. Fail the whole request if they do.
777            if is_soft_bundle_request {
778                fp_ensure!(
779                    soft_bundle_digests.insert(tx_digest),
780                    SuiErrorKind::UserInputError {
781                        error: UserInputError::RepeatedTransactionInSoftBundle {
782                            digest: tx_digest
783                        }
784                    }
785                    .into()
786                );
787            }
788
789            // Soft bundles require all transactions to use the same gas price.
790            if is_soft_bundle_request {
791                let gas_price = transaction.data().transaction_data().gas_price();
792                if let Some(expected) = expected_soft_bundle_gas_price {
793                    fp_ensure!(
794                        gas_price == expected,
795                        SuiErrorKind::UserInputError {
796                            error: UserInputError::GasPriceMismatchError {
797                                digest: tx_digest,
798                                expected,
799                                actual: gas_price,
800                            }
801                        }
802                        .into()
803                    );
804                } else {
805                    expected_soft_bundle_gas_price = Some(gas_price);
806                }
807            }
808
809            let is_gasless = transaction
810                .data()
811                .transaction_data()
812                .is_gasless_transaction();
813
814            if is_gasless {
815                has_gasless = true;
816                metrics
817                    .gasless_submission_outcomes
818                    .with_label_values(&["attempted"])
819                    .inc();
820            }
821
822            let overload_check_res = state.check_system_overload(
823                transaction.data(),
824                state.check_system_overload_at_signing(),
825            );
826            if let Err(error) = overload_check_res {
827                metrics
828                    .num_rejected_tx_during_overload
829                    .with_label_values(&[error.as_ref()])
830                    .inc();
831                if is_gasless {
832                    metrics
833                        .gasless_submission_outcomes
834                        .with_label_values(&["rejected_overload"])
835                        .inc();
836                }
837                results[idx] = Some(SubmitTxResult::Rejected { error });
838                continue;
839            }
840
841            // Use the pre-queue per-tx consensus overload reject when the
842            // queue is disabled or in failover.
843            if matches!(submit_mode, AdmissionQueueSubmitMode::Disabled)
844                && let Err(error) = self.consensus_adapter.check_consensus_overload()
845            {
846                state.update_overload_metrics("consensus");
847                metrics
848                    .num_rejected_tx_during_overload
849                    .with_label_values(&[error.as_ref()])
850                    .inc();
851                if is_gasless {
852                    metrics
853                        .gasless_submission_outcomes
854                        .with_label_values(&["rejected_overload"])
855                        .inc();
856                }
857                results[idx] = Some(SubmitTxResult::Rejected { error });
858                continue;
859            }
860
861            if is_gasless
862                && !self
863                    .gasless_limiter
864                    .try_acquire(epoch_store.protocol_config())
865            {
866                metrics.gasless_rate_limited_count.inc();
867                metrics
868                    .gasless_submission_outcomes
869                    .with_label_values(&["rejected_rate_limited"])
870                    .inc();
871                results[idx] = Some(SubmitTxResult::Rejected {
872                    error: SuiErrorKind::ValidatorOverloadedRetryAfter {
873                        retry_after_secs: 1,
874                    }
875                    .into(),
876                });
877                continue;
878            }
879
880            // Ok to fail the request when any signature is invalid.
881            let verified_transaction = {
882                let _metrics_guard = metrics.tx_verification_latency.start_timer();
883                if epoch_store.protocol_config().address_aliases() {
884                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
885                        Ok(tx) => tx,
886                        Err(e) => {
887                            metrics.signature_errors.inc();
888                            return Err(e);
889                        }
890                    }
891                } else {
892                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
893                        Ok(tx) => tx,
894                        Err(e) => {
895                            metrics.signature_errors.inc();
896                            return Err(e);
897                        }
898                    }
899                }
900            };
901
902            debug!(
903                ?tx_digest,
904                "handle_submit_transaction: verified transaction"
905            );
906
907            // Check if the transaction has executed, before checking input objects
908            // which could have been consumed.
909            if let Some(effects) = state
910                .get_transaction_cache_reader()
911                .get_executed_effects(&tx_digest)
912            {
913                let effects_digest = effects.digest();
914                if let Ok(executed_data) = self.complete_executed_data(effects).await {
915                    let executed_result = SubmitTxResult::Executed {
916                        effects_digest,
917                        details: Some(executed_data),
918                    };
919                    results[idx] = Some(executed_result);
920                    debug!(?tx_digest, "handle_submit_transaction: already executed");
921                    continue;
922                }
923            }
924
925            if self
926                .state
927                .get_transaction_cache_reader()
928                .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
929            {
930                results[idx] = Some(SubmitTxResult::Rejected {
931                    error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
932                });
933                debug!(
934                    ?tx_digest,
935                    "handle_submit_transaction: transaction already executed in previous epoch"
936                );
937                continue;
938            }
939
940            // Suppress resubmission of transactions consensus already processed this epoch:
941            // executed transactions whose effects details could not be reconstructed above (e.g.
942            // objects pruned), and sequenced-but-deferred transactions. Rejected is imperfect for
943            // the deferred case, but acceptable since well-behaved clients get Executed before
944            // pruning; this mainly sheds continuous resubmissions from faulty clients.
945            let consensus_key = SequencedConsensusTransactionKey::External(
946                ConsensusTransactionKey::Certificate(tx_digest),
947            );
948            if epoch_store.is_consensus_message_processed(&consensus_key)? {
949                metrics
950                    .submission_suppressed_already_processed
951                    .with_label_values(&[req_type])
952                    .inc();
953                results[idx] = Some(SubmitTxResult::Rejected {
954                    error: SuiErrorKind::TransactionProcessing {
955                        digest: tx_digest,
956                        status: "sequenced by consensus".to_string(),
957                    }
958                    .into(),
959                });
960                debug!(
961                    ?tx_digest,
962                    "handle_submit_transaction: consensus message already processed"
963                );
964                continue;
965            }
966
967            // Allow a given transaction into consensus at most once per `recent_submission_window`,
968            // dropping duplicate resubmissions early.
969            if let Some(recorded_at) = self.recently_submitted.get(&tx_digest)
970                && recorded_at.elapsed() < self.recent_submission_window
971            {
972                metrics
973                    .submission_suppressed_recently_submitted
974                    .with_label_values(&[req_type])
975                    .inc();
976                metrics
977                    .recently_submitted_resubmission_interval
978                    .observe(recorded_at.elapsed().as_secs_f64());
979                results[idx] = Some(SubmitTxResult::Rejected {
980                    error: SuiErrorKind::TransactionProcessing {
981                        digest: tx_digest,
982                        status: "recently submitted".to_string(),
983                    }
984                    .into(),
985                });
986                debug!(?tx_digest, "handle_submit_transaction: recently submitted");
987                continue;
988            }
989            self.recently_submitted.insert(tx_digest, Instant::now());
990            metrics
991                .recently_submitted_cache_size
992                .set(self.recently_submitted.entry_count() as i64);
993
994            debug!(
995                ?tx_digest,
996                "handle_submit_transaction: waiting for fastpath dependency objects"
997            );
998            if !state
999                .wait_for_fastpath_dependency_objects(
1000                    verified_transaction.tx(),
1001                    epoch_store.epoch(),
1002                )
1003                .await?
1004            {
1005                debug!(
1006                    ?tx_digest,
1007                    "fastpath input objects are still unavailable after waiting"
1008                );
1009            }
1010
1011            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
1012                Ok(_) => { /* continue processing */ }
1013                Err(e) => {
1014                    // Check if transaction has been executed while being validated.
1015                    // This is an edge case so checking executed effects twice is acceptable.
1016                    if let Some(effects) = state
1017                        .get_transaction_cache_reader()
1018                        .get_executed_effects(&tx_digest)
1019                    {
1020                        let effects_digest = effects.digest();
1021                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
1022                            let executed_result = SubmitTxResult::Executed {
1023                                effects_digest,
1024                                details: Some(executed_data),
1025                            };
1026                            results[idx] = Some(executed_result);
1027                            continue;
1028                        }
1029                    }
1030
1031                    // When the transaction has not been executed, record the error for the transaction.
1032                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
1033                    metrics
1034                        .submission_rejected_transactions
1035                        .with_label_values(&[e.to_variant_name()])
1036                        .inc();
1037                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
1038                    continue;
1039                }
1040            }
1041
1042            // Create claims with aliases and / or immutable objects.
1043            let mut claims = vec![];
1044
1045            let immutable_object_ids = self
1046                .collect_immutable_object_ids(verified_transaction.tx(), state)
1047                .await?;
1048            if !immutable_object_ids.is_empty() {
1049                claims.push(TransactionClaim::ImmutableInputObjects(
1050                    immutable_object_ids,
1051                ));
1052            }
1053
1054            let (tx, aliases) = verified_transaction.into_inner();
1055            if epoch_store.protocol_config().address_aliases() {
1056                if epoch_store
1057                    .protocol_config()
1058                    .fix_checkpoint_signature_mapping()
1059                {
1060                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
1061                } else {
1062                    let v1_aliases: Vec<_> = tx
1063                        .data()
1064                        .intent_message()
1065                        .value
1066                        .required_signers()
1067                        .into_iter()
1068                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
1069                        .collect();
1070                    #[allow(deprecated)]
1071                    claims.push(TransactionClaim::AddressAliases(
1072                        nonempty::NonEmpty::from_vec(v1_aliases)
1073                            .expect("must have at least one required_signer"),
1074                    ));
1075                }
1076            }
1077
1078            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
1079
1080            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
1081                &state.name,
1082                tx_with_claims,
1083            ));
1084            if is_gasless {
1085                metrics
1086                    .gasless_submission_outcomes
1087                    .with_label_values(&["submitted"])
1088                    .inc();
1089            }
1090
1091            transaction_indexes.push(idx);
1092            tx_digests.push(tx_digest);
1093            total_size_bytes += tx_size;
1094        }
1095
1096        if consensus_transactions.is_empty() && !is_ping_request {
1097            let spam_weight = Self::request_spam_weight(
1098                &results,
1099                has_gasless,
1100                duplicate_at_admission,
1101                is_ping_request,
1102            );
1103            let response = Self::try_from_submit_tx_response(results)?;
1104            return Ok((response, spam_weight));
1105        }
1106
1107        // Set the max bytes size of the soft bundle to be half of the consensus max transactions in block size.
1108        // We do this to account for serialization overheads and to ensure that the soft bundle is not too large
        // when it is attempted to be posted via consensus.
1110        let max_transaction_bytes = if is_soft_bundle_request {
1111            epoch_store
1112                .protocol_config()
1113                .consensus_max_transactions_in_block_bytes()
1114                / 2
1115        } else {
1116            epoch_store
1117                .protocol_config()
1118                .consensus_max_transactions_in_block_bytes()
1119        };
1120        fp_ensure!(
1121            total_size_bytes <= max_transaction_bytes as usize,
1122            SuiErrorKind::UserInputError {
1123                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1124                    size: total_size_bytes,
1125                    limit: max_transaction_bytes,
1126                },
1127            }
1128            .into()
1129        );
1130
1131        metrics
1132            .handle_submit_transaction_bytes
1133            .with_label_values(&[req_type])
1134            .observe(total_size_bytes as f64);
1135        metrics
1136            .handle_submit_transaction_batch_size
1137            .with_label_values(&[req_type])
1138            .observe(consensus_transactions.len() as f64);
1139
1140        let _latency_metric_guard = metrics
1141            .handle_submit_transaction_consensus_latency
1142            .with_label_values(&[req_type])
1143            .start_timer();
1144
1145        if is_soft_bundle_request {
            // We only allow the `consensus_transactions` to be empty for ping requests. This is how it should be, and is, treated by the downstream components.
            // For any other case, having an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
1148            assert!(
1149                !consensus_transactions.is_empty(),
1150                "A valid soft bundle must have at least one transaction"
1151            );
1152        }
1153
1154        // Soft bundles are inserted as a single queue entry.
1155        // Individual transactions are each inserted separately.
1156        let tx_groups: Vec<Vec<ConsensusTransaction>> = if is_soft_bundle_request || is_ping_request
1157        {
1158            vec![consensus_transactions]
1159        } else {
1160            consensus_transactions
1161                .into_iter()
1162                .map(|t| vec![t])
1163                .collect()
1164        };
1165
1166        // Map each submission group back to the (result index, digest) of the transactions it
1167        // contains, so a per-group outcome — consensus positions, or an "already processing"
1168        // error — can be recorded against each individual transaction. Soft bundles submit as a
1169        // single group; individual transactions submit one group each.
1170        let group_tx_meta = if is_soft_bundle_request {
1171            vec![
1172                transaction_indexes
1173                    .into_iter()
1174                    .zip_eq(tx_digests)
1175                    .collect::<Vec<_>>(),
1176            ]
1177        } else {
1178            transaction_indexes
1179                .into_iter()
1180                .zip_eq(tx_digests)
1181                .map(|pair| vec![pair])
1182                .collect::<Vec<_>>()
1183        };
1184
1185        // Collect one result per submission group WITHOUT short-circuiting. An
1186        // already-processing transaction is reported per-tx as a retriable below;
1187        // any other error fails the whole request, after all groups have settled.
1188        // Soft bundles submit as a single group; individual transactions submit one group each.
1189        let group_results = match submit_mode {
1190            AdmissionQueueSubmitMode::Bypass | AdmissionQueueSubmitMode::Disabled => {
1191                if matches!(submit_mode, AdmissionQueueSubmitMode::Bypass) {
1192                    self.metrics.admission_queue_direct_bypasses.inc();
1193                }
1194                let futures = tx_groups.into_iter().map(|txns| {
1195                    debug!(
1196                        "handle_submit_transaction: submitting consensus transactions ({}): {}",
1197                        req_type,
1198                        txns.iter().map(|t| t.local_display()).join(", ")
1199                    );
1200                    self.consensus_adapter.submit_and_get_positions(
1201                        txns,
1202                        &epoch_store,
1203                        submitter_client_addr,
1204                    )
1205                });
1206                future::join_all(futures).await
1207            }
1208            AdmissionQueueSubmitMode::Queue => {
1209                let aq = self
1210                    .admission_queue
1211                    .as_ref()
1212                    .expect("Queue mode implies admission_queue is Some")
1213                    .load();
1214                let mut receivers = Vec::with_capacity(tx_groups.len());
1215                for txns in tx_groups {
1216                    let gas_price = Self::extract_gas_price(&txns);
1217                    let (rx, newly_inserted) = aq
1218                        .try_insert(gas_price, txns, submitter_client_addr)
1219                        .await?;
1220                    if !newly_inserted {
1221                        // Duplicate of an in-flight submission; flag the request as spam. The
1222                        // per-tx result is still Submitted, so this is tracked separately.
1223                        duplicate_at_admission = true;
1224                    }
1225                    receivers.push(rx);
1226                }
1227                future::join_all(receivers.into_iter().map(|rx| async move {
1228                    match rx.await {
1229                        Ok(result) => result.map_err(SuiError::from),
1230                        Err(_) => Err(SuiError::from(
1231                            SuiErrorKind::TooManyTransactionsPendingConsensus,
1232                        )),
1233                    }
1234                }))
1235                .await
1236            }
1237        };
1238
1239        if is_ping_request {
1240            // For ping requests there is a single group returning the special consensus position.
1241            let consensus_positions = group_results
1242                .into_iter()
1243                .next()
1244                .expect("Ping request must have exactly one submission group")?;
1245            assert_eq!(consensus_positions.len(), 1);
1246            results.push(Some(SubmitTxResult::Submitted {
1247                consensus_position: consensus_positions[0],
1248            }));
1249        } else {
1250            for (group_result, txns_meta) in group_results.into_iter().zip_debug_eq(group_tx_meta) {
1251                match group_result {
1252                    Ok(consensus_positions) => {
1253                        for ((idx, tx_digest), consensus_position) in
1254                            txns_meta.into_iter().zip_debug_eq(consensus_positions)
1255                        {
1256                            debug!(
1257                                ?tx_digest,
1258                                "handle_submit_transaction: submitted consensus transaction at {}",
1259                                consensus_position,
1260                            );
1261                            results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1262                        }
1263                    }
1264                    // The transaction(s) in this group are already being processed by consensus.
1265                    // Report per-tx as a retriable rejection rather than failing the whole request.
1266                    Err(err) => {
1267                        let SuiErrorKind::TransactionProcessing { status, .. } =
1268                            err.as_inner().clone()
1269                        else {
1270                            return Err(err);
1271                        };
1272                        for (idx, tx_digest) in txns_meta {
1273                            debug!(
1274                                ?tx_digest,
1275                                "handle_submit_transaction: transaction already processing: {err}"
1276                            );
1277                            // Same suppression the upfront `is_consensus_message_processed` check
1278                            // records, just detected during submission instead of before it. The
1279                            // two paths are mutually exclusive, so this does not double-count.
1280                            metrics
1281                                .submission_suppressed_already_processed
1282                                .with_label_values(&[req_type])
1283                                .inc();
1284                            results[idx] = Some(SubmitTxResult::Rejected {
1285                                error: SuiErrorKind::TransactionProcessing {
1286                                    digest: tx_digest,
1287                                    status: status.clone(),
1288                                }
1289                                .into(),
1290                            });
1291                        }
1292                    }
1293                }
1294            }
1295        }
1296
1297        let spam_weight = Self::request_spam_weight(
1298            &results,
1299            has_gasless,
1300            duplicate_at_admission,
1301            is_ping_request,
1302        );
1303        let response = Self::try_from_submit_tx_response(results)?;
1304        Ok((response, spam_weight))
1305    }
1306
1307    /// Traffic-control spam weight for a whole submit request. The request is spam unless it is
1308    /// entirely accepted gas-chargable work.
1309    fn request_spam_weight(
1310        results: &[Option<SubmitTxResult>],
1311        has_gasless: bool,
1312        duplicate_at_admission: bool,
1313        is_ping: bool,
1314    ) -> Weight {
1315        if is_ping || has_gasless || duplicate_at_admission {
1316            return Weight::one();
1317        }
1318        for result in results {
1319            let Some(result) = result else {
1320                // `results` is expected to be fully populated (every entry `Some`) for the
1321                // request's transactions; a missing entry is a bug and is conservatively
1322                // treated as spam.
1323                debug_fatal!("transaction outcome unset when computing spam weight");
1324                return Weight::one();
1325            };
1326            if Self::submission_spam_weight(result) == Weight::one() {
1327                return Weight::one();
1328            }
1329        }
1330        Weight::zero()
1331    }
1332
1333    fn submission_spam_weight(result: &SubmitTxResult) -> Weight {
1334        match result {
1335            SubmitTxResult::Submitted { .. } => Weight::zero(),
1336            // Non-submitted results can't be charged.
1337            SubmitTxResult::Executed { .. } | SubmitTxResult::Rejected { .. } => Weight::one(),
1338        }
1339    }
1340
1341    fn try_from_submit_tx_response(
1342        results: Vec<Option<SubmitTxResult>>,
1343    ) -> Result<RawSubmitTxResponse, SuiError> {
1344        let mut raw_results = Vec::new();
1345        for (i, result) in results.into_iter().enumerate() {
1346            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1347                error: format!("Missing transaction result at {}", i),
1348            })?;
1349            let raw_result = result.try_into()?;
1350            raw_results.push(raw_result);
1351        }
1352        Ok(RawSubmitTxResponse {
1353            results: raw_results,
1354        })
1355    }
1356
1357    /// Extract the gas price from a batch of consensus transactions.
1358    /// Returns the minimum gas price in the batch, or 0 if no user transactions.
1359    fn extract_gas_price(transactions: &[ConsensusTransaction]) -> u64 {
1360        use sui_types::messages_consensus::ConsensusTransactionKind;
1361        transactions
1362            .iter()
1363            .filter_map(|tx| match &tx.kind {
1364                ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.gas_price()),
1365                ConsensusTransactionKind::UserTransaction(t) => {
1366                    Some(t.data().transaction_data().gas_price())
1367                }
1368                ConsensusTransactionKind::UserTransactionV2(t) => {
1369                    Some(t.tx().data().transaction_data().gas_price())
1370                }
1371                _ => None,
1372            })
1373            .min()
1374            .unwrap_or(0)
1375    }
1376
1377    fn classify_submit_mode(&self, is_ping_request: bool) -> AdmissionQueueSubmitMode {
1378        let Some(aq) = &self.admission_queue else {
1379            return AdmissionQueueSubmitMode::Disabled;
1380        };
1381
1382        if is_ping_request {
1383            return AdmissionQueueSubmitMode::Bypass;
1384        }
1385
1386        let inflight = usize::try_from(self.consensus_adapter.num_inflight_transactions()).unwrap();
1387        if inflight < aq.bypass_threshold() {
1388            return AdmissionQueueSubmitMode::Bypass;
1389        }
1390
1391        // Failover is consulted only on the overloaded path so the hot path
1392        // avoids the ArcSwap load.
1393        if aq.load().failover_tripped() {
1394            return AdmissionQueueSubmitMode::Disabled;
1395        }
1396
1397        AdmissionQueueSubmitMode::Queue
1398    }
1399
1400    async fn collect_effects_data(
1401        &self,
1402        effects: &TransactionEffects,
1403        include_events: bool,
1404        include_input_objects: bool,
1405        include_output_objects: bool,
1406    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1407        let events = if include_events && effects.events_digest().is_some() {
1408            Some(
1409                self.state
1410                    .get_transaction_events(effects.transaction_digest())?,
1411            )
1412        } else {
1413            None
1414        };
1415
1416        let input_objects = if include_input_objects {
1417            self.state.get_transaction_input_objects(effects)?
1418        } else {
1419            vec![]
1420        };
1421
1422        let output_objects = if include_output_objects {
1423            self.state.get_transaction_output_objects(effects)?
1424        } else {
1425            vec![]
1426        };
1427
1428        Ok((events, input_objects, output_objects))
1429    }
1430}
1431
/// Result type of the wrapped service handlers: on success, the tonic response paired with a
/// `Weight`; on failure, a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1433
1434impl ValidatorService {
    /// Handles a raw submit-transaction request by delegating to `handle_submit_transaction`
    /// and returning its result unchanged.
    async fn handle_submit_transaction_impl(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
        self.handle_submit_transaction(request).await
    }
1441
1442    async fn wait_for_effects_impl(
1443        &self,
1444        request: tonic::Request<RawWaitForEffectsRequest>,
1445    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1446        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1447        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1448        let response = timeout(
1449            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1450            Duration::from_secs(20),
1451            epoch_store
1452                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1453                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1454        )
1455        .await
1456        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1457        .try_into()?;
1458        Ok((tonic::Response::new(response), Weight::zero()))
1459    }
1460
1461    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "debug", skip_all, fields(consensus_position = ?request.consensus_position))]
1462    async fn wait_for_effects_response(
1463        &self,
1464        request: WaitForEffectsRequest,
1465        epoch_store: &Arc<AuthorityPerEpochStore>,
1466    ) -> SuiResult<WaitForEffectsResponse> {
1467        if request.ping_type.is_some() {
1468            return timeout(
1469                Duration::from_secs(10),
1470                self.ping_response(request, epoch_store),
1471            )
1472            .await
1473            .map_err(|_| SuiErrorKind::TimeoutError)?;
1474        }
1475
1476        let Some(tx_digest) = request.transaction_digest else {
1477            return Err(SuiErrorKind::InvalidRequest(
1478                "Transaction digest is required for wait for effects requests".to_string(),
1479            )
1480            .into());
1481        };
1482        let tx_digests = [tx_digest];
1483
1484        // When consensus_position is provided, also watch the consensus status cache
1485        // so rejected/dropped transactions get a timely response instead of waiting
1486        // forever for effects that will never be produced.
1487        let consensus_status_future = async {
1488            let consensus_position = match request.consensus_position {
1489                Some(pos) => pos,
1490                None => return futures::future::pending().await,
1491            };
1492            let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1493            consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1494            match consensus_tx_status_cache
1495                .notify_read_transaction_status(consensus_position)
1496                .await
1497            {
1498                NotifyReadConsensusTxStatusResult::Status(
1499                    ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1500                ) => Ok(WaitForEffectsResponse::Rejected {
1501                    error: epoch_store.get_rejection_vote_reason(consensus_position),
1502                }),
1503                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1504                    // Effects will be produced — yield to let the effects future win.
1505                    futures::future::pending().await
1506                }
1507                NotifyReadConsensusTxStatusResult::Expired(round) => {
1508                    Ok(WaitForEffectsResponse::Expired {
1509                        epoch: epoch_store.epoch(),
1510                        round: Some(round),
1511                    })
1512                }
1513            }
1514        };
1515
1516        tokio::select! {
1517            effects_result = self.state
1518                .get_transaction_cache_reader()
1519                .notify_read_executed_effects_may_fail(
1520                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1521                    &tx_digests,
1522                ) => {
1523                let effects = effects_result?.pop().unwrap();
1524                let effects_digest = effects.digest();
1525                let details = if request.include_details {
1526                    Some(self.complete_executed_data(effects).await?)
1527                } else {
1528                    None
1529                };
1530                Ok(WaitForEffectsResponse::Executed {
1531                    effects_digest,
1532                    details,
1533                })
1534            }
1535            status_response = consensus_status_future => {
1536                status_response
1537            }
1538        }
1539    }
1540
1541    #[instrument(level = "error", skip_all, err(level = "debug"))]
1542    async fn ping_response(
1543        &self,
1544        request: WaitForEffectsRequest,
1545        epoch_store: &Arc<AuthorityPerEpochStore>,
1546    ) -> SuiResult<WaitForEffectsResponse> {
1547        let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1548
1549        let Some(consensus_position) = request.consensus_position else {
1550            return Err(SuiErrorKind::InvalidRequest(
1551                "Consensus position is required for Ping requests".to_string(),
1552            )
1553            .into());
1554        };
1555
1556        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1557        let Some(ping) = request.ping_type else {
1558            return Err(SuiErrorKind::InvalidRequest(
1559                "Ping type is required for ping requests".to_string(),
1560            )
1561            .into());
1562        };
1563
1564        let _metrics_guard = self
1565            .metrics
1566            .handle_wait_for_effects_ping_latency
1567            .with_label_values(&[ping.as_str()])
1568            .start_timer();
1569
1570        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1571
1572        let details = if request.include_details {
1573            Some(Box::new(ExecutedData::default()))
1574        } else {
1575            None
1576        };
1577
1578        let status = consensus_tx_status_cache
1579            .notify_read_transaction_status(consensus_position)
1580            .await;
1581        match status {
1582            NotifyReadConsensusTxStatusResult::Status(status) => match status {
1583                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1584                    Ok(WaitForEffectsResponse::Rejected {
1585                        error: epoch_store.get_rejection_vote_reason(consensus_position),
1586                    })
1587                }
1588                ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1589                    effects_digest: TransactionEffectsDigest::ZERO,
1590                    details,
1591                }),
1592            },
1593            NotifyReadConsensusTxStatusResult::Expired(round) => {
1594                Ok(WaitForEffectsResponse::Expired {
1595                    epoch: epoch_store.epoch(),
1596                    round: Some(round),
1597                })
1598            }
1599        }
1600    }
1601
1602    async fn complete_executed_data(
1603        &self,
1604        effects: TransactionEffects,
1605    ) -> SuiResult<Box<ExecutedData>> {
1606        let (events, input_objects, output_objects) = self
1607            .collect_effects_data(
1608                &effects, /* include_events */ true, /* include_input_objects */ true,
1609                /* include_output_objects */ true,
1610            )
1611            .await?;
1612        Ok(Box::new(ExecutedData {
1613            effects,
1614            events,
1615            input_objects,
1616            output_objects,
1617        }))
1618    }
1619
1620    async fn object_info_impl(
1621        &self,
1622        request: tonic::Request<ObjectInfoRequest>,
1623    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1624        let request = request.into_inner();
1625        let response = self.state.handle_object_info_request(request).await?;
1626        Ok((tonic::Response::new(response), Weight::one()))
1627    }
1628
1629    async fn transaction_info_impl(
1630        &self,
1631        request: tonic::Request<TransactionInfoRequest>,
1632    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1633        let request = request.into_inner();
1634        let response = self.state.handle_transaction_info_request(request).await?;
1635        Ok((tonic::Response::new(response), Weight::one()))
1636    }
1637
1638    async fn checkpoint_impl(
1639        &self,
1640        request: tonic::Request<CheckpointRequest>,
1641    ) -> WrappedServiceResponse<CheckpointResponse> {
1642        let request = request.into_inner();
1643        let response = self.state.handle_checkpoint_request(&request)?;
1644        Ok((tonic::Response::new(response), Weight::one()))
1645    }
1646
1647    async fn checkpoint_v2_impl(
1648        &self,
1649        request: tonic::Request<CheckpointRequestV2>,
1650    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1651        let request = request.into_inner();
1652        let response = self.state.handle_checkpoint_request_v2(&request)?;
1653        Ok((tonic::Response::new(response), Weight::one()))
1654    }
1655
1656    async fn get_system_state_object_impl(
1657        &self,
1658        _request: tonic::Request<SystemStateRequest>,
1659    ) -> WrappedServiceResponse<SuiSystemState> {
1660        let response = self
1661            .state
1662            .get_object_cache_reader()
1663            .get_sui_system_state_object_unsafe()?;
1664        Ok((tonic::Response::new(response), Weight::one()))
1665    }
1666
1667    async fn validator_health_impl(
1668        &self,
1669        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1670    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1671        let state = &self.state;
1672
1673        // Get epoch store once for both metrics
1674        let epoch_store = state.load_epoch_store_one_call_per_task();
1675
1676        // Get in-flight execution transactions from execution scheduler
1677        let num_inflight_execution_transactions =
1678            state.execution_scheduler().num_pending_certificates() as u64;
1679
1680        // Get in-flight consensus transactions from consensus adapter
1681        let num_inflight_consensus_transactions =
1682            self.consensus_adapter.num_inflight_transactions();
1683
1684        // Get last committed leader round from epoch store
1685        let last_committed_leader_round = epoch_store
1686            .consensus_tx_status_cache
1687            .get_last_committed_leader_round()
1688            .unwrap_or(0);
1689
1690        // Get last locally built checkpoint sequence
1691        let last_locally_built_checkpoint = epoch_store
1692            .last_built_checkpoint_summary()
1693            .ok()
1694            .flatten()
1695            .map(|(_, summary)| summary.sequence_number)
1696            .unwrap_or(0);
1697
1698        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1699            num_inflight_consensus_transactions,
1700            num_inflight_execution_transactions,
1701            last_locally_built_checkpoint,
1702            last_committed_leader_round,
1703        };
1704
1705        let raw_response = typed_response
1706            .try_into()
1707            .map_err(|e: sui_types::error::SuiError| {
1708                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1709            })?;
1710
1711        Ok((tonic::Response::new(raw_response), Weight::one()))
1712    }
1713
1714    fn get_client_ip_addr<T>(
1715        &self,
1716        request: &tonic::Request<T>,
1717        source: &ClientIdSource,
1718    ) -> Option<IpAddr> {
1719        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1720
1721        if let Some(header) = forwarded_header {
1722            let num_hops = header
1723                .to_str()
1724                .map(|h| h.split(',').count().saturating_sub(1))
1725                .unwrap_or(0);
1726
1727            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1728        }
1729
1730        match source {
1731            ClientIdSource::SocketAddr => {
1732                let socket_addr: Option<SocketAddr> = request.remote_addr();
1733
1734                // We will hit this case if the IO type used does not
1735                // implement Connected or when using a unix domain socket.
1736                // TODO: once we have confirmed that no legitimate traffic
1737                // is hitting this case, we should reject such requests that
1738                // hit this case.
1739                if let Some(socket_addr) = socket_addr {
1740                    Some(socket_addr.ip())
1741                } else {
1742                    if cfg!(msim) {
1743                        // Ignore the error from simtests.
1744                    } else if cfg!(test) {
1745                        panic!("Failed to get remote address from request");
1746                    } else {
1747                        self.metrics.connection_ip_not_found.inc();
1748                        error!("Failed to get remote address from request");
1749                    }
1750                    None
1751                }
1752            }
1753            ClientIdSource::XForwardedFor(num_hops) => {
1754                let do_header_parse = |op: &MetadataValue<Ascii>| {
1755                    match op.to_str() {
1756                        Ok(header_val) => {
1757                            let header_contents =
1758                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1759                            if *num_hops == 0 {
1760                                error!(
1761                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1762                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1763                                    to this node. Skipping traffic controller request handling.",
1764                                    header_contents,
1765                                );
1766                                return None;
1767                            }
1768                            let contents_len = header_contents.len();
1769                            if contents_len < *num_hops {
1770                                error!(
1771                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1772                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1773                                    `client-id-source` in the node config.",
1774                                    header_contents, contents_len, num_hops, contents_len,
1775                                );
1776                                self.metrics.client_id_source_config_mismatch.inc();
1777                                return None;
1778                            }
1779                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1780                            else {
1781                                error!(
1782                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1783                                    Expected at least {} values. Skipping traffic controller request handling.",
1784                                    header_contents, contents_len, num_hops, contents_len,
1785                                );
1786                                return None;
1787                            };
1788                            parse_ip(client_ip).or_else(|| {
1789                                self.metrics.forwarded_header_parse_error.inc();
1790                                None
1791                            })
1792                        }
1793                        Err(e) => {
1794                            // TODO: once we have confirmed that no legitimate traffic
1795                            // is hitting this case, we should reject such requests that
1796                            // hit this case.
1797                            self.metrics.forwarded_header_invalid.inc();
1798                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1799                            None
1800                        }
1801                    }
1802                };
1803                if let Some(op) = request.metadata().get("x-forwarded-for") {
1804                    do_header_parse(op)
1805                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1806                    do_header_parse(op)
1807                } else {
1808                    self.metrics.forwarded_header_not_included.inc();
1809                    error!(
1810                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1811                    );
1812                    None
1813                }
1814            }
1815        }
1816    }
1817
1818    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1819        if let Some(traffic_controller) = &self.traffic_controller {
1820            if !traffic_controller.check(&client, &None).await {
1821                // Entity in blocklist
1822                Err(tonic::Status::from_error(
1823                    SuiErrorKind::TooManyRequests.into(),
1824                ))
1825            } else {
1826                Ok(())
1827            }
1828        } else {
1829            Ok(())
1830        }
1831    }
1832
1833    fn handle_traffic_resp<T>(
1834        &self,
1835        client: Option<IpAddr>,
1836        wrapped_response: WrappedServiceResponse<T>,
1837        method_name: &str,
1838    ) -> Result<tonic::Response<T>, tonic::Status> {
1839        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1840            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1841            Err(status) => (
1842                Some(SuiError::from(status.clone())),
1843                Weight::zero(),
1844                Err(status.clone()),
1845            ),
1846        };
1847
1848        if let Some(traffic_controller) = self.traffic_controller.clone() {
1849            traffic_controller.tally(TrafficTally {
1850                direct: client,
1851                through_fullnode: None,
1852                error_info: error.map(|e| {
1853                    let error_type = String::from(e.clone().as_ref());
1854                    let error_weight = normalize(e);
1855                    (error_weight, error_type)
1856                }),
1857                spam_weight,
1858                timestamp: SystemTime::now(),
1859                method: Some(method_name.to_string()),
1860            })
1861        }
1862        unwrapped_response
1863    }
1864}
1865
1866// TODO: refine error matching here
1867fn normalize(err: SuiError) -> Weight {
1868    match err.as_inner() {
1869        SuiErrorKind::UserInputError {
1870            error: UserInputError::IncorrectUserSignature { .. },
1871        } => Weight::one(),
1872        SuiErrorKind::InvalidSignature { .. }
1873        | SuiErrorKind::SignerSignatureAbsent { .. }
1874        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1875        | SuiErrorKind::IncorrectSigner { .. }
1876        | SuiErrorKind::UnknownSigner { .. }
1877        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1878        _ => Weight::zero(),
1879    }
1880}
1881
/// Wraps a request handler with generic traffic-control pre- and post-processing.
///
/// Arguments: the service value, the name of the `*_impl` method to call on it,
/// the request, and the method name reported in the traffic tally.
///
/// Because this sits on the critical path, any heavy lifting should be done in
/// a separate non-blocking task unless it is necessary to override the return
/// value.
///
/// Note that the expansion uses `return` and `?`, so it must be the body of a
/// function or async block returning `Result<tonic::Response<_>, tonic::Status>`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident, $method_name:expr) => {{
        // No client-id source configured: run the handler undecorated and drop
        // the spam weight it reports.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // Return early if the client is currently blocked.
        $self.handle_traffic_req(client).await?;

        // Run the handler, then tally its outcome for traffic control.
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response, $method_name)
    }};
}
1902
/// gRPC entry points of the validator service.
///
/// Every method routes through `handle_with_decoration!`, which applies the
/// traffic-control pre-check and post-tally around the matching `*_impl`
/// handler when a client-id source is configured.
#[async_trait]
impl Validator for ValidatorService {
    /// Submits a transaction. The work runs in a spawned task so that it
    /// completes even if the client disconnects.
    ///
    /// NOTE(review): the join result is unwrapped, so a failure to join the
    /// spawned task (e.g. if it panicked) panics here as well. Confirm this
    /// propagation is intended.
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        // Spawns a task which handles the transaction. The task will unconditionally continue
        // processing in the event that the client connection is dropped.
        spawn_monitored_task!(async move {
            // NB: traffic tally wrapping handled within the task rather than on task exit
            // to prevent an attacker from subverting traffic control by severing the connection
            handle_with_decoration!(
                validator_service,
                handle_submit_transaction_impl,
                request,
                "submit_transaction"
            )
        })
        .await
        .unwrap()
    }

    /// Waits for the outcome of a previously submitted transaction (or ping);
    /// see `wait_for_effects_impl`.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request, "wait_for_effects")
    }

    /// Serves an object-info request; see `object_info_impl`.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request, "object_info")
    }

    /// Serves a transaction-info request; see `transaction_info_impl`.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request, "transaction_info")
    }

    /// Serves a checkpoint request; see `checkpoint_impl`.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request, "checkpoint")
    }

    /// Serves a V2 checkpoint request; see `checkpoint_v2_impl`.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request, "checkpoint_v2")
    }

    /// Returns the Sui system state object; see `get_system_state_object_impl`.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(
            self,
            get_system_state_object_impl,
            request,
            "get_system_state_object"
        )
    }

    /// Returns a health snapshot of this validator; see `validator_health_impl`.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request, "validator_health")
    }
}