sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::ZipDebugEqIteratorExt;
11use mysten_common::{assert_reachable, debug_fatal};
12use mysten_metrics::spawn_monitored_task;
13use prometheus::{
14    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15    register_gauge_with_registry, register_histogram_vec_with_registry,
16    register_histogram_with_registry, register_int_counter_vec_with_registry,
17    register_int_counter_with_registry,
18};
19use std::{
20    io,
21    net::{IpAddr, SocketAddr},
22    sync::Arc,
23    time::{Duration, SystemTime},
24};
25use sui_network::{
26    api::{Validator, ValidatorServer},
27    tonic,
28    validator::server::SUI_TLS_SERVER_NAME,
29};
30use sui_types::effects::TransactionEffectsAPI;
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::messages_grpc::{
34    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
35    TransactionInfoRequest, TransactionInfoResponse,
36};
37use sui_types::multiaddr::Multiaddr;
38use sui_types::object::Object;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::traffic_control::{ClientIdSource, Weight};
41use sui_types::{
42    base_types::ObjectID,
43    digests::TransactionEffectsDigest,
44    error::{SuiErrorKind, UserInputError},
45};
46use sui_types::{
47    effects::TransactionEffects,
48    messages_grpc::{
49        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
50        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
51    },
52};
53use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
54use sui_types::{error::*, transaction::*};
55use sui_types::{
56    fp_ensure,
57    messages_checkpoint::{
58        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
59    },
60};
61use tokio::time::timeout;
62use tonic::metadata::{Ascii, MetadataValue};
63use tracing::{debug, error, info, instrument};
64
65use crate::admission_queue::{AdmissionQueueContext, AdmissionQueueManager};
66use crate::gasless_rate_limiter::GaslessRateLimiter;
67use crate::{
68    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
69    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, ConsensusOverloadChecker},
70    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
71};
72use crate::{
73    authority::{
74        authority_per_epoch_store::AuthorityPerEpochStore,
75        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
76    },
77    checkpoints::CheckpointStore,
78    mysticeti_adapter::LazyMysticetiClient,
79};
80use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
81
82#[cfg(test)]
83#[path = "unit_tests/server_tests.rs"]
84mod server_tests;
85
86#[cfg(test)]
87#[path = "unit_tests/wait_for_effects_tests.rs"]
88mod wait_for_effects_tests;
89
90#[cfg(test)]
91#[path = "unit_tests/submit_transaction_tests.rs"]
92mod submit_transaction_tests;
93
/// Handle to a spawned validator network server.
///
/// Produced by `AuthorityServer::spawn_with_bind_address_for_test`; it can be
/// used to read the bound address and to wait for or trigger shutdown.
pub struct AuthorityServerHandle {
    /// The underlying network server this handle controls.
    server_handle: sui_network::validator::server::Server,
}
97
98impl AuthorityServerHandle {
99    pub async fn join(self) -> Result<(), io::Error> {
100        self.server_handle.handle().wait_for_shutdown().await;
101        Ok(())
102    }
103
104    pub async fn kill(self) -> Result<(), io::Error> {
105        self.server_handle.handle().shutdown().await;
106        Ok(())
107    }
108
109    pub fn address(&self) -> &Multiaddr {
110        self.server_handle.local_addr()
111    }
112}
113
/// Bundles everything needed to spawn a validator server in tests: the
/// address to bind, the authority state, the consensus adapter and the
/// service metrics.
pub struct AuthorityServer {
    /// Default bind address used by `spawn_for_test`.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService` when spawning.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService` when spawning.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` when spawning.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
120
121impl AuthorityServer {
122    pub fn new_for_test_with_consensus_adapter(
123        state: Arc<AuthorityState>,
124        consensus_adapter: Arc<ConsensusAdapter>,
125    ) -> Self {
126        let address = new_local_tcp_address_for_testing();
127        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
128
129        Self {
130            address,
131            state,
132            consensus_adapter,
133            metrics,
134        }
135    }
136
137    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
138        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
139        let consensus_adapter = Arc::new(ConsensusAdapter::new(
140            Arc::new(LazyMysticetiClient::new()),
141            CheckpointStore::new_for_tests(),
142            state.name,
143            100_000,
144            100_000,
145            ConsensusAdapterMetrics::new_test(),
146            slot_freed_notify,
147        ));
148        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
149    }
150
151    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
152        let address = self.address.clone();
153        self.spawn_with_bind_address_for_test(address).await
154    }
155
156    pub async fn spawn_with_bind_address_for_test(
157        self,
158        address: Multiaddr,
159    ) -> Result<AuthorityServerHandle, io::Error> {
160        let tls_config = sui_tls::create_rustls_server_config(
161            self.state.config.network_key_pair().copy().private(),
162            SUI_TLS_SERVER_NAME.to_string(),
163        );
164        let config = mysten_network::config::Config::new();
165        let server = sui_network::validator::server::ServerBuilder::from_config(
166            &config,
167            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
168        )
169        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
170            self.state,
171            self.consensus_adapter,
172            self.metrics,
173        )))
174        .bind(&address, Some(tls_config))
175        .await
176        .unwrap();
177        let local_addr = server.local_addr().to_owned();
178        info!("Listening to traffic on {local_addr}");
179        let handle = AuthorityServerHandle {
180            server_handle: server,
181        };
182        Ok(handle)
183    }
184}
185
/// Prometheus metrics recorded by the validator service.
///
/// All metrics are registered in `ValidatorServiceMetrics::new`; the
/// per-field descriptions below mirror the help strings used there.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus,
    /// labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by
    /// `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of transactions in the submit transaction request, labelled by
    /// `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in the submit transaction request, labelled by
    /// `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transactions rejected due to system overload, labelled by
    /// `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by
    /// `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP was not extractable from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was (unexpectedly) not
    /// included in a request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config was detected to disagree
    /// with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
    /// Number of gasless transactions rejected by the rate limiter.
    pub gasless_rate_limited_count: IntCounter,
    /// Number of valid gasless transaction submissions, labelled by `outcome`.
    pub gasless_submission_outcomes: IntCounterVec,
    /// Number of transactions that bypassed the admission queue (system not
    /// overloaded).
    admission_queue_direct_bypasses: IntCounter,
}
210
211impl ValidatorServiceMetrics {
212    pub fn new(registry: &Registry) -> Self {
213        Self {
214            signature_errors: register_int_counter_with_registry!(
215                "total_signature_errors",
216                "Number of transaction signature errors",
217                registry,
218            )
219            .unwrap(),
220            tx_verification_latency: register_histogram_with_registry!(
221                "validator_service_tx_verification_latency",
222                "Latency of verifying a transaction",
223                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
224                registry,
225            )
226            .unwrap(),
227            handle_transaction_latency: register_histogram_with_registry!(
228                "validator_service_handle_transaction_latency",
229                "Latency of handling a transaction",
230                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231                registry,
232            )
233            .unwrap(),
234            handle_transaction_consensus_latency: register_histogram_with_registry!(
235                "validator_service_handle_transaction_consensus_latency",
236                "Latency of handling a user transaction sent through consensus",
237                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
238                registry,
239            )
240            .unwrap(),
241            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
242                "validator_service_submit_transaction_consensus_latency",
243                "Latency of submitting a user transaction sent through consensus",
244                &["req_type"],
245                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
246                registry,
247            )
248            .unwrap(),
249            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
250                "validator_service_submit_transaction_latency",
251                "Latency of submit transaction handler",
252                &["req_type"],
253                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
254                registry,
255            )
256            .unwrap(),
257            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
258                "validator_service_handle_wait_for_effects_ping_latency",
259                "Latency of handling a ping request for wait_for_effects",
260                &["req_type"],
261                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
262                registry,
263            )
264            .unwrap(),
265            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
266                "validator_service_submit_transaction_bytes",
267                "The size of transactions in the submit transaction request",
268                &["req_type"],
269                mysten_metrics::BYTES_BUCKETS.to_vec(),
270                registry,
271            )
272            .unwrap(),
273            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
274                "validator_service_submit_transaction_batch_size",
275                "The number of transactions in the submit transaction request",
276                &["req_type"],
277                mysten_metrics::COUNT_BUCKETS.to_vec(),
278                registry,
279            )
280            .unwrap(),
281            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
282                "validator_service_num_rejected_tx_during_overload",
283                "Number of rejected transaction due to system overload",
284                &["error_type"],
285                registry,
286            )
287            .unwrap(),
288            submission_rejected_transactions: register_int_counter_vec_with_registry!(
289                "validator_service_submission_rejected_transactions",
290                "Number of transactions rejected during submission",
291                &["reason"],
292                registry,
293            )
294            .unwrap(),
295            connection_ip_not_found: register_int_counter_with_registry!(
296                "validator_service_connection_ip_not_found",
297                "Number of times connection IP was not extractable from request",
298                registry,
299            )
300            .unwrap(),
301            forwarded_header_parse_error: register_int_counter_with_registry!(
302                "validator_service_forwarded_header_parse_error",
303                "Number of times x-forwarded-for header could not be parsed",
304                registry,
305            )
306            .unwrap(),
307            forwarded_header_invalid: register_int_counter_with_registry!(
308                "validator_service_forwarded_header_invalid",
309                "Number of times x-forwarded-for header was invalid",
310                registry,
311            )
312            .unwrap(),
313            forwarded_header_not_included: register_int_counter_with_registry!(
314                "validator_service_forwarded_header_not_included",
315                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
316                registry,
317            )
318            .unwrap(),
319            client_id_source_config_mismatch: register_int_counter_with_registry!(
320                "validator_service_client_id_source_config_mismatch",
321                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
322                registry,
323            )
324            .unwrap(),
325            x_forwarded_for_num_hops: register_gauge_with_registry!(
326                "validator_service_x_forwarded_for_num_hops",
327                "Number of hops in x-forwarded-for header",
328                registry,
329            )
330            .unwrap(),
331            gasless_rate_limited_count: register_int_counter_with_registry!(
332                "validator_service_gasless_rate_limited_count",
333                "Number of gasless transactions rejected by rate limiter",
334                registry,
335            )
336            .unwrap(),
337            gasless_submission_outcomes: register_int_counter_vec_with_registry!(
338                "validator_service_gasless_submission_outcomes",
339                "Number of valid gasless transaction submissions by outcome",
340                &["outcome"],
341                registry,
342            )
343            .unwrap(),
344            admission_queue_direct_bypasses: register_int_counter_with_registry!(
345                "validator_service_admission_queue_direct_bypasses",
346                "Number of transactions that bypassed the queue (system not overloaded)",
347                registry,
348            )
349            .unwrap(),
350        }
351    }
352
353    pub fn new_for_tests() -> Self {
354        let registry = Registry::new();
355        Self::new(&registry)
356    }
357}
358
/// Where `handle_submit_transaction` routes a request.
///
/// The mode is chosen once per request, before the per-transaction loop
/// runs.
enum AdmissionQueueSubmitMode {
    /// System has capacity: submit directly to consensus, skipping the queue.
    Bypass,
    /// System is overloaded: admit via the priority queue.
    Queue,
    /// Queue is not available — either turned off by config or temporarily
    /// disabled by failover. Submit directly to consensus, but reject
    /// individual txs when consensus is saturated (pre-queue behavior).
    /// In this mode the submit path runs the consensus overload check for
    /// each transaction and records a rejection when it fails.
    Disabled,
}
370
/// Validator-side service that handles transaction submission requests.
///
/// The type is `Clone`; the authority state, consensus adapter, metrics and
/// traffic controller are held behind `Arc`s.
#[derive(Clone)]
pub struct ValidatorService {
    /// Authority state used for epoch lookup, overload checks and cache reads.
    state: Arc<AuthorityState>,
    /// Adapter used to check consensus overload on the submit path.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while handling requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from the authority state,
    /// `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How to derive the client IP from a request. When `None`, the submit
    /// handler falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
    /// Rate limiter consulted for gasless transactions before they are
    /// verified; built from the authority state's consensus gasless counter.
    gasless_limiter: GaslessRateLimiter,
    /// Optional admission queue context supplied by the caller of `new`
    /// (always set by `new_for_tests`).
    admission_queue: Option<AdmissionQueueContext>,
}
381
382impl ValidatorService {
383    pub fn new(
384        state: Arc<AuthorityState>,
385        consensus_adapter: Arc<ConsensusAdapter>,
386        validator_metrics: Arc<ValidatorServiceMetrics>,
387        client_id_source: Option<ClientIdSource>,
388        admission_queue: Option<AdmissionQueueContext>,
389    ) -> Self {
390        let traffic_controller = state.traffic_controller.clone();
391        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
392        Self {
393            state,
394            consensus_adapter,
395            metrics: validator_metrics,
396            traffic_controller,
397            client_id_source,
398            gasless_limiter,
399            admission_queue,
400        }
401    }
402
403    pub fn new_for_tests(
404        state: Arc<AuthorityState>,
405        consensus_adapter: Arc<ConsensusAdapter>,
406        metrics: Arc<ValidatorServiceMetrics>,
407    ) -> Self {
408        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
409        let epoch_store = state.epoch_store_for_testing().clone();
410        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
411        let manager = Arc::new(AdmissionQueueManager::new_for_tests(
412            consensus_adapter.clone(),
413            slot_freed_notify,
414        ));
415        let admission_queue = Some(AdmissionQueueContext::spawn(manager, epoch_store));
416        Self {
417            state,
418            consensus_adapter,
419            metrics,
420            traffic_controller: None,
421            client_id_source: None,
422            gasless_limiter,
423            admission_queue,
424        }
425    }
426
    /// Returns a reference to the authority state this service operates on.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
430
431    /// Test method that performs transaction validation without going through gRPC.
432    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
433        let epoch_store = self.state.load_epoch_store_one_call_per_task();
434
435        // Validity check (basic structural validation)
436        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
437
438        // Signature verification
439        let transaction = epoch_store
440            .verify_transaction_require_no_aliases(transaction)?
441            .into_tx();
442
443        // Validate the transaction
444        self.state
445            .handle_vote_transaction(&epoch_store, transaction)?;
446
447        Ok(())
448    }
449
450    /// Test method that performs transaction validation with overload checking.
451    /// Used for testing validator overload behavior.
452    pub fn handle_transaction_for_testing_with_overload_check(
453        &self,
454        transaction: Transaction,
455    ) -> SuiResult<()> {
456        let epoch_store = self.state.load_epoch_store_one_call_per_task();
457
458        // Validity check (basic structural validation)
459        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
460
461        // Check system overload
462        self.state.check_system_overload(
463            transaction.data(),
464            self.state.check_system_overload_at_signing(),
465        )?;
466
467        // Signature verification
468        let transaction = epoch_store
469            .verify_transaction_require_no_aliases(transaction)?
470            .into_tx();
471
472        // Validate the transaction
473        self.state
474            .handle_vote_transaction(&epoch_store, transaction)?;
475
476        Ok(())
477    }
478
479    /// Collect the IDs of input objects that are immutable.
480    /// This is used to create the ImmutableInputObjects claim for consensus messages.
481    async fn collect_immutable_object_ids(
482        &self,
483        tx: &VerifiedTransaction,
484        state: &AuthorityState,
485    ) -> SuiResult<Vec<ObjectID>> {
486        let input_objects = tx.data().transaction_data().input_objects()?;
487
488        // Collect object IDs from ImmOrOwnedMoveObject inputs
489        let object_ids: Vec<ObjectID> = input_objects
490            .iter()
491            .filter_map(|obj| match obj {
492                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
493                _ => None,
494            })
495            .collect();
496        if object_ids.is_empty() {
497            return Ok(vec![]);
498        }
499
500        // Load objects from cache and filter to immutable ones
501        let objects = state.get_object_cache_reader().get_objects(&object_ids);
502
503        // All objects should be found, since owned input objects have been validated to exist.
504        objects
505            .into_iter()
506            .zip_debug_eq(object_ids.iter())
507            .filter_map(|(obj, id)| {
508                let Some(o) = obj else {
509                    return Some(Err::<ObjectID, SuiError>(
510                        SuiErrorKind::UserInputError {
511                            error: UserInputError::ObjectNotFound {
512                                object_id: *id,
513                                version: None,
514                            },
515                        }
516                        .into(),
517                    ));
518                };
519                if o.is_immutable() {
520                    Some(Ok(*id))
521                } else {
522                    None
523                }
524            })
525            .collect::<SuiResult<Vec<ObjectID>>>()
526    }
527
528    #[instrument(
529        name = "ValidatorService::handle_submit_transaction",
530        level = "error",
531        skip_all,
532        err(level = "debug")
533    )]
534    async fn handle_submit_transaction(
535        &self,
536        request: tonic::Request<RawSubmitTxRequest>,
537    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
538        let Self {
539            state,
540            consensus_adapter: _,
541            metrics,
542            traffic_controller: _,
543            client_id_source,
544            gasless_limiter: _,
545            admission_queue: _,
546        } = self.clone();
547
548        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
549            self.get_client_ip_addr(&request, client_id_source)
550        } else {
551            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
552        };
553
554        let inner = request.into_inner();
555        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
556
557        let next_epoch = start_epoch + 1;
558        let mut max_retries = 1;
559
560        loop {
561            let res = self
562                .handle_submit_transaction_inner(&state, &metrics, &inner, submitter_client_addr)
563                .await;
564            match res {
565                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
566                Err(err) => {
567                    if max_retries > 0
568                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
569                    {
570                        max_retries -= 1;
571
572                        debug!(
573                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
574                        );
575
576                        if let Ok(Ok(new_epoch)) =
577                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
578                        {
579                            assert_reachable!("retry submission at epoch end");
580                            if new_epoch == next_epoch {
581                                continue;
582                            }
583
584                            debug_fatal!(
585                                "expected epoch {} after reconfiguration. got {}",
586                                next_epoch,
587                                new_epoch
588                            );
589                        }
590                    }
591                    return Err(err.into());
592                }
593            }
594        }
595    }
596
597    async fn handle_submit_transaction_inner(
598        &self,
599        state: &AuthorityState,
600        metrics: &ValidatorServiceMetrics,
601        request: &RawSubmitTxRequest,
602        submitter_client_addr: Option<IpAddr>,
603    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
604        let epoch_store = state.load_epoch_store_one_call_per_task();
605        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
606            SuiErrorKind::GrpcMessageDeserializeError {
607                type_info: "RawSubmitTxRequest.submit_type".to_string(),
608                error: e.to_string(),
609            }
610        })?;
611
612        let is_ping_request = submit_type == SubmitTxType::Ping;
613        if is_ping_request {
614            fp_ensure!(
615                request.transactions.is_empty(),
616                SuiErrorKind::InvalidRequest(format!(
617                    "Ping request cannot contain {} transactions",
618                    request.transactions.len()
619                ))
620                .into()
621            );
622        } else {
623            // Ensure default and soft bundle requests contain at least one transaction.
624            fp_ensure!(
625                !request.transactions.is_empty(),
626                SuiErrorKind::InvalidRequest(
627                    "At least one transaction needs to be submitted".to_string(),
628                )
629                .into()
630            );
631        }
632
633        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
634        // if they use the same gas price. But this is only done with best effort.
635        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
636        // other transactions in the same bundle.
637        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
638
639        let max_num_transactions = if is_soft_bundle_request {
640            // Soft bundle cannot contain too many transactions.
641            // Otherwise it is hard to include all of them in a single block.
642            epoch_store.protocol_config().max_soft_bundle_size()
643        } else {
644            // Still enforce a limit even when transactions do not need to be in the same block.
645            epoch_store
646                .protocol_config()
647                .max_num_transactions_in_block()
648        };
649        fp_ensure!(
650            request.transactions.len() <= max_num_transactions as usize,
651            SuiErrorKind::InvalidRequest(format!(
652                "Too many transactions in request: {} vs {}",
653                request.transactions.len(),
654                max_num_transactions
655            ))
656            .into()
657        );
658
659        // Transaction digests.
660        let mut tx_digests = Vec::with_capacity(request.transactions.len());
661        // Transactions to submit to consensus.
662        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
663        // Indexes of transactions above in the request transactions.
664        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
665        // Results corresponding to each transaction in the request.
666        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
667        // Total size of all transactions in the request.
668        let mut total_size_bytes = 0;
669        // Traffic control spam weight to use for the transaction.
670        let mut spam_weight = Weight::zero();
671
672        let req_type = if is_ping_request {
673            "ping"
674        } else if request.transactions.len() == 1 {
675            "single_transaction"
676        } else if is_soft_bundle_request {
677            "soft_bundle"
678        } else {
679            "batch"
680        };
681
682        let _handle_tx_metrics_guard = metrics
683            .handle_submit_transaction_latency
684            .with_label_values(&[req_type])
685            .start_timer();
686
687        let submit_mode = self.classify_submit_mode(is_ping_request);
688
689        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
690            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
691                Ok(txn) => txn,
692                Err(e) => {
693                    // Ok to fail the request when any transaction is invalid.
694                    return Err(SuiErrorKind::TransactionDeserializationError {
695                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
696                    }
697                    .into());
698                }
699            };
700
701            // Ok to fail the request when any transaction is invalid.
702            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
703            let is_gasless = transaction
704                .data()
705                .transaction_data()
706                .is_gasless_transaction();
707
708            if is_gasless {
709                // Gasless transactions count for traffic control, since they have no economic cost.
710                spam_weight = Weight::one();
711                metrics
712                    .gasless_submission_outcomes
713                    .with_label_values(&["attempted"])
714                    .inc();
715            }
716
717            let overload_check_res = state.check_system_overload(
718                transaction.data(),
719                state.check_system_overload_at_signing(),
720            );
721            if let Err(error) = overload_check_res {
722                metrics
723                    .num_rejected_tx_during_overload
724                    .with_label_values(&[error.as_ref()])
725                    .inc();
726                if is_gasless {
727                    metrics
728                        .gasless_submission_outcomes
729                        .with_label_values(&["rejected_overload"])
730                        .inc();
731                }
732                results[idx] = Some(SubmitTxResult::Rejected { error });
733                continue;
734            }
735
736            // Use the pre-queue per-tx consensus overload reject when the
737            // queue is disabled or in failover.
738            if matches!(submit_mode, AdmissionQueueSubmitMode::Disabled)
739                && let Err(error) = self.consensus_adapter.check_consensus_overload()
740            {
741                state.update_overload_metrics("consensus");
742                metrics
743                    .num_rejected_tx_during_overload
744                    .with_label_values(&[error.as_ref()])
745                    .inc();
746                results[idx] = Some(SubmitTxResult::Rejected { error });
747                continue;
748            }
749
750            if is_gasless
751                && !self
752                    .gasless_limiter
753                    .try_acquire(epoch_store.protocol_config())
754            {
755                metrics.gasless_rate_limited_count.inc();
756                metrics
757                    .gasless_submission_outcomes
758                    .with_label_values(&["rejected_rate_limited"])
759                    .inc();
760                results[idx] = Some(SubmitTxResult::Rejected {
761                    error: SuiErrorKind::ValidatorOverloadedRetryAfter {
762                        retry_after_secs: 1,
763                    }
764                    .into(),
765                });
766                continue;
767            }
768
769            // Ok to fail the request when any signature is invalid.
770            let verified_transaction = {
771                let _metrics_guard = metrics.tx_verification_latency.start_timer();
772                if epoch_store.protocol_config().address_aliases() {
773                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
774                        Ok(tx) => tx,
775                        Err(e) => {
776                            metrics.signature_errors.inc();
777                            return Err(e);
778                        }
779                    }
780                } else {
781                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
782                        Ok(tx) => tx,
783                        Err(e) => {
784                            metrics.signature_errors.inc();
785                            return Err(e);
786                        }
787                    }
788                }
789            };
790
791            let tx_digest = *verified_transaction.tx().digest();
792            debug!(
793                ?tx_digest,
794                "handle_submit_transaction: verified transaction"
795            );
796
797            // Check if the transaction has executed, before checking input objects
798            // which could have been consumed.
799            if let Some(effects) = state
800                .get_transaction_cache_reader()
801                .get_executed_effects(&tx_digest)
802            {
803                let effects_digest = effects.digest();
804                if let Ok(executed_data) = self.complete_executed_data(effects).await {
805                    let executed_result = SubmitTxResult::Executed {
806                        effects_digest,
807                        details: Some(executed_data),
808                    };
809                    results[idx] = Some(executed_result);
810                    debug!(?tx_digest, "handle_submit_transaction: already executed");
811                    continue;
812                }
813            }
814
815            if self
816                .state
817                .get_transaction_cache_reader()
818                .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
819            {
820                results[idx] = Some(SubmitTxResult::Rejected {
821                    error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
822                });
823                debug!(
824                    ?tx_digest,
825                    "handle_submit_transaction: transaction already executed in previous epoch"
826                );
827                continue;
828            }
829
830            debug!(
831                ?tx_digest,
832                "handle_submit_transaction: waiting for fastpath dependency objects"
833            );
834            if !state
835                .wait_for_fastpath_dependency_objects(
836                    verified_transaction.tx(),
837                    epoch_store.epoch(),
838                )
839                .await?
840            {
841                debug!(
842                    ?tx_digest,
843                    "fastpath input objects are still unavailable after waiting"
844                );
845            }
846
847            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
848                Ok(_) => { /* continue processing */ }
849                Err(e) => {
850                    // Check if transaction has been executed while being validated.
851                    // This is an edge case so checking executed effects twice is acceptable.
852                    if let Some(effects) = state
853                        .get_transaction_cache_reader()
854                        .get_executed_effects(&tx_digest)
855                    {
856                        let effects_digest = effects.digest();
857                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
858                            let executed_result = SubmitTxResult::Executed {
859                                effects_digest,
860                                details: Some(executed_data),
861                            };
862                            results[idx] = Some(executed_result);
863                            continue;
864                        }
865                    }
866
867                    // When the transaction has not been executed, record the error for the transaction.
868                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
869                    metrics
870                        .submission_rejected_transactions
871                        .with_label_values(&[e.to_variant_name()])
872                        .inc();
873                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
874                    continue;
875                }
876            }
877
878            // Create claims with aliases and / or immutable objects.
879            let mut claims = vec![];
880
881            let immutable_object_ids = self
882                .collect_immutable_object_ids(verified_transaction.tx(), state)
883                .await?;
884            if !immutable_object_ids.is_empty() {
885                claims.push(TransactionClaim::ImmutableInputObjects(
886                    immutable_object_ids,
887                ));
888            }
889
890            let (tx, aliases) = verified_transaction.into_inner();
891            if epoch_store.protocol_config().address_aliases() {
892                if epoch_store
893                    .protocol_config()
894                    .fix_checkpoint_signature_mapping()
895                {
896                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
897                } else {
898                    let v1_aliases: Vec<_> = tx
899                        .data()
900                        .intent_message()
901                        .value
902                        .required_signers()
903                        .into_iter()
904                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
905                        .collect();
906                    #[allow(deprecated)]
907                    claims.push(TransactionClaim::AddressAliases(
908                        nonempty::NonEmpty::from_vec(v1_aliases)
909                            .expect("must have at least one required_signer"),
910                    ));
911                }
912            }
913
914            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
915
916            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
917                &state.name,
918                tx_with_claims,
919            ));
920            if is_gasless {
921                metrics
922                    .gasless_submission_outcomes
923                    .with_label_values(&["submitted"])
924                    .inc();
925            }
926
927            transaction_indexes.push(idx);
928            tx_digests.push(tx_digest);
929            total_size_bytes += tx_size;
930        }
931
932        if consensus_transactions.is_empty() && !is_ping_request {
933            return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
934        }
935
        // Cap the total byte size of a soft bundle at half of the consensus max-transactions-in-block size.
        // This accounts for serialization overhead and ensures that the soft bundle is not too large
        // when it is submitted to consensus.
939        let max_transaction_bytes = if is_soft_bundle_request {
940            epoch_store
941                .protocol_config()
942                .consensus_max_transactions_in_block_bytes()
943                / 2
944        } else {
945            epoch_store
946                .protocol_config()
947                .consensus_max_transactions_in_block_bytes()
948        };
949        fp_ensure!(
950            total_size_bytes <= max_transaction_bytes as usize,
951            SuiErrorKind::UserInputError {
952                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
953                    size: total_size_bytes,
954                    limit: max_transaction_bytes,
955                },
956            }
957            .into()
958        );
959
960        metrics
961            .handle_submit_transaction_bytes
962            .with_label_values(&[req_type])
963            .observe(total_size_bytes as f64);
964        metrics
965            .handle_submit_transaction_batch_size
966            .with_label_values(&[req_type])
967            .observe(consensus_transactions.len() as f64);
968
969        let _latency_metric_guard = metrics
970            .handle_submit_transaction_consensus_latency
971            .with_label_values(&[req_type])
972            .start_timer();
973
974        if is_soft_bundle_request {
            // `consensus_transactions` may only be empty for ping requests; that is how it should be, and is, treated by the downstream components.
            // In any other case an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
977            assert!(
978                !consensus_transactions.is_empty(),
979                "A valid soft bundle must have at least one transaction"
980            );
981        }
982
983        // Soft bundles are inserted as a single queue entry.
984        // Individual transactions are each inserted separately.
985        let tx_groups: Vec<Vec<ConsensusTransaction>> = if is_soft_bundle_request || is_ping_request
986        {
987            vec![consensus_transactions]
988        } else {
989            consensus_transactions
990                .into_iter()
991                .map(|t| vec![t])
992                .collect()
993        };
994
995        let consensus_positions: Vec<ConsensusPosition> = match submit_mode {
996            AdmissionQueueSubmitMode::Bypass | AdmissionQueueSubmitMode::Disabled => {
997                if matches!(submit_mode, AdmissionQueueSubmitMode::Bypass) {
998                    self.metrics.admission_queue_direct_bypasses.inc();
999                }
1000                let futures = tx_groups.into_iter().map(|txns| {
1001                    debug!(
1002                        "handle_submit_transaction: submitting consensus transactions ({}): {}",
1003                        req_type,
1004                        txns.iter().map(|t| t.local_display()).join(", ")
1005                    );
1006                    self.consensus_adapter.submit_and_get_positions(
1007                        txns,
1008                        &epoch_store,
1009                        submitter_client_addr,
1010                    )
1011                });
1012                future::try_join_all(futures)
1013                    .await?
1014                    .into_iter()
1015                    .flatten()
1016                    .collect()
1017            }
1018            AdmissionQueueSubmitMode::Queue => {
1019                let aq = self
1020                    .admission_queue
1021                    .as_ref()
1022                    .expect("Queue mode implies admission_queue is Some")
1023                    .load();
1024                let mut receivers = Vec::with_capacity(tx_groups.len());
1025                for txns in tx_groups {
1026                    let gas_price = Self::extract_gas_price(&txns);
1027                    let (rx, newly_inserted) = aq
1028                        .try_insert(gas_price, txns, submitter_client_addr)
1029                        .await?;
1030                    if !newly_inserted {
1031                        // Count duplicate tx submissions towards spam tallies.
1032                        spam_weight = Weight::one();
1033                    }
1034                    receivers.push(rx);
1035                }
1036                let results = future::try_join_all(receivers.into_iter().map(|rx| async move {
1037                    rx.await.map_err(|_| {
1038                        SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus)
1039                    })?
1040                }))
1041                .await?;
1042                results.into_iter().flatten().collect()
1043            }
1044        };
1045
1046        if is_ping_request {
1047            // For ping requests, return the special consensus position.
1048            assert_eq!(consensus_positions.len(), 1);
1049            results.push(Some(SubmitTxResult::Submitted {
1050                consensus_position: consensus_positions[0],
1051            }));
1052        } else {
1053            // Otherwise, return the consensus position for each transaction.
1054            for ((idx, tx_digest), consensus_position) in transaction_indexes
1055                .into_iter()
1056                .zip_debug_eq(tx_digests)
1057                .zip_debug_eq(consensus_positions)
1058            {
1059                debug!(
1060                    ?tx_digest,
1061                    "handle_submit_transaction: submitted consensus transaction at {}",
1062                    consensus_position,
1063                );
1064                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1065            }
1066        }
1067
1068        Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1069    }
1070
1071    fn try_from_submit_tx_response(
1072        results: Vec<Option<SubmitTxResult>>,
1073    ) -> Result<RawSubmitTxResponse, SuiError> {
1074        let mut raw_results = Vec::new();
1075        for (i, result) in results.into_iter().enumerate() {
1076            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1077                error: format!("Missing transaction result at {}", i),
1078            })?;
1079            let raw_result = result.try_into()?;
1080            raw_results.push(raw_result);
1081        }
1082        Ok(RawSubmitTxResponse {
1083            results: raw_results,
1084        })
1085    }
1086
1087    /// Extract the gas price from a batch of consensus transactions.
1088    /// Returns the minimum gas price in the batch, or 0 if no user transactions.
1089    fn extract_gas_price(transactions: &[ConsensusTransaction]) -> u64 {
1090        use sui_types::messages_consensus::ConsensusTransactionKind;
1091        transactions
1092            .iter()
1093            .filter_map(|tx| match &tx.kind {
1094                ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.gas_price()),
1095                ConsensusTransactionKind::UserTransaction(t) => {
1096                    Some(t.data().transaction_data().gas_price())
1097                }
1098                ConsensusTransactionKind::UserTransactionV2(t) => {
1099                    Some(t.tx().data().transaction_data().gas_price())
1100                }
1101                _ => None,
1102            })
1103            .min()
1104            .unwrap_or(0)
1105    }
1106
1107    fn classify_submit_mode(&self, is_ping_request: bool) -> AdmissionQueueSubmitMode {
1108        let Some(aq) = &self.admission_queue else {
1109            return AdmissionQueueSubmitMode::Disabled;
1110        };
1111
1112        if is_ping_request {
1113            return AdmissionQueueSubmitMode::Bypass;
1114        }
1115
1116        let inflight = usize::try_from(self.consensus_adapter.num_inflight_transactions()).unwrap();
1117        if inflight < aq.bypass_threshold() {
1118            return AdmissionQueueSubmitMode::Bypass;
1119        }
1120
1121        // Failover is consulted only on the overloaded path so the hot path
1122        // avoids the ArcSwap load.
1123        if aq.load().failover_tripped() {
1124            return AdmissionQueueSubmitMode::Disabled;
1125        }
1126
1127        AdmissionQueueSubmitMode::Queue
1128    }
1129
1130    async fn collect_effects_data(
1131        &self,
1132        effects: &TransactionEffects,
1133        include_events: bool,
1134        include_input_objects: bool,
1135        include_output_objects: bool,
1136    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1137        let events = if include_events && effects.events_digest().is_some() {
1138            Some(
1139                self.state
1140                    .get_transaction_events(effects.transaction_digest())?,
1141            )
1142        } else {
1143            None
1144        };
1145
1146        let input_objects = if include_input_objects {
1147            self.state.get_transaction_input_objects(effects)?
1148        } else {
1149            vec![]
1150        };
1151
1152        let output_objects = if include_output_objects {
1153            self.state.get_transaction_output_objects(effects)?
1154        } else {
1155            vec![]
1156        };
1157
1158        Ok((events, input_objects, output_objects))
1159    }
1160}
1161
/// Result type shared by the `*_impl` request handlers: on success the gRPC
/// response paired with a traffic-control `Weight`, on failure a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1163
1164impl ValidatorService {
1165    async fn handle_submit_transaction_impl(
1166        &self,
1167        request: tonic::Request<RawSubmitTxRequest>,
1168    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1169        self.handle_submit_transaction(request).await
1170    }
1171
1172    async fn wait_for_effects_impl(
1173        &self,
1174        request: tonic::Request<RawWaitForEffectsRequest>,
1175    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1176        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1177        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1178        let response = timeout(
1179            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1180            Duration::from_secs(20),
1181            epoch_store
1182                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1183                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1184        )
1185        .await
1186        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1187        .try_into()?;
1188        Ok((tonic::Response::new(response), Weight::zero()))
1189    }
1190
1191    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "debug", skip_all, fields(consensus_position = ?request.consensus_position))]
1192    async fn wait_for_effects_response(
1193        &self,
1194        request: WaitForEffectsRequest,
1195        epoch_store: &Arc<AuthorityPerEpochStore>,
1196    ) -> SuiResult<WaitForEffectsResponse> {
1197        if request.ping_type.is_some() {
1198            return timeout(
1199                Duration::from_secs(10),
1200                self.ping_response(request, epoch_store),
1201            )
1202            .await
1203            .map_err(|_| SuiErrorKind::TimeoutError)?;
1204        }
1205
1206        let Some(tx_digest) = request.transaction_digest else {
1207            return Err(SuiErrorKind::InvalidRequest(
1208                "Transaction digest is required for wait for effects requests".to_string(),
1209            )
1210            .into());
1211        };
1212        let tx_digests = [tx_digest];
1213
1214        // When consensus_position is provided, also watch the consensus status cache
1215        // so rejected/dropped transactions get a timely response instead of waiting
1216        // forever for effects that will never be produced.
1217        let consensus_status_future = async {
1218            let consensus_position = match request.consensus_position {
1219                Some(pos) => pos,
1220                None => return futures::future::pending().await,
1221            };
1222            let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1223            consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1224            match consensus_tx_status_cache
1225                .notify_read_transaction_status(consensus_position)
1226                .await
1227            {
1228                NotifyReadConsensusTxStatusResult::Status(
1229                    ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1230                ) => Ok(WaitForEffectsResponse::Rejected {
1231                    error: epoch_store.get_rejection_vote_reason(consensus_position),
1232                }),
1233                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1234                    // Effects will be produced — yield to let the effects future win.
1235                    futures::future::pending().await
1236                }
1237                NotifyReadConsensusTxStatusResult::Expired(round) => {
1238                    Ok(WaitForEffectsResponse::Expired {
1239                        epoch: epoch_store.epoch(),
1240                        round: Some(round),
1241                    })
1242                }
1243            }
1244        };
1245
1246        tokio::select! {
1247            effects_result = self.state
1248                .get_transaction_cache_reader()
1249                .notify_read_executed_effects_may_fail(
1250                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1251                    &tx_digests,
1252                ) => {
1253                let effects = effects_result?.pop().unwrap();
1254                let effects_digest = effects.digest();
1255                let details = if request.include_details {
1256                    Some(self.complete_executed_data(effects).await?)
1257                } else {
1258                    None
1259                };
1260                Ok(WaitForEffectsResponse::Executed {
1261                    effects_digest,
1262                    details,
1263                })
1264            }
1265            status_response = consensus_status_future => {
1266                status_response
1267            }
1268        }
1269    }
1270
1271    #[instrument(level = "error", skip_all, err(level = "debug"))]
1272    async fn ping_response(
1273        &self,
1274        request: WaitForEffectsRequest,
1275        epoch_store: &Arc<AuthorityPerEpochStore>,
1276    ) -> SuiResult<WaitForEffectsResponse> {
1277        let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1278
1279        let Some(consensus_position) = request.consensus_position else {
1280            return Err(SuiErrorKind::InvalidRequest(
1281                "Consensus position is required for Ping requests".to_string(),
1282            )
1283            .into());
1284        };
1285
1286        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1287        let Some(ping) = request.ping_type else {
1288            return Err(SuiErrorKind::InvalidRequest(
1289                "Ping type is required for ping requests".to_string(),
1290            )
1291            .into());
1292        };
1293
1294        let _metrics_guard = self
1295            .metrics
1296            .handle_wait_for_effects_ping_latency
1297            .with_label_values(&[ping.as_str()])
1298            .start_timer();
1299
1300        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1301
1302        let details = if request.include_details {
1303            Some(Box::new(ExecutedData::default()))
1304        } else {
1305            None
1306        };
1307
1308        let status = consensus_tx_status_cache
1309            .notify_read_transaction_status(consensus_position)
1310            .await;
1311        match status {
1312            NotifyReadConsensusTxStatusResult::Status(status) => match status {
1313                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1314                    Ok(WaitForEffectsResponse::Rejected {
1315                        error: epoch_store.get_rejection_vote_reason(consensus_position),
1316                    })
1317                }
1318                ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1319                    effects_digest: TransactionEffectsDigest::ZERO,
1320                    details,
1321                }),
1322            },
1323            NotifyReadConsensusTxStatusResult::Expired(round) => {
1324                Ok(WaitForEffectsResponse::Expired {
1325                    epoch: epoch_store.epoch(),
1326                    round: Some(round),
1327                })
1328            }
1329        }
1330    }
1331
1332    async fn complete_executed_data(
1333        &self,
1334        effects: TransactionEffects,
1335    ) -> SuiResult<Box<ExecutedData>> {
1336        let (events, input_objects, output_objects) = self
1337            .collect_effects_data(
1338                &effects, /* include_events */ true, /* include_input_objects */ true,
1339                /* include_output_objects */ true,
1340            )
1341            .await?;
1342        Ok(Box::new(ExecutedData {
1343            effects,
1344            events,
1345            input_objects,
1346            output_objects,
1347        }))
1348    }
1349
1350    async fn object_info_impl(
1351        &self,
1352        request: tonic::Request<ObjectInfoRequest>,
1353    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1354        let request = request.into_inner();
1355        let response = self.state.handle_object_info_request(request).await?;
1356        Ok((tonic::Response::new(response), Weight::one()))
1357    }
1358
1359    async fn transaction_info_impl(
1360        &self,
1361        request: tonic::Request<TransactionInfoRequest>,
1362    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1363        let request = request.into_inner();
1364        let response = self.state.handle_transaction_info_request(request).await?;
1365        Ok((tonic::Response::new(response), Weight::one()))
1366    }
1367
1368    async fn checkpoint_impl(
1369        &self,
1370        request: tonic::Request<CheckpointRequest>,
1371    ) -> WrappedServiceResponse<CheckpointResponse> {
1372        let request = request.into_inner();
1373        let response = self.state.handle_checkpoint_request(&request)?;
1374        Ok((tonic::Response::new(response), Weight::one()))
1375    }
1376
1377    async fn checkpoint_v2_impl(
1378        &self,
1379        request: tonic::Request<CheckpointRequestV2>,
1380    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1381        let request = request.into_inner();
1382        let response = self.state.handle_checkpoint_request_v2(&request)?;
1383        Ok((tonic::Response::new(response), Weight::one()))
1384    }
1385
1386    async fn get_system_state_object_impl(
1387        &self,
1388        _request: tonic::Request<SystemStateRequest>,
1389    ) -> WrappedServiceResponse<SuiSystemState> {
1390        let response = self
1391            .state
1392            .get_object_cache_reader()
1393            .get_sui_system_state_object_unsafe()?;
1394        Ok((tonic::Response::new(response), Weight::one()))
1395    }
1396
1397    async fn validator_health_impl(
1398        &self,
1399        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1400    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1401        let state = &self.state;
1402
1403        // Get epoch store once for both metrics
1404        let epoch_store = state.load_epoch_store_one_call_per_task();
1405
1406        // Get in-flight execution transactions from execution scheduler
1407        let num_inflight_execution_transactions =
1408            state.execution_scheduler().num_pending_certificates() as u64;
1409
1410        // Get in-flight consensus transactions from consensus adapter
1411        let num_inflight_consensus_transactions =
1412            self.consensus_adapter.num_inflight_transactions();
1413
1414        // Get last committed leader round from epoch store
1415        let last_committed_leader_round = epoch_store
1416            .consensus_tx_status_cache
1417            .get_last_committed_leader_round()
1418            .unwrap_or(0);
1419
1420        // Get last locally built checkpoint sequence
1421        let last_locally_built_checkpoint = epoch_store
1422            .last_built_checkpoint_summary()
1423            .ok()
1424            .flatten()
1425            .map(|(_, summary)| summary.sequence_number)
1426            .unwrap_or(0);
1427
1428        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1429            num_inflight_consensus_transactions,
1430            num_inflight_execution_transactions,
1431            last_locally_built_checkpoint,
1432            last_committed_leader_round,
1433        };
1434
1435        let raw_response = typed_response
1436            .try_into()
1437            .map_err(|e: sui_types::error::SuiError| {
1438                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1439            })?;
1440
1441        Ok((tonic::Response::new(raw_response), Weight::one()))
1442    }
1443
1444    fn get_client_ip_addr<T>(
1445        &self,
1446        request: &tonic::Request<T>,
1447        source: &ClientIdSource,
1448    ) -> Option<IpAddr> {
1449        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1450
1451        if let Some(header) = forwarded_header {
1452            let num_hops = header
1453                .to_str()
1454                .map(|h| h.split(',').count().saturating_sub(1))
1455                .unwrap_or(0);
1456
1457            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1458        }
1459
1460        match source {
1461            ClientIdSource::SocketAddr => {
1462                let socket_addr: Option<SocketAddr> = request.remote_addr();
1463
1464                // We will hit this case if the IO type used does not
1465                // implement Connected or when using a unix domain socket.
1466                // TODO: once we have confirmed that no legitimate traffic
1467                // is hitting this case, we should reject such requests that
1468                // hit this case.
1469                if let Some(socket_addr) = socket_addr {
1470                    Some(socket_addr.ip())
1471                } else {
1472                    if cfg!(msim) {
1473                        // Ignore the error from simtests.
1474                    } else if cfg!(test) {
1475                        panic!("Failed to get remote address from request");
1476                    } else {
1477                        self.metrics.connection_ip_not_found.inc();
1478                        error!("Failed to get remote address from request");
1479                    }
1480                    None
1481                }
1482            }
1483            ClientIdSource::XForwardedFor(num_hops) => {
1484                let do_header_parse = |op: &MetadataValue<Ascii>| {
1485                    match op.to_str() {
1486                        Ok(header_val) => {
1487                            let header_contents =
1488                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1489                            if *num_hops == 0 {
1490                                error!(
1491                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1492                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1493                                    to this node. Skipping traffic controller request handling.",
1494                                    header_contents,
1495                                );
1496                                return None;
1497                            }
1498                            let contents_len = header_contents.len();
1499                            if contents_len < *num_hops {
1500                                error!(
1501                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1502                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1503                                    `client-id-source` in the node config.",
1504                                    header_contents, contents_len, num_hops, contents_len,
1505                                );
1506                                self.metrics.client_id_source_config_mismatch.inc();
1507                                return None;
1508                            }
1509                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1510                            else {
1511                                error!(
1512                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1513                                    Expected at least {} values. Skipping traffic controller request handling.",
1514                                    header_contents, contents_len, num_hops, contents_len,
1515                                );
1516                                return None;
1517                            };
1518                            parse_ip(client_ip).or_else(|| {
1519                                self.metrics.forwarded_header_parse_error.inc();
1520                                None
1521                            })
1522                        }
1523                        Err(e) => {
1524                            // TODO: once we have confirmed that no legitimate traffic
1525                            // is hitting this case, we should reject such requests that
1526                            // hit this case.
1527                            self.metrics.forwarded_header_invalid.inc();
1528                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1529                            None
1530                        }
1531                    }
1532                };
1533                if let Some(op) = request.metadata().get("x-forwarded-for") {
1534                    do_header_parse(op)
1535                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1536                    do_header_parse(op)
1537                } else {
1538                    self.metrics.forwarded_header_not_included.inc();
1539                    error!(
1540                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1541                    );
1542                    None
1543                }
1544            }
1545        }
1546    }
1547
1548    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1549        if let Some(traffic_controller) = &self.traffic_controller {
1550            if !traffic_controller.check(&client, &None).await {
1551                // Entity in blocklist
1552                Err(tonic::Status::from_error(
1553                    SuiErrorKind::TooManyRequests.into(),
1554                ))
1555            } else {
1556                Ok(())
1557            }
1558        } else {
1559            Ok(())
1560        }
1561    }
1562
1563    fn handle_traffic_resp<T>(
1564        &self,
1565        client: Option<IpAddr>,
1566        wrapped_response: WrappedServiceResponse<T>,
1567    ) -> Result<tonic::Response<T>, tonic::Status> {
1568        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1569            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1570            Err(status) => (
1571                Some(SuiError::from(status.clone())),
1572                Weight::zero(),
1573                Err(status.clone()),
1574            ),
1575        };
1576
1577        if let Some(traffic_controller) = self.traffic_controller.clone() {
1578            traffic_controller.tally(TrafficTally {
1579                direct: client,
1580                through_fullnode: None,
1581                error_info: error.map(|e| {
1582                    let error_type = String::from(e.clone().as_ref());
1583                    let error_weight = normalize(e);
1584                    (error_weight, error_type)
1585                }),
1586                spam_weight,
1587                timestamp: SystemTime::now(),
1588            })
1589        }
1590        unwrapped_response
1591    }
1592}
1593
1594// TODO: refine error matching here
1595fn normalize(err: SuiError) -> Weight {
1596    match err.as_inner() {
1597        SuiErrorKind::UserInputError {
1598            error: UserInputError::IncorrectUserSignature { .. },
1599        } => Weight::one(),
1600        SuiErrorKind::InvalidSignature { .. }
1601        | SuiErrorKind::SignerSignatureAbsent { .. }
1602        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1603        | SuiErrorKind::IncorrectSigner { .. }
1604        | SuiErrorKind::UnknownSigner { .. }
1605        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1606        _ => Weight::zero(),
1607    }
1608}
1609
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments:
/// * `$self` - the service value; must expose `client_id_source`,
///   `get_client_ip_addr`, `handle_traffic_req` and `handle_traffic_resp`.
/// * `$func_name` - the async handler method to invoke with `$request`; it
///   must resolve to a `Result` whose `Ok` value is a `(response, weight)` pair.
/// * `$request` - the incoming request, consumed by the handler.
///
/// The expansion uses `return` and `?`, so it must be placed in an async
/// function or async block whose output is the unwrapped handler result
/// (`Result<response, tonic::Status>`).
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // Without a client id source no client is resolved, so skip traffic
        // control entirely: run the handler and drop the weight it reports.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if the client IP is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1630
/// gRPC entry points of the validator service. Every method delegates to its
/// `*_impl` counterpart through `handle_with_decoration!`, which wraps the call
/// with the traffic-control request check and response tally.
#[async_trait]
impl Validator for ValidatorService {
    /// Handles a raw transaction submission via `handle_submit_transaction_impl`.
    ///
    /// Unlike the other endpoints, the work runs in a separately spawned task
    /// operating on a clone of the service.
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        // Spawns a task which handles the transaction. The task will unconditionally continue
        // processing in the event that the client connection is dropped.
        spawn_monitored_task!(async move {
            // NB: traffic tally wrapping handled within the task rather than on task exit
            // to prevent an attacker from subverting traffic control by severing the connection
            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
        })
        .await
        // NOTE(review): presumably the spawned task's join result is `Err` only if the
        // task panicked or was cancelled, in which case this `unwrap` panics here — confirm.
        .unwrap()
    }

    /// Delegates to `wait_for_effects_impl` with traffic-control decoration.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request)
    }

    /// Delegates to `object_info_impl` with traffic-control decoration.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Delegates to `transaction_info_impl` with traffic-control decoration.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Delegates to `checkpoint_impl` with traffic-control decoration.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Delegates to `checkpoint_v2_impl` with traffic-control decoration.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request)
    }

    /// Delegates to `get_system_state_object_impl` with traffic-control decoration.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }

    /// Delegates to `validator_health_impl` with traffic-control decoration.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request)
    }
}