sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::ZipDebugEqIteratorExt;
11use mysten_common::{assert_reachable, debug_fatal};
12use mysten_metrics::spawn_monitored_task;
13use prometheus::{
14    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15    register_gauge_with_registry, register_histogram_vec_with_registry,
16    register_histogram_with_registry, register_int_counter_vec_with_registry,
17    register_int_counter_with_registry,
18};
19use std::{
20    io,
21    net::{IpAddr, SocketAddr},
22    sync::Arc,
23    time::{Duration, SystemTime},
24};
25use sui_network::{
26    api::{Validator, ValidatorServer},
27    tonic,
28    validator::server::SUI_TLS_SERVER_NAME,
29};
30use sui_types::effects::TransactionEffectsAPI;
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::messages_grpc::{
34    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
35    TransactionInfoRequest, TransactionInfoResponse,
36};
37use sui_types::multiaddr::Multiaddr;
38use sui_types::object::Object;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::traffic_control::{ClientIdSource, Weight};
41use sui_types::{
42    base_types::ObjectID,
43    digests::TransactionEffectsDigest,
44    error::{SuiErrorKind, UserInputError},
45};
46use sui_types::{
47    effects::TransactionEffects,
48    messages_grpc::{
49        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
50        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
51    },
52};
53use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
54use sui_types::{error::*, transaction::*};
55use sui_types::{
56    fp_ensure,
57    messages_checkpoint::{
58        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
59    },
60};
61use tokio::time::timeout;
62use tonic::metadata::{Ascii, MetadataValue};
63use tracing::{debug, error, info, instrument};
64
65use crate::admission_queue::{AdmissionQueueContext, AdmissionQueueManager};
66use crate::gasless_rate_limiter::GaslessRateLimiter;
67use crate::{
68    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
69    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics, ConsensusOverloadChecker},
70    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
71};
72use crate::{
73    authority::{
74        authority_per_epoch_store::AuthorityPerEpochStore,
75        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
76    },
77    checkpoints::CheckpointStore,
78    mysticeti_adapter::LazyMysticetiClient,
79};
80use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
81
// Test-only module whose source is loaded from `unit_tests/server_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/server_tests.rs"]
mod server_tests;

// Test-only module whose source is loaded from `unit_tests/wait_for_effects_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/wait_for_effects_tests.rs"]
mod wait_for_effects_tests;

// Test-only module whose source is loaded from `unit_tests/submit_transaction_tests.rs`.
#[cfg(test)]
#[path = "unit_tests/submit_transaction_tests.rs"]
mod submit_transaction_tests;
93
/// Handle to a bound authority network server.
///
/// Lets the owner wait for the server to shut down, request a shutdown, or
/// query the address the server is bound to.
pub struct AuthorityServerHandle {
    /// The underlying `sui_network` validator server.
    server_handle: sui_network::validator::server::Server,
}
97
98impl AuthorityServerHandle {
99    pub async fn join(self) -> Result<(), io::Error> {
100        self.server_handle.handle().wait_for_shutdown().await;
101        Ok(())
102    }
103
104    pub async fn kill(self) -> Result<(), io::Error> {
105        self.server_handle.handle().shutdown().await;
106        Ok(())
107    }
108
109    pub fn address(&self) -> &Multiaddr {
110        self.server_handle.local_addr()
111    }
112}
113
/// Bundles everything needed to spawn a validator network server: the address
/// to listen on, the authority state, the consensus adapter and the service
/// metrics.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address.
    address: Multiaddr,
    /// Shared authority state handed to the validator service.
    pub state: Arc<AuthorityState>,
    /// Adapter handed to the validator service for consensus submission.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the validator service.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
120
121impl AuthorityServer {
122    pub fn new_for_test_with_consensus_adapter(
123        state: Arc<AuthorityState>,
124        consensus_adapter: Arc<ConsensusAdapter>,
125    ) -> Self {
126        let address = new_local_tcp_address_for_testing();
127        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
128
129        Self {
130            address,
131            state,
132            consensus_adapter,
133            metrics,
134        }
135    }
136
137    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
138        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
139        let consensus_adapter = Arc::new(ConsensusAdapter::new(
140            Arc::new(LazyMysticetiClient::new()),
141            CheckpointStore::new_for_tests(),
142            state.name,
143            100_000,
144            100_000,
145            ConsensusAdapterMetrics::new_test(),
146            slot_freed_notify,
147        ));
148        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
149    }
150
151    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
152        let address = self.address.clone();
153        self.spawn_with_bind_address_for_test(address).await
154    }
155
156    pub async fn spawn_with_bind_address_for_test(
157        self,
158        address: Multiaddr,
159    ) -> Result<AuthorityServerHandle, io::Error> {
160        let tls_config = sui_tls::create_rustls_server_config(
161            self.state.config.network_key_pair().copy().private(),
162            SUI_TLS_SERVER_NAME.to_string(),
163        );
164        let config = mysten_network::config::Config::new();
165        let server = sui_network::validator::server::ServerBuilder::from_config(
166            &config,
167            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
168        )
169        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
170            self.state,
171            self.consensus_adapter,
172            self.metrics,
173        )))
174        .bind(&address, Some(tls_config))
175        .await
176        .unwrap();
177        let local_addr = server.local_addr().to_owned();
178        info!("Listening to traffic on {local_addr}");
179        let handle = AuthorityServerHandle {
180            server_handle: server,
181        };
182        Ok(handle)
183    }
184}
185
/// Prometheus metrics recorded by the validator service.
///
/// Each field is registered in [`ValidatorServiceMetrics::new`]; the
/// descriptions below mirror the help text registered there.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// The number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// The size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus,
    /// labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by
    /// `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// The size of transactions in the submit transaction request, labelled by
    /// `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// The number of transactions in the submit transaction request, labelled
    /// by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transactions rejected due to system overload, labelled by
    /// `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP was not extractable from the request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was (unexpectedly) not
    /// included in the request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config was detected to disagree
    /// with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
    /// Number of gasless transactions rejected by the rate limiter.
    pub gasless_rate_limited_count: IntCounter,
    /// Number of valid gasless transaction submissions, labelled by `outcome`.
    pub gasless_submission_outcomes: IntCounterVec,
    /// Number of transactions that bypassed the admission queue (system not
    /// overloaded).
    admission_queue_direct_bypasses: IntCounter,
}
217
218impl ValidatorServiceMetrics {
219    pub fn new(registry: &Registry) -> Self {
220        Self {
221            signature_errors: register_int_counter_with_registry!(
222                "total_signature_errors",
223                "Number of transaction signature errors",
224                registry,
225            )
226            .unwrap(),
227            tx_verification_latency: register_histogram_with_registry!(
228                "validator_service_tx_verification_latency",
229                "Latency of verifying a transaction",
230                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
231                registry,
232            )
233            .unwrap(),
234            cert_verification_latency: register_histogram_with_registry!(
235                "validator_service_cert_verification_latency",
236                "Latency of verifying a certificate",
237                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
238                registry,
239            )
240            .unwrap(),
241            handle_transaction_latency: register_histogram_with_registry!(
242                "validator_service_handle_transaction_latency",
243                "Latency of handling a transaction",
244                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
245                registry,
246            )
247            .unwrap(),
248            handle_certificate_consensus_latency: register_histogram_with_registry!(
249                "validator_service_handle_certificate_consensus_latency",
250                "Latency of handling a consensus transaction certificate",
251                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
252                registry,
253            )
254            .unwrap(),
255            submit_certificate_consensus_latency: register_histogram_with_registry!(
256                "validator_service_submit_certificate_consensus_latency",
257                "Latency of submit_certificate RPC handler",
258                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
259                registry,
260            )
261            .unwrap(),
262            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
263                "validator_service_handle_certificate_non_consensus_latency",
264                "Latency of handling a non-consensus transaction certificate",
265                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
266                registry,
267            )
268            .unwrap(),
269            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
270                "validator_service_handle_soft_bundle_certificates_consensus_latency",
271                "Latency of handling a consensus soft bundle",
272                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
273                registry,
274            )
275            .unwrap(),
276            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
277                "handle_soft_bundle_certificates_count",
278                "The number of certificates included in a soft bundle",
279                mysten_metrics::COUNT_BUCKETS.to_vec(),
280                registry,
281            )
282            .unwrap(),
283            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
284                "handle_soft_bundle_certificates_size_bytes",
285                "The size of soft bundle in bytes",
286                mysten_metrics::BYTES_BUCKETS.to_vec(),
287                registry,
288            )
289            .unwrap(),
290            handle_transaction_consensus_latency: register_histogram_with_registry!(
291                "validator_service_handle_transaction_consensus_latency",
292                "Latency of handling a user transaction sent through consensus",
293                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
294                registry,
295            )
296            .unwrap(),
297            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
298                "validator_service_submit_transaction_consensus_latency",
299                "Latency of submitting a user transaction sent through consensus",
300                &["req_type"],
301                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
302                registry,
303            )
304            .unwrap(),
305            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
306                "validator_service_submit_transaction_latency",
307                "Latency of submit transaction handler",
308                &["req_type"],
309                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
310                registry,
311            )
312            .unwrap(),
313            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
314                "validator_service_handle_wait_for_effects_ping_latency",
315                "Latency of handling a ping request for wait_for_effects",
316                &["req_type"],
317                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
318                registry,
319            )
320            .unwrap(),
321            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
322                "validator_service_submit_transaction_bytes",
323                "The size of transactions in the submit transaction request",
324                &["req_type"],
325                mysten_metrics::BYTES_BUCKETS.to_vec(),
326                registry,
327            )
328            .unwrap(),
329            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
330                "validator_service_submit_transaction_batch_size",
331                "The number of transactions in the submit transaction request",
332                &["req_type"],
333                mysten_metrics::COUNT_BUCKETS.to_vec(),
334                registry,
335            )
336            .unwrap(),
337            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
338                "validator_service_num_rejected_tx_during_overload",
339                "Number of rejected transaction due to system overload",
340                &["error_type"],
341                registry,
342            )
343            .unwrap(),
344            submission_rejected_transactions: register_int_counter_vec_with_registry!(
345                "validator_service_submission_rejected_transactions",
346                "Number of transactions rejected during submission",
347                &["reason"],
348                registry,
349            )
350            .unwrap(),
351            connection_ip_not_found: register_int_counter_with_registry!(
352                "validator_service_connection_ip_not_found",
353                "Number of times connection IP was not extractable from request",
354                registry,
355            )
356            .unwrap(),
357            forwarded_header_parse_error: register_int_counter_with_registry!(
358                "validator_service_forwarded_header_parse_error",
359                "Number of times x-forwarded-for header could not be parsed",
360                registry,
361            )
362            .unwrap(),
363            forwarded_header_invalid: register_int_counter_with_registry!(
364                "validator_service_forwarded_header_invalid",
365                "Number of times x-forwarded-for header was invalid",
366                registry,
367            )
368            .unwrap(),
369            forwarded_header_not_included: register_int_counter_with_registry!(
370                "validator_service_forwarded_header_not_included",
371                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
372                registry,
373            )
374            .unwrap(),
375            client_id_source_config_mismatch: register_int_counter_with_registry!(
376                "validator_service_client_id_source_config_mismatch",
377                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
378                registry,
379            )
380            .unwrap(),
381            x_forwarded_for_num_hops: register_gauge_with_registry!(
382                "validator_service_x_forwarded_for_num_hops",
383                "Number of hops in x-forwarded-for header",
384                registry,
385            )
386            .unwrap(),
387            gasless_rate_limited_count: register_int_counter_with_registry!(
388                "validator_service_gasless_rate_limited_count",
389                "Number of gasless transactions rejected by rate limiter",
390                registry,
391            )
392            .unwrap(),
393            gasless_submission_outcomes: register_int_counter_vec_with_registry!(
394                "validator_service_gasless_submission_outcomes",
395                "Number of valid gasless transaction submissions by outcome",
396                &["outcome"],
397                registry,
398            )
399            .unwrap(),
400            admission_queue_direct_bypasses: register_int_counter_with_registry!(
401                "validator_service_admission_queue_direct_bypasses",
402                "Number of transactions that bypassed the queue (system not overloaded)",
403                registry,
404            )
405            .unwrap(),
406        }
407    }
408
409    pub fn new_for_tests() -> Self {
410        let registry = Registry::new();
411        Self::new(&registry)
412    }
413}
414
/// Where `handle_submit_transaction` routes a request.
///
/// The mode is chosen once per request, before the per-transaction checks run.
enum AdmissionQueueSubmitMode {
    /// System has capacity: submit directly to consensus, skipping the queue.
    Bypass,
    /// System is overloaded: admit via the priority queue.
    Queue,
    /// Queue is not available — either turned off by config or temporarily
    /// disabled by failover. Submit directly to consensus, but reject
    /// individual txs when consensus is saturated (pre-queue behavior).
    Disabled,
}
426
/// Validator service handling transaction submission requests.
///
/// The struct is `Clone`, and request handlers clone it to take owned copies
/// of its fields.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state.
    state: Arc<AuthorityState>,
    /// Adapter consulted for consensus overload during submission.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while handling requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from
    /// `state.traffic_controller`, `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How to determine the client IP address of a request. When unset, the
    /// submit handler falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
    /// Rate limiter for gasless transactions, built from
    /// `state.consensus_gasless_counter`.
    gasless_limiter: GaslessRateLimiter,
    /// Optional admission queue context.
    admission_queue: Option<AdmissionQueueContext>,
}
437
438impl ValidatorService {
439    pub fn new(
440        state: Arc<AuthorityState>,
441        consensus_adapter: Arc<ConsensusAdapter>,
442        validator_metrics: Arc<ValidatorServiceMetrics>,
443        client_id_source: Option<ClientIdSource>,
444        admission_queue: Option<AdmissionQueueContext>,
445    ) -> Self {
446        let traffic_controller = state.traffic_controller.clone();
447        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
448        Self {
449            state,
450            consensus_adapter,
451            metrics: validator_metrics,
452            traffic_controller,
453            client_id_source,
454            gasless_limiter,
455            admission_queue,
456        }
457    }
458
459    pub fn new_for_tests(
460        state: Arc<AuthorityState>,
461        consensus_adapter: Arc<ConsensusAdapter>,
462        metrics: Arc<ValidatorServiceMetrics>,
463    ) -> Self {
464        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
465        let epoch_store = state.epoch_store_for_testing().clone();
466        let slot_freed_notify = Arc::new(tokio::sync::Notify::new());
467        let manager = Arc::new(AdmissionQueueManager::new_for_tests(
468            consensus_adapter.clone(),
469            slot_freed_notify,
470        ));
471        let admission_queue = Some(AdmissionQueueContext::spawn(manager, epoch_store));
472        Self {
473            state,
474            consensus_adapter,
475            metrics,
476            traffic_controller: None,
477            client_id_source: None,
478            gasless_limiter,
479            admission_queue,
480        }
481    }
482
    /// Returns a reference to the shared authority state held by this service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
486
487    /// Test method that performs transaction validation without going through gRPC.
488    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
489        let epoch_store = self.state.load_epoch_store_one_call_per_task();
490
491        // Validity check (basic structural validation)
492        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
493
494        // Signature verification
495        let transaction = epoch_store
496            .verify_transaction_require_no_aliases(transaction)?
497            .into_tx();
498
499        // Validate the transaction
500        self.state
501            .handle_vote_transaction(&epoch_store, transaction)?;
502
503        Ok(())
504    }
505
506    /// Test method that performs transaction validation with overload checking.
507    /// Used for testing validator overload behavior.
508    pub fn handle_transaction_for_testing_with_overload_check(
509        &self,
510        transaction: Transaction,
511    ) -> SuiResult<()> {
512        let epoch_store = self.state.load_epoch_store_one_call_per_task();
513
514        // Validity check (basic structural validation)
515        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
516
517        // Check system overload
518        self.state.check_system_overload(
519            transaction.data(),
520            self.state.check_system_overload_at_signing(),
521        )?;
522
523        // Signature verification
524        let transaction = epoch_store
525            .verify_transaction_require_no_aliases(transaction)?
526            .into_tx();
527
528        // Validate the transaction
529        self.state
530            .handle_vote_transaction(&epoch_store, transaction)?;
531
532        Ok(())
533    }
534
535    /// Collect the IDs of input objects that are immutable.
536    /// This is used to create the ImmutableInputObjects claim for consensus messages.
537    async fn collect_immutable_object_ids(
538        &self,
539        tx: &VerifiedTransaction,
540        state: &AuthorityState,
541    ) -> SuiResult<Vec<ObjectID>> {
542        let input_objects = tx.data().transaction_data().input_objects()?;
543
544        // Collect object IDs from ImmOrOwnedMoveObject inputs
545        let object_ids: Vec<ObjectID> = input_objects
546            .iter()
547            .filter_map(|obj| match obj {
548                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
549                _ => None,
550            })
551            .collect();
552        if object_ids.is_empty() {
553            return Ok(vec![]);
554        }
555
556        // Load objects from cache and filter to immutable ones
557        let objects = state.get_object_cache_reader().get_objects(&object_ids);
558
559        // All objects should be found, since owned input objects have been validated to exist.
560        objects
561            .into_iter()
562            .zip_debug_eq(object_ids.iter())
563            .filter_map(|(obj, id)| {
564                let Some(o) = obj else {
565                    return Some(Err::<ObjectID, SuiError>(
566                        SuiErrorKind::UserInputError {
567                            error: UserInputError::ObjectNotFound {
568                                object_id: *id,
569                                version: None,
570                            },
571                        }
572                        .into(),
573                    ));
574                };
575                if o.is_immutable() {
576                    Some(Ok(*id))
577                } else {
578                    None
579                }
580            })
581            .collect::<SuiResult<Vec<ObjectID>>>()
582    }
583
584    #[instrument(
585        name = "ValidatorService::handle_submit_transaction",
586        level = "error",
587        skip_all,
588        err(level = "debug")
589    )]
590    async fn handle_submit_transaction(
591        &self,
592        request: tonic::Request<RawSubmitTxRequest>,
593    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
594        let Self {
595            state,
596            consensus_adapter: _,
597            metrics,
598            traffic_controller: _,
599            client_id_source,
600            gasless_limiter: _,
601            admission_queue: _,
602        } = self.clone();
603
604        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
605            self.get_client_ip_addr(&request, client_id_source)
606        } else {
607            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
608        };
609
610        let inner = request.into_inner();
611        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
612
613        let next_epoch = start_epoch + 1;
614        let mut max_retries = 1;
615
616        loop {
617            let res = self
618                .handle_submit_transaction_inner(&state, &metrics, &inner, submitter_client_addr)
619                .await;
620            match res {
621                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
622                Err(err) => {
623                    if max_retries > 0
624                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
625                    {
626                        max_retries -= 1;
627
628                        debug!(
629                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
630                        );
631
632                        if let Ok(Ok(new_epoch)) =
633                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
634                        {
635                            assert_reachable!("retry submission at epoch end");
636                            if new_epoch == next_epoch {
637                                continue;
638                            }
639
640                            debug_fatal!(
641                                "expected epoch {} after reconfiguration. got {}",
642                                next_epoch,
643                                new_epoch
644                            );
645                        }
646                    }
647                    return Err(err.into());
648                }
649            }
650        }
651    }
652
653    async fn handle_submit_transaction_inner(
654        &self,
655        state: &AuthorityState,
656        metrics: &ValidatorServiceMetrics,
657        request: &RawSubmitTxRequest,
658        submitter_client_addr: Option<IpAddr>,
659    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
660        let epoch_store = state.load_epoch_store_one_call_per_task();
661        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
662            SuiErrorKind::GrpcMessageDeserializeError {
663                type_info: "RawSubmitTxRequest.submit_type".to_string(),
664                error: e.to_string(),
665            }
666        })?;
667
668        let is_ping_request = submit_type == SubmitTxType::Ping;
669        if is_ping_request {
670            fp_ensure!(
671                request.transactions.is_empty(),
672                SuiErrorKind::InvalidRequest(format!(
673                    "Ping request cannot contain {} transactions",
674                    request.transactions.len()
675                ))
676                .into()
677            );
678        } else {
679            // Ensure default and soft bundle requests contain at least one transaction.
680            fp_ensure!(
681                !request.transactions.is_empty(),
682                SuiErrorKind::InvalidRequest(
683                    "At least one transaction needs to be submitted".to_string(),
684                )
685                .into()
686            );
687        }
688
689        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
690        // if they use the same gas price. But this is only done with best effort.
691        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
692        // other transactions in the same bundle.
693        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
694
695        let max_num_transactions = if is_soft_bundle_request {
696            // Soft bundle cannot contain too many transactions.
697            // Otherwise it is hard to include all of them in a single block.
698            epoch_store.protocol_config().max_soft_bundle_size()
699        } else {
700            // Still enforce a limit even when transactions do not need to be in the same block.
701            epoch_store
702                .protocol_config()
703                .max_num_transactions_in_block()
704        };
705        fp_ensure!(
706            request.transactions.len() <= max_num_transactions as usize,
707            SuiErrorKind::InvalidRequest(format!(
708                "Too many transactions in request: {} vs {}",
709                request.transactions.len(),
710                max_num_transactions
711            ))
712            .into()
713        );
714
715        // Transaction digests.
716        let mut tx_digests = Vec::with_capacity(request.transactions.len());
717        // Transactions to submit to consensus.
718        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
719        // Indexes of transactions above in the request transactions.
720        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
721        // Results corresponding to each transaction in the request.
722        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
723        // Total size of all transactions in the request.
724        let mut total_size_bytes = 0;
725        // Traffic control spam weight to use for the transaction.
726        let mut spam_weight = Weight::zero();
727
728        let req_type = if is_ping_request {
729            "ping"
730        } else if request.transactions.len() == 1 {
731            "single_transaction"
732        } else if is_soft_bundle_request {
733            "soft_bundle"
734        } else {
735            "batch"
736        };
737
738        let _handle_tx_metrics_guard = metrics
739            .handle_submit_transaction_latency
740            .with_label_values(&[req_type])
741            .start_timer();
742
743        let submit_mode = self.classify_submit_mode(is_ping_request);
744
745        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
746            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
747                Ok(txn) => txn,
748                Err(e) => {
749                    // Ok to fail the request when any transaction is invalid.
750                    return Err(SuiErrorKind::TransactionDeserializationError {
751                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
752                    }
753                    .into());
754                }
755            };
756
757            // Ok to fail the request when any transaction is invalid.
758            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
759            let is_gasless = transaction
760                .data()
761                .transaction_data()
762                .is_gasless_transaction();
763
764            if is_gasless {
765                // Gasless transactions count for traffic control, since they have no economic cost.
766                spam_weight = Weight::one();
767                metrics
768                    .gasless_submission_outcomes
769                    .with_label_values(&["attempted"])
770                    .inc();
771            }
772
773            let overload_check_res = state.check_system_overload(
774                transaction.data(),
775                state.check_system_overload_at_signing(),
776            );
777            if let Err(error) = overload_check_res {
778                metrics
779                    .num_rejected_tx_during_overload
780                    .with_label_values(&[error.as_ref()])
781                    .inc();
782                if is_gasless {
783                    metrics
784                        .gasless_submission_outcomes
785                        .with_label_values(&["rejected_overload"])
786                        .inc();
787                }
788                results[idx] = Some(SubmitTxResult::Rejected { error });
789                continue;
790            }
791
792            // Use the pre-queue per-tx consensus overload reject when the
793            // queue is disabled or in failover.
794            if matches!(submit_mode, AdmissionQueueSubmitMode::Disabled)
795                && let Err(error) = self.consensus_adapter.check_consensus_overload()
796            {
797                state.update_overload_metrics("consensus");
798                metrics
799                    .num_rejected_tx_during_overload
800                    .with_label_values(&[error.as_ref()])
801                    .inc();
802                results[idx] = Some(SubmitTxResult::Rejected { error });
803                continue;
804            }
805
806            if is_gasless
807                && !self
808                    .gasless_limiter
809                    .try_acquire(epoch_store.protocol_config())
810            {
811                metrics.gasless_rate_limited_count.inc();
812                metrics
813                    .gasless_submission_outcomes
814                    .with_label_values(&["rejected_rate_limited"])
815                    .inc();
816                results[idx] = Some(SubmitTxResult::Rejected {
817                    error: SuiErrorKind::ValidatorOverloadedRetryAfter {
818                        retry_after_secs: 1,
819                    }
820                    .into(),
821                });
822                continue;
823            }
824
825            // Ok to fail the request when any signature is invalid.
826            let verified_transaction = {
827                let _metrics_guard = metrics.tx_verification_latency.start_timer();
828                if epoch_store.protocol_config().address_aliases() {
829                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
830                        Ok(tx) => tx,
831                        Err(e) => {
832                            metrics.signature_errors.inc();
833                            return Err(e);
834                        }
835                    }
836                } else {
837                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
838                        Ok(tx) => tx,
839                        Err(e) => {
840                            metrics.signature_errors.inc();
841                            return Err(e);
842                        }
843                    }
844                }
845            };
846
847            let tx_digest = *verified_transaction.tx().digest();
848            debug!(
849                ?tx_digest,
850                "handle_submit_transaction: verified transaction"
851            );
852
853            // Check if the transaction has executed, before checking input objects
854            // which could have been consumed.
855            if let Some(effects) = state
856                .get_transaction_cache_reader()
857                .get_executed_effects(&tx_digest)
858            {
859                let effects_digest = effects.digest();
860                if let Ok(executed_data) = self.complete_executed_data(effects).await {
861                    let executed_result = SubmitTxResult::Executed {
862                        effects_digest,
863                        details: Some(executed_data),
864                    };
865                    results[idx] = Some(executed_result);
866                    debug!(?tx_digest, "handle_submit_transaction: already executed");
867                    continue;
868                }
869            }
870
871            if self
872                .state
873                .get_transaction_cache_reader()
874                .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
875            {
876                results[idx] = Some(SubmitTxResult::Rejected {
877                    error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
878                });
879                debug!(
880                    ?tx_digest,
881                    "handle_submit_transaction: transaction already executed in previous epoch"
882                );
883                continue;
884            }
885
886            debug!(
887                ?tx_digest,
888                "handle_submit_transaction: waiting for fastpath dependency objects"
889            );
890            if !state
891                .wait_for_fastpath_dependency_objects(
892                    verified_transaction.tx(),
893                    epoch_store.epoch(),
894                )
895                .await?
896            {
897                debug!(
898                    ?tx_digest,
899                    "fastpath input objects are still unavailable after waiting"
900                );
901            }
902
903            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
904                Ok(_) => { /* continue processing */ }
905                Err(e) => {
906                    // Check if transaction has been executed while being validated.
907                    // This is an edge case so checking executed effects twice is acceptable.
908                    if let Some(effects) = state
909                        .get_transaction_cache_reader()
910                        .get_executed_effects(&tx_digest)
911                    {
912                        let effects_digest = effects.digest();
913                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
914                            let executed_result = SubmitTxResult::Executed {
915                                effects_digest,
916                                details: Some(executed_data),
917                            };
918                            results[idx] = Some(executed_result);
919                            continue;
920                        }
921                    }
922
923                    // When the transaction has not been executed, record the error for the transaction.
924                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
925                    metrics
926                        .submission_rejected_transactions
927                        .with_label_values(&[e.to_variant_name()])
928                        .inc();
929                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
930                    continue;
931                }
932            }
933
934            // Create claims with aliases and / or immutable objects.
935            let mut claims = vec![];
936
937            let immutable_object_ids = self
938                .collect_immutable_object_ids(verified_transaction.tx(), state)
939                .await?;
940            if !immutable_object_ids.is_empty() {
941                claims.push(TransactionClaim::ImmutableInputObjects(
942                    immutable_object_ids,
943                ));
944            }
945
946            let (tx, aliases) = verified_transaction.into_inner();
947            if epoch_store.protocol_config().address_aliases() {
948                if epoch_store
949                    .protocol_config()
950                    .fix_checkpoint_signature_mapping()
951                {
952                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
953                } else {
954                    let v1_aliases: Vec<_> = tx
955                        .data()
956                        .intent_message()
957                        .value
958                        .required_signers()
959                        .into_iter()
960                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
961                        .collect();
962                    #[allow(deprecated)]
963                    claims.push(TransactionClaim::AddressAliases(
964                        nonempty::NonEmpty::from_vec(v1_aliases)
965                            .expect("must have at least one required_signer"),
966                    ));
967                }
968            }
969
970            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
971
972            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
973                &state.name,
974                tx_with_claims,
975            ));
976            if is_gasless {
977                metrics
978                    .gasless_submission_outcomes
979                    .with_label_values(&["submitted"])
980                    .inc();
981            }
982
983            transaction_indexes.push(idx);
984            tx_digests.push(tx_digest);
985            total_size_bytes += tx_size;
986        }
987
988        if consensus_transactions.is_empty() && !is_ping_request {
989            return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
990        }
991
992        // Set the max bytes size of the soft bundle to be half of the consensus max transactions in block size.
993        // We do this to account for serialization overheads and to ensure that the soft bundle is not too large
994        // when is attempted to be posted via consensus.
995        let max_transaction_bytes = if is_soft_bundle_request {
996            epoch_store
997                .protocol_config()
998                .consensus_max_transactions_in_block_bytes()
999                / 2
1000        } else {
1001            epoch_store
1002                .protocol_config()
1003                .consensus_max_transactions_in_block_bytes()
1004        };
1005        fp_ensure!(
1006            total_size_bytes <= max_transaction_bytes as usize,
1007            SuiErrorKind::UserInputError {
1008                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
1009                    size: total_size_bytes,
1010                    limit: max_transaction_bytes,
1011                },
1012            }
1013            .into()
1014        );
1015
1016        metrics
1017            .handle_submit_transaction_bytes
1018            .with_label_values(&[req_type])
1019            .observe(total_size_bytes as f64);
1020        metrics
1021            .handle_submit_transaction_batch_size
1022            .with_label_values(&[req_type])
1023            .observe(consensus_transactions.len() as f64);
1024
1025        let _latency_metric_guard = metrics
1026            .handle_submit_transaction_consensus_latency
1027            .with_label_values(&[req_type])
1028            .start_timer();
1029
1030        if is_soft_bundle_request {
1031            // We only allow the `consensus_transactions` to be empty for ping requests. This is how it should and is be treated from the downstream components.
1032            // For any other case, having an empty `consensus_transactions` vector is an invalid state and we should have never reached at this point.
1033            assert!(
1034                !consensus_transactions.is_empty(),
1035                "A valid soft bundle must have at least one transaction"
1036            );
1037        }
1038
1039        // Soft bundles are inserted as a single queue entry.
1040        // Individual transactions are each inserted separately.
1041        let tx_groups: Vec<Vec<ConsensusTransaction>> = if is_soft_bundle_request || is_ping_request
1042        {
1043            vec![consensus_transactions]
1044        } else {
1045            consensus_transactions
1046                .into_iter()
1047                .map(|t| vec![t])
1048                .collect()
1049        };
1050
1051        let consensus_positions: Vec<ConsensusPosition> = match submit_mode {
1052            AdmissionQueueSubmitMode::Bypass | AdmissionQueueSubmitMode::Disabled => {
1053                if matches!(submit_mode, AdmissionQueueSubmitMode::Bypass) {
1054                    self.metrics.admission_queue_direct_bypasses.inc();
1055                }
1056                let futures = tx_groups.into_iter().map(|txns| {
1057                    debug!(
1058                        "handle_submit_transaction: submitting consensus transactions ({}): {}",
1059                        req_type,
1060                        txns.iter().map(|t| t.local_display()).join(", ")
1061                    );
1062                    self.consensus_adapter.submit_and_get_positions(
1063                        txns,
1064                        &epoch_store,
1065                        submitter_client_addr,
1066                    )
1067                });
1068                future::try_join_all(futures)
1069                    .await?
1070                    .into_iter()
1071                    .flatten()
1072                    .collect()
1073            }
1074            AdmissionQueueSubmitMode::Queue => {
1075                let aq = self
1076                    .admission_queue
1077                    .as_ref()
1078                    .expect("Queue mode implies admission_queue is Some")
1079                    .load();
1080                let mut receivers = Vec::with_capacity(tx_groups.len());
1081                for txns in tx_groups {
1082                    let gas_price = Self::extract_gas_price(&txns);
1083                    let (rx, newly_inserted) = aq
1084                        .try_insert(gas_price, txns, submitter_client_addr)
1085                        .await?;
1086                    if !newly_inserted {
1087                        // Count duplicate tx submissions towards spam tallies.
1088                        spam_weight = Weight::one();
1089                    }
1090                    receivers.push(rx);
1091                }
1092                let results = future::try_join_all(receivers.into_iter().map(|rx| async move {
1093                    rx.await.map_err(|_| {
1094                        SuiError::from(SuiErrorKind::TooManyTransactionsPendingConsensus)
1095                    })?
1096                }))
1097                .await?;
1098                results.into_iter().flatten().collect()
1099            }
1100        };
1101
1102        if is_ping_request {
1103            // For ping requests, return the special consensus position.
1104            assert_eq!(consensus_positions.len(), 1);
1105            results.push(Some(SubmitTxResult::Submitted {
1106                consensus_position: consensus_positions[0],
1107            }));
1108        } else {
1109            // Otherwise, return the consensus position for each transaction.
1110            for ((idx, tx_digest), consensus_position) in transaction_indexes
1111                .into_iter()
1112                .zip_debug_eq(tx_digests)
1113                .zip_debug_eq(consensus_positions)
1114            {
1115                debug!(
1116                    ?tx_digest,
1117                    "handle_submit_transaction: submitted consensus transaction at {}",
1118                    consensus_position,
1119                );
1120                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1121            }
1122        }
1123
1124        Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1125    }
1126
1127    fn try_from_submit_tx_response(
1128        results: Vec<Option<SubmitTxResult>>,
1129    ) -> Result<RawSubmitTxResponse, SuiError> {
1130        let mut raw_results = Vec::new();
1131        for (i, result) in results.into_iter().enumerate() {
1132            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1133                error: format!("Missing transaction result at {}", i),
1134            })?;
1135            let raw_result = result.try_into()?;
1136            raw_results.push(raw_result);
1137        }
1138        Ok(RawSubmitTxResponse {
1139            results: raw_results,
1140        })
1141    }
1142
1143    /// Extract the gas price from a batch of consensus transactions.
1144    /// Returns the minimum gas price in the batch, or 0 if no user transactions.
1145    fn extract_gas_price(transactions: &[ConsensusTransaction]) -> u64 {
1146        use sui_types::messages_consensus::ConsensusTransactionKind;
1147        transactions
1148            .iter()
1149            .filter_map(|tx| match &tx.kind {
1150                ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.gas_price()),
1151                ConsensusTransactionKind::UserTransaction(t) => {
1152                    Some(t.data().transaction_data().gas_price())
1153                }
1154                ConsensusTransactionKind::UserTransactionV2(t) => {
1155                    Some(t.tx().data().transaction_data().gas_price())
1156                }
1157                _ => None,
1158            })
1159            .min()
1160            .unwrap_or(0)
1161    }
1162
1163    fn classify_submit_mode(&self, is_ping_request: bool) -> AdmissionQueueSubmitMode {
1164        let Some(aq) = &self.admission_queue else {
1165            return AdmissionQueueSubmitMode::Disabled;
1166        };
1167
1168        if is_ping_request {
1169            return AdmissionQueueSubmitMode::Bypass;
1170        }
1171
1172        let inflight = usize::try_from(self.consensus_adapter.num_inflight_transactions()).unwrap();
1173        if inflight < aq.bypass_threshold() {
1174            return AdmissionQueueSubmitMode::Bypass;
1175        }
1176
1177        // Failover is consulted only on the overloaded path so the hot path
1178        // avoids the ArcSwap load.
1179        if aq.load().failover_tripped() {
1180            return AdmissionQueueSubmitMode::Disabled;
1181        }
1182
1183        AdmissionQueueSubmitMode::Queue
1184    }
1185
1186    async fn collect_effects_data(
1187        &self,
1188        effects: &TransactionEffects,
1189        include_events: bool,
1190        include_input_objects: bool,
1191        include_output_objects: bool,
1192    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1193        let events = if include_events && effects.events_digest().is_some() {
1194            Some(
1195                self.state
1196                    .get_transaction_events(effects.transaction_digest())?,
1197            )
1198        } else {
1199            None
1200        };
1201
1202        let input_objects = if include_input_objects {
1203            self.state.get_transaction_input_objects(effects)?
1204        } else {
1205            vec![]
1206        };
1207
1208        let output_objects = if include_output_objects {
1209            self.state.get_transaction_output_objects(effects)?
1210        } else {
1211            vec![]
1212        };
1213
1214        Ok((events, input_objects, output_objects))
1215    }
1216}
1217
1218type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1219
1220impl ValidatorService {
1221    async fn handle_submit_transaction_impl(
1222        &self,
1223        request: tonic::Request<RawSubmitTxRequest>,
1224    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1225        self.handle_submit_transaction(request).await
1226    }
1227
1228    async fn wait_for_effects_impl(
1229        &self,
1230        request: tonic::Request<RawWaitForEffectsRequest>,
1231    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1232        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1233        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1234        let response = timeout(
1235            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1236            Duration::from_secs(20),
1237            epoch_store
1238                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1239                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1240        )
1241        .await
1242        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1243        .try_into()?;
1244        Ok((tonic::Response::new(response), Weight::zero()))
1245    }
1246
1247    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
1248    async fn wait_for_effects_response(
1249        &self,
1250        request: WaitForEffectsRequest,
1251        epoch_store: &Arc<AuthorityPerEpochStore>,
1252    ) -> SuiResult<WaitForEffectsResponse> {
1253        if request.ping_type.is_some() {
1254            return timeout(
1255                Duration::from_secs(10),
1256                self.ping_response(request, epoch_store),
1257            )
1258            .await
1259            .map_err(|_| SuiErrorKind::TimeoutError)?;
1260        }
1261
1262        let Some(tx_digest) = request.transaction_digest else {
1263            return Err(SuiErrorKind::InvalidRequest(
1264                "Transaction digest is required for wait for effects requests".to_string(),
1265            )
1266            .into());
1267        };
1268        let tx_digests = [tx_digest];
1269
1270        // When consensus_position is provided, also watch the consensus status cache
1271        // so rejected/dropped transactions get a timely response instead of waiting
1272        // forever for effects that will never be produced.
1273        let consensus_status_future = async {
1274            let consensus_position = match request.consensus_position {
1275                Some(pos) => pos,
1276                None => return futures::future::pending().await,
1277            };
1278            let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1279            consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1280            match consensus_tx_status_cache
1281                .notify_read_transaction_status(consensus_position)
1282                .await
1283            {
1284                NotifyReadConsensusTxStatusResult::Status(
1285                    ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1286                ) => Ok(WaitForEffectsResponse::Rejected {
1287                    error: epoch_store.get_rejection_vote_reason(consensus_position),
1288                }),
1289                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1290                    // Effects will be produced — yield to let the effects future win.
1291                    futures::future::pending().await
1292                }
1293                NotifyReadConsensusTxStatusResult::Expired(round) => {
1294                    Ok(WaitForEffectsResponse::Expired {
1295                        epoch: epoch_store.epoch(),
1296                        round: Some(round),
1297                    })
1298                }
1299            }
1300        };
1301
1302        tokio::select! {
1303            effects_result = self.state
1304                .get_transaction_cache_reader()
1305                .notify_read_executed_effects_may_fail(
1306                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1307                    &tx_digests,
1308                ) => {
1309                let effects = effects_result?.pop().unwrap();
1310                let effects_digest = effects.digest();
1311                let details = if request.include_details {
1312                    Some(self.complete_executed_data(effects).await?)
1313                } else {
1314                    None
1315                };
1316                Ok(WaitForEffectsResponse::Executed {
1317                    effects_digest,
1318                    details,
1319                })
1320            }
1321            status_response = consensus_status_future => {
1322                status_response
1323            }
1324        }
1325    }
1326
1327    #[instrument(level = "error", skip_all, err(level = "debug"))]
1328    async fn ping_response(
1329        &self,
1330        request: WaitForEffectsRequest,
1331        epoch_store: &Arc<AuthorityPerEpochStore>,
1332    ) -> SuiResult<WaitForEffectsResponse> {
1333        let consensus_tx_status_cache = &epoch_store.consensus_tx_status_cache;
1334
1335        let Some(consensus_position) = request.consensus_position else {
1336            return Err(SuiErrorKind::InvalidRequest(
1337                "Consensus position is required for Ping requests".to_string(),
1338            )
1339            .into());
1340        };
1341
1342        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1343        let Some(ping) = request.ping_type else {
1344            return Err(SuiErrorKind::InvalidRequest(
1345                "Ping type is required for ping requests".to_string(),
1346            )
1347            .into());
1348        };
1349
1350        let _metrics_guard = self
1351            .metrics
1352            .handle_wait_for_effects_ping_latency
1353            .with_label_values(&[ping.as_str()])
1354            .start_timer();
1355
1356        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1357
1358        let details = if request.include_details {
1359            Some(Box::new(ExecutedData::default()))
1360        } else {
1361            None
1362        };
1363
1364        let status = consensus_tx_status_cache
1365            .notify_read_transaction_status(consensus_position)
1366            .await;
1367        match status {
1368            NotifyReadConsensusTxStatusResult::Status(status) => match status {
1369                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1370                    Ok(WaitForEffectsResponse::Rejected {
1371                        error: epoch_store.get_rejection_vote_reason(consensus_position),
1372                    })
1373                }
1374                ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1375                    effects_digest: TransactionEffectsDigest::ZERO,
1376                    details,
1377                }),
1378            },
1379            NotifyReadConsensusTxStatusResult::Expired(round) => {
1380                Ok(WaitForEffectsResponse::Expired {
1381                    epoch: epoch_store.epoch(),
1382                    round: Some(round),
1383                })
1384            }
1385        }
1386    }
1387
1388    async fn complete_executed_data(
1389        &self,
1390        effects: TransactionEffects,
1391    ) -> SuiResult<Box<ExecutedData>> {
1392        let (events, input_objects, output_objects) = self
1393            .collect_effects_data(
1394                &effects, /* include_events */ true, /* include_input_objects */ true,
1395                /* include_output_objects */ true,
1396            )
1397            .await?;
1398        Ok(Box::new(ExecutedData {
1399            effects,
1400            events,
1401            input_objects,
1402            output_objects,
1403        }))
1404    }
1405
1406    async fn object_info_impl(
1407        &self,
1408        request: tonic::Request<ObjectInfoRequest>,
1409    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1410        let request = request.into_inner();
1411        let response = self.state.handle_object_info_request(request).await?;
1412        Ok((tonic::Response::new(response), Weight::one()))
1413    }
1414
1415    async fn transaction_info_impl(
1416        &self,
1417        request: tonic::Request<TransactionInfoRequest>,
1418    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1419        let request = request.into_inner();
1420        let response = self.state.handle_transaction_info_request(request).await?;
1421        Ok((tonic::Response::new(response), Weight::one()))
1422    }
1423
1424    async fn checkpoint_impl(
1425        &self,
1426        request: tonic::Request<CheckpointRequest>,
1427    ) -> WrappedServiceResponse<CheckpointResponse> {
1428        let request = request.into_inner();
1429        let response = self.state.handle_checkpoint_request(&request)?;
1430        Ok((tonic::Response::new(response), Weight::one()))
1431    }
1432
1433    async fn checkpoint_v2_impl(
1434        &self,
1435        request: tonic::Request<CheckpointRequestV2>,
1436    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1437        let request = request.into_inner();
1438        let response = self.state.handle_checkpoint_request_v2(&request)?;
1439        Ok((tonic::Response::new(response), Weight::one()))
1440    }
1441
1442    async fn get_system_state_object_impl(
1443        &self,
1444        _request: tonic::Request<SystemStateRequest>,
1445    ) -> WrappedServiceResponse<SuiSystemState> {
1446        let response = self
1447            .state
1448            .get_object_cache_reader()
1449            .get_sui_system_state_object_unsafe()?;
1450        Ok((tonic::Response::new(response), Weight::one()))
1451    }
1452
1453    async fn validator_health_impl(
1454        &self,
1455        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1456    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1457        let state = &self.state;
1458
1459        // Get epoch store once for both metrics
1460        let epoch_store = state.load_epoch_store_one_call_per_task();
1461
1462        // Get in-flight execution transactions from execution scheduler
1463        let num_inflight_execution_transactions =
1464            state.execution_scheduler().num_pending_certificates() as u64;
1465
1466        // Get in-flight consensus transactions from consensus adapter
1467        let num_inflight_consensus_transactions =
1468            self.consensus_adapter.num_inflight_transactions();
1469
1470        // Get last committed leader round from epoch store
1471        let last_committed_leader_round = epoch_store
1472            .consensus_tx_status_cache
1473            .get_last_committed_leader_round()
1474            .unwrap_or(0);
1475
1476        // Get last locally built checkpoint sequence
1477        let last_locally_built_checkpoint = epoch_store
1478            .last_built_checkpoint_summary()
1479            .ok()
1480            .flatten()
1481            .map(|(_, summary)| summary.sequence_number)
1482            .unwrap_or(0);
1483
1484        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1485            num_inflight_consensus_transactions,
1486            num_inflight_execution_transactions,
1487            last_locally_built_checkpoint,
1488            last_committed_leader_round,
1489        };
1490
1491        let raw_response = typed_response
1492            .try_into()
1493            .map_err(|e: sui_types::error::SuiError| {
1494                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1495            })?;
1496
1497        Ok((tonic::Response::new(raw_response), Weight::one()))
1498    }
1499
1500    fn get_client_ip_addr<T>(
1501        &self,
1502        request: &tonic::Request<T>,
1503        source: &ClientIdSource,
1504    ) -> Option<IpAddr> {
1505        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1506
1507        if let Some(header) = forwarded_header {
1508            let num_hops = header
1509                .to_str()
1510                .map(|h| h.split(',').count().saturating_sub(1))
1511                .unwrap_or(0);
1512
1513            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1514        }
1515
1516        match source {
1517            ClientIdSource::SocketAddr => {
1518                let socket_addr: Option<SocketAddr> = request.remote_addr();
1519
1520                // We will hit this case if the IO type used does not
1521                // implement Connected or when using a unix domain socket.
1522                // TODO: once we have confirmed that no legitimate traffic
1523                // is hitting this case, we should reject such requests that
1524                // hit this case.
1525                if let Some(socket_addr) = socket_addr {
1526                    Some(socket_addr.ip())
1527                } else {
1528                    if cfg!(msim) {
1529                        // Ignore the error from simtests.
1530                    } else if cfg!(test) {
1531                        panic!("Failed to get remote address from request");
1532                    } else {
1533                        self.metrics.connection_ip_not_found.inc();
1534                        error!("Failed to get remote address from request");
1535                    }
1536                    None
1537                }
1538            }
1539            ClientIdSource::XForwardedFor(num_hops) => {
1540                let do_header_parse = |op: &MetadataValue<Ascii>| {
1541                    match op.to_str() {
1542                        Ok(header_val) => {
1543                            let header_contents =
1544                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1545                            if *num_hops == 0 {
1546                                error!(
1547                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1548                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1549                                    to this node. Skipping traffic controller request handling.",
1550                                    header_contents,
1551                                );
1552                                return None;
1553                            }
1554                            let contents_len = header_contents.len();
1555                            if contents_len < *num_hops {
1556                                error!(
1557                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1558                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1559                                    `client-id-source` in the node config.",
1560                                    header_contents, contents_len, num_hops, contents_len,
1561                                );
1562                                self.metrics.client_id_source_config_mismatch.inc();
1563                                return None;
1564                            }
1565                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1566                            else {
1567                                error!(
1568                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1569                                    Expected at least {} values. Skipping traffic controller request handling.",
1570                                    header_contents, contents_len, num_hops, contents_len,
1571                                );
1572                                return None;
1573                            };
1574                            parse_ip(client_ip).or_else(|| {
1575                                self.metrics.forwarded_header_parse_error.inc();
1576                                None
1577                            })
1578                        }
1579                        Err(e) => {
1580                            // TODO: once we have confirmed that no legitimate traffic
1581                            // is hitting this case, we should reject such requests that
1582                            // hit this case.
1583                            self.metrics.forwarded_header_invalid.inc();
1584                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1585                            None
1586                        }
1587                    }
1588                };
1589                if let Some(op) = request.metadata().get("x-forwarded-for") {
1590                    do_header_parse(op)
1591                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1592                    do_header_parse(op)
1593                } else {
1594                    self.metrics.forwarded_header_not_included.inc();
1595                    error!(
1596                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1597                    );
1598                    None
1599                }
1600            }
1601        }
1602    }
1603
1604    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1605        if let Some(traffic_controller) = &self.traffic_controller {
1606            if !traffic_controller.check(&client, &None).await {
1607                // Entity in blocklist
1608                Err(tonic::Status::from_error(
1609                    SuiErrorKind::TooManyRequests.into(),
1610                ))
1611            } else {
1612                Ok(())
1613            }
1614        } else {
1615            Ok(())
1616        }
1617    }
1618
1619    fn handle_traffic_resp<T>(
1620        &self,
1621        client: Option<IpAddr>,
1622        wrapped_response: WrappedServiceResponse<T>,
1623    ) -> Result<tonic::Response<T>, tonic::Status> {
1624        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1625            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1626            Err(status) => (
1627                Some(SuiError::from(status.clone())),
1628                Weight::zero(),
1629                Err(status.clone()),
1630            ),
1631        };
1632
1633        if let Some(traffic_controller) = self.traffic_controller.clone() {
1634            traffic_controller.tally(TrafficTally {
1635                direct: client,
1636                through_fullnode: None,
1637                error_info: error.map(|e| {
1638                    let error_type = String::from(e.clone().as_ref());
1639                    let error_weight = normalize(e);
1640                    (error_weight, error_type)
1641                }),
1642                spam_weight,
1643                timestamp: SystemTime::now(),
1644            })
1645        }
1646        unwrapped_response
1647    }
1648}
1649
1650// TODO: refine error matching here
1651fn normalize(err: SuiError) -> Weight {
1652    match err.as_inner() {
1653        SuiErrorKind::UserInputError {
1654            error: UserInputError::IncorrectUserSignature { .. },
1655        } => Weight::one(),
1656        SuiErrorKind::InvalidSignature { .. }
1657        | SuiErrorKind::SignerSignatureAbsent { .. }
1658        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1659        | SuiErrorKind::IncorrectSigner { .. }
1660        | SuiErrorKind::UnknownSigner { .. }
1661        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1662        _ => Weight::zero(),
1663    }
1664}
1665
/// Wraps a call to `$self.$func_name($request)` with generic pre- and
/// post-processing for traffic control.
///
/// * If `$self.client_id_source` is `None`, the handler is awaited directly
///   and only the response half of its `(response, weight)` result is kept.
/// * Otherwise the client IP is resolved with `get_client_ip_addr`,
///   `handle_traffic_req` may reject the request (its error is propagated
///   with `?`), and the handler's wrapped result is passed through
///   `handle_traffic_resp`.
///
/// The expansion contains `return` and `?`, so it has to be the tail of an
/// async function or async block whose output is the unwrapped result.
///
/// Since this is on the critical path, any heavy lifting should be done in a
/// separate non-blocking task unless it is necessary to override the return
/// value.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let client = match $self.client_id_source.as_ref() {
            Some(source) => $self.get_client_ip_addr(&$request, source),
            // No client-id source configured: skip request checks and tallying.
            None => return $self.$func_name($request).await.map(|(result, _)| result),
        };

        // check if either IP is blocked, in which case return early
        $self.handle_traffic_req(client.clone()).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1686
1687#[async_trait]
1688impl Validator for ValidatorService {
1689    async fn submit_transaction(
1690        &self,
1691        request: tonic::Request<RawSubmitTxRequest>,
1692    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1693        let validator_service = self.clone();
1694
1695        // Spawns a task which handles the transaction. The task will unconditionally continue
1696        // processing in the event that the client connection is dropped.
1697        spawn_monitored_task!(async move {
1698            // NB: traffic tally wrapping handled within the task rather than on task exit
1699            // to prevent an attacker from subverting traffic control by severing the connection
1700            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1701        })
1702        .await
1703        .unwrap()
1704    }
1705
1706    async fn wait_for_effects(
1707        &self,
1708        request: tonic::Request<RawWaitForEffectsRequest>,
1709    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1710        handle_with_decoration!(self, wait_for_effects_impl, request)
1711    }
1712
1713    async fn object_info(
1714        &self,
1715        request: tonic::Request<ObjectInfoRequest>,
1716    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1717        handle_with_decoration!(self, object_info_impl, request)
1718    }
1719
1720    async fn transaction_info(
1721        &self,
1722        request: tonic::Request<TransactionInfoRequest>,
1723    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1724        handle_with_decoration!(self, transaction_info_impl, request)
1725    }
1726
1727    async fn checkpoint(
1728        &self,
1729        request: tonic::Request<CheckpointRequest>,
1730    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1731        handle_with_decoration!(self, checkpoint_impl, request)
1732    }
1733
1734    async fn checkpoint_v2(
1735        &self,
1736        request: tonic::Request<CheckpointRequestV2>,
1737    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1738        handle_with_decoration!(self, checkpoint_v2_impl, request)
1739    }
1740
1741    async fn get_system_state_object(
1742        &self,
1743        request: tonic::Request<SystemStateRequest>,
1744    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1745        handle_with_decoration!(self, get_system_state_object_impl, request)
1746    }
1747
1748    async fn validator_health(
1749        &self,
1750        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1751    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1752    {
1753        handle_with_decoration!(self, validator_health_impl, request)
1754    }
1755}