sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::ZipDebugEqIteratorExt;
11use mysten_common::{assert_reachable, debug_fatal};
12use mysten_metrics::spawn_monitored_task;
13use prometheus::{
14    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15    register_gauge_with_registry, register_histogram_vec_with_registry,
16    register_histogram_with_registry, register_int_counter_vec_with_registry,
17    register_int_counter_with_registry,
18};
19use std::{
20    io,
21    net::{IpAddr, SocketAddr},
22    sync::Arc,
23    time::{Duration, SystemTime},
24};
25use sui_network::{
26    api::{Validator, ValidatorServer},
27    tonic,
28    validator::server::SUI_TLS_SERVER_NAME,
29};
30use sui_types::effects::TransactionEffectsAPI;
31use sui_types::message_envelope::Message;
32use sui_types::messages_consensus::ConsensusPosition;
33use sui_types::messages_consensus::ConsensusTransaction;
34use sui_types::messages_grpc::{
35    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
36    TransactionInfoRequest, TransactionInfoResponse,
37};
38use sui_types::multiaddr::Multiaddr;
39use sui_types::object::Object;
40use sui_types::sui_system_state::SuiSystemState;
41use sui_types::traffic_control::{ClientIdSource, Weight};
42use sui_types::{
43    base_types::ObjectID,
44    digests::TransactionEffectsDigest,
45    error::{SuiErrorKind, UserInputError},
46};
47use sui_types::{
48    effects::TransactionEffects,
49    messages_grpc::{
50        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
51        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
52    },
53};
54use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
55use sui_types::{error::*, transaction::*};
56use sui_types::{
57    fp_ensure,
58    messages_checkpoint::{
59        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
60    },
61};
62use tokio::sync::oneshot;
63use tokio::time::timeout;
64use tonic::metadata::{Ascii, MetadataValue};
65use tracing::{debug, error, info, instrument};
66
67use crate::consensus_adapter::ConnectionMonitorStatusForTests;
68use crate::gasless_rate_limiter::GaslessRateLimiter;
69use crate::{
70    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
71    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
72    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
73};
74use crate::{
75    authority::{
76        authority_per_epoch_store::AuthorityPerEpochStore,
77        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
78    },
79    checkpoints::CheckpointStore,
80    mysticeti_adapter::LazyMysticetiClient,
81};
82use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
83
84#[cfg(test)]
85#[path = "unit_tests/server_tests.rs"]
86mod server_tests;
87
88#[cfg(test)]
89#[path = "unit_tests/wait_for_effects_tests.rs"]
90mod wait_for_effects_tests;
91
92#[cfg(test)]
93#[path = "unit_tests/submit_transaction_tests.rs"]
94mod submit_transaction_tests;
95
/// Handle to a validator network server that has been bound to an address.
///
/// Wraps the underlying `sui_network` server so callers can wait for it to shut down,
/// trigger a shutdown, or query the address it is listening on.
pub struct AuthorityServerHandle {
    /// The bound network server controlled through this handle.
    server_handle: sui_network::validator::server::Server,
}
99
100impl AuthorityServerHandle {
101    pub async fn join(self) -> Result<(), io::Error> {
102        self.server_handle.handle().wait_for_shutdown().await;
103        Ok(())
104    }
105
106    pub async fn kill(self) -> Result<(), io::Error> {
107        self.server_handle.handle().shutdown().await;
108        Ok(())
109    }
110
111    pub fn address(&self) -> &Multiaddr {
112        self.server_handle.local_addr()
113    }
114}
115
/// A validator server that has been configured but not yet bound to a network address.
pub struct AuthorityServer {
    /// Address the server binds to when spawned via `spawn_for_test`.
    address: Multiaddr,
    /// Authority state handed to the `ValidatorService` when the server is spawned.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the `ValidatorService` when the server is spawned.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` when the server is spawned.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
122
123impl AuthorityServer {
124    pub fn new_for_test_with_consensus_adapter(
125        state: Arc<AuthorityState>,
126        consensus_adapter: Arc<ConsensusAdapter>,
127    ) -> Self {
128        let address = new_local_tcp_address_for_testing();
129        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
130
131        Self {
132            address,
133            state,
134            consensus_adapter,
135            metrics,
136        }
137    }
138
139    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
140        let consensus_adapter = Arc::new(ConsensusAdapter::new(
141            Arc::new(LazyMysticetiClient::new()),
142            CheckpointStore::new_for_tests(),
143            state.name,
144            Arc::new(ConnectionMonitorStatusForTests {}),
145            100_000,
146            100_000,
147            None,
148            None,
149            ConsensusAdapterMetrics::new_test(),
150            state.epoch_store_for_testing().protocol_config().clone(),
151        ));
152        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
153    }
154
155    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
156        let address = self.address.clone();
157        self.spawn_with_bind_address_for_test(address).await
158    }
159
160    pub async fn spawn_with_bind_address_for_test(
161        self,
162        address: Multiaddr,
163    ) -> Result<AuthorityServerHandle, io::Error> {
164        let tls_config = sui_tls::create_rustls_server_config(
165            self.state.config.network_key_pair().copy().private(),
166            SUI_TLS_SERVER_NAME.to_string(),
167        );
168        let config = mysten_network::config::Config::new();
169        let server = sui_network::validator::server::ServerBuilder::from_config(
170            &config,
171            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
172        )
173        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
174            self.state,
175            self.consensus_adapter,
176            self.metrics,
177        )))
178        .bind(&address, Some(tls_config))
179        .await
180        .unwrap();
181        let local_addr = server.local_addr().to_owned();
182        info!("Listening to traffic on {local_addr}");
183        let handle = AuthorityServerHandle {
184            server_handle: server,
185        };
186        Ok(handle)
187    }
188}
189
/// Prometheus metrics recorded by the validator service.
///
/// Every field is registered by `ValidatorServiceMetrics::new`; the descriptions below mirror
/// the help text each metric is registered with.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back local
    /// acknowledgement; execution and finalization time are not included.
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus, labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transaction certificates rejected during epoch transitioning.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP was not extractable from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was unexpectedly missing from a request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
    /// Number of gasless transactions rejected by the rate limiter.
    pub gasless_rate_limited_count: IntCounter,
}
221
222impl ValidatorServiceMetrics {
223    pub fn new(registry: &Registry) -> Self {
224        Self {
225            signature_errors: register_int_counter_with_registry!(
226                "total_signature_errors",
227                "Number of transaction signature errors",
228                registry,
229            )
230            .unwrap(),
231            tx_verification_latency: register_histogram_with_registry!(
232                "validator_service_tx_verification_latency",
233                "Latency of verifying a transaction",
234                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
235                registry,
236            )
237            .unwrap(),
238            cert_verification_latency: register_histogram_with_registry!(
239                "validator_service_cert_verification_latency",
240                "Latency of verifying a certificate",
241                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
242                registry,
243            )
244            .unwrap(),
245            consensus_latency: register_histogram_with_registry!(
246                "validator_service_consensus_latency",
247                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
248                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
249                registry,
250            )
251            .unwrap(),
252            handle_transaction_latency: register_histogram_with_registry!(
253                "validator_service_handle_transaction_latency",
254                "Latency of handling a transaction",
255                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
256                registry,
257            )
258            .unwrap(),
259            handle_certificate_consensus_latency: register_histogram_with_registry!(
260                "validator_service_handle_certificate_consensus_latency",
261                "Latency of handling a consensus transaction certificate",
262                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
263                registry,
264            )
265            .unwrap(),
266            submit_certificate_consensus_latency: register_histogram_with_registry!(
267                "validator_service_submit_certificate_consensus_latency",
268                "Latency of submit_certificate RPC handler",
269                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
270                registry,
271            )
272            .unwrap(),
273            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
274                "validator_service_handle_certificate_non_consensus_latency",
275                "Latency of handling a non-consensus transaction certificate",
276                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
277                registry,
278            )
279            .unwrap(),
280            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
281                "validator_service_handle_soft_bundle_certificates_consensus_latency",
282                "Latency of handling a consensus soft bundle",
283                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
284                registry,
285            )
286            .unwrap(),
287            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
288                "handle_soft_bundle_certificates_count",
289                "The number of certificates included in a soft bundle",
290                mysten_metrics::COUNT_BUCKETS.to_vec(),
291                registry,
292            )
293            .unwrap(),
294            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
295                "handle_soft_bundle_certificates_size_bytes",
296                "The size of soft bundle in bytes",
297                mysten_metrics::BYTES_BUCKETS.to_vec(),
298                registry,
299            )
300            .unwrap(),
301            handle_transaction_consensus_latency: register_histogram_with_registry!(
302                "validator_service_handle_transaction_consensus_latency",
303                "Latency of handling a user transaction sent through consensus",
304                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
305                registry,
306            )
307            .unwrap(),
308            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
309                "validator_service_submit_transaction_consensus_latency",
310                "Latency of submitting a user transaction sent through consensus",
311                &["req_type"],
312                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
313                registry,
314            )
315            .unwrap(),
316            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
317                "validator_service_submit_transaction_latency",
318                "Latency of submit transaction handler",
319                &["req_type"],
320                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
321                registry,
322            )
323            .unwrap(),
324            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
325                "validator_service_handle_wait_for_effects_ping_latency",
326                "Latency of handling a ping request for wait_for_effects",
327                &["req_type"],
328                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
329                registry,
330            )
331            .unwrap(),
332            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
333                "validator_service_submit_transaction_bytes",
334                "The size of transactions in the submit transaction request",
335                &["req_type"],
336                mysten_metrics::BYTES_BUCKETS.to_vec(),
337                registry,
338            )
339            .unwrap(),
340            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
341                "validator_service_submit_transaction_batch_size",
342                "The number of transactions in the submit transaction request",
343                &["req_type"],
344                mysten_metrics::COUNT_BUCKETS.to_vec(),
345                registry,
346            )
347            .unwrap(),
348            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
349                "validator_service_num_rejected_cert_in_epoch_boundary",
350                "Number of rejected transaction certificate during epoch transitioning",
351                registry,
352            )
353            .unwrap(),
354            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
355                "validator_service_num_rejected_tx_during_overload",
356                "Number of rejected transaction due to system overload",
357                &["error_type"],
358                registry,
359            )
360            .unwrap(),
361            submission_rejected_transactions: register_int_counter_vec_with_registry!(
362                "validator_service_submission_rejected_transactions",
363                "Number of transactions rejected during submission",
364                &["reason"],
365                registry,
366            )
367            .unwrap(),
368            connection_ip_not_found: register_int_counter_with_registry!(
369                "validator_service_connection_ip_not_found",
370                "Number of times connection IP was not extractable from request",
371                registry,
372            )
373            .unwrap(),
374            forwarded_header_parse_error: register_int_counter_with_registry!(
375                "validator_service_forwarded_header_parse_error",
376                "Number of times x-forwarded-for header could not be parsed",
377                registry,
378            )
379            .unwrap(),
380            forwarded_header_invalid: register_int_counter_with_registry!(
381                "validator_service_forwarded_header_invalid",
382                "Number of times x-forwarded-for header was invalid",
383                registry,
384            )
385            .unwrap(),
386            forwarded_header_not_included: register_int_counter_with_registry!(
387                "validator_service_forwarded_header_not_included",
388                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
389                registry,
390            )
391            .unwrap(),
392            client_id_source_config_mismatch: register_int_counter_with_registry!(
393                "validator_service_client_id_source_config_mismatch",
394                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
395                registry,
396            )
397            .unwrap(),
398            x_forwarded_for_num_hops: register_gauge_with_registry!(
399                "validator_service_x_forwarded_for_num_hops",
400                "Number of hops in x-forwarded-for header",
401                registry,
402            )
403            .unwrap(),
404            gasless_rate_limited_count: register_int_counter_with_registry!(
405                "validator_service_gasless_rate_limited_count",
406                "Number of gasless transactions rejected by rate limiter",
407                registry,
408            )
409            .unwrap(),
410        }
411    }
412
413    pub fn new_for_tests() -> Self {
414        let registry = Registry::new();
415        Self::new(&registry)
416    }
417}
418
/// The validator's request-handling service.
///
/// Cloning copies the `Arc` handles and the remaining fields; it does not duplicate the
/// underlying authority state.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state used to validate and process requests.
    state: Arc<AuthorityState>,
    /// Consensus adapter passed to overload checks and to the submission path.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded while handling requests.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` copies it from the authority state, while
    /// `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How the client IP is determined for a request; when `None`, the submit handler falls
    /// back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
    /// Rate limiter consulted before accepting gasless transactions.
    gasless_limiter: GaslessRateLimiter,
}
428
429impl ValidatorService {
430    pub fn new(
431        state: Arc<AuthorityState>,
432        consensus_adapter: Arc<ConsensusAdapter>,
433        validator_metrics: Arc<ValidatorServiceMetrics>,
434        client_id_source: Option<ClientIdSource>,
435    ) -> Self {
436        let traffic_controller = state.traffic_controller.clone();
437        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
438        Self {
439            state,
440            consensus_adapter,
441            metrics: validator_metrics,
442            traffic_controller,
443            client_id_source,
444            gasless_limiter,
445        }
446    }
447
448    pub fn new_for_tests(
449        state: Arc<AuthorityState>,
450        consensus_adapter: Arc<ConsensusAdapter>,
451        metrics: Arc<ValidatorServiceMetrics>,
452    ) -> Self {
453        let gasless_limiter = GaslessRateLimiter::new(state.consensus_gasless_counter.clone());
454        Self {
455            state,
456            consensus_adapter,
457            metrics,
458            traffic_controller: None,
459            client_id_source: None,
460            gasless_limiter,
461        }
462    }
463
    /// Returns a reference to the shared authority state held by this service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
467
468    /// Test method that performs transaction validation without going through gRPC.
469    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
470        let epoch_store = self.state.load_epoch_store_one_call_per_task();
471
472        // Validity check (basic structural validation)
473        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
474
475        // Signature verification
476        let transaction = epoch_store
477            .verify_transaction_require_no_aliases(transaction)?
478            .into_tx();
479
480        // Validate the transaction
481        self.state
482            .handle_vote_transaction(&epoch_store, transaction)?;
483
484        Ok(())
485    }
486
487    /// Test method that performs transaction validation with overload checking.
488    /// Used for testing validator overload behavior.
489    pub fn handle_transaction_for_testing_with_overload_check(
490        &self,
491        transaction: Transaction,
492    ) -> SuiResult<()> {
493        let epoch_store = self.state.load_epoch_store_one_call_per_task();
494
495        // Validity check (basic structural validation)
496        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
497
498        // Check system overload
499        self.state.check_system_overload(
500            self.consensus_adapter.as_ref(),
501            transaction.data(),
502            self.state.check_system_overload_at_signing(),
503        )?;
504
505        // Signature verification
506        let transaction = epoch_store
507            .verify_transaction_require_no_aliases(transaction)?
508            .into_tx();
509
510        // Validate the transaction
511        self.state
512            .handle_vote_transaction(&epoch_store, transaction)?;
513
514        Ok(())
515    }
516
517    /// Collect the IDs of input objects that are immutable.
518    /// This is used to create the ImmutableInputObjects claim for consensus messages.
519    async fn collect_immutable_object_ids(
520        &self,
521        tx: &VerifiedTransaction,
522        state: &AuthorityState,
523    ) -> SuiResult<Vec<ObjectID>> {
524        let input_objects = tx.data().transaction_data().input_objects()?;
525
526        // Collect object IDs from ImmOrOwnedMoveObject inputs
527        let object_ids: Vec<ObjectID> = input_objects
528            .iter()
529            .filter_map(|obj| match obj {
530                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
531                _ => None,
532            })
533            .collect();
534        if object_ids.is_empty() {
535            return Ok(vec![]);
536        }
537
538        // Load objects from cache and filter to immutable ones
539        let objects = state.get_object_cache_reader().get_objects(&object_ids);
540
541        // All objects should be found, since owned input objects have been validated to exist.
542        objects
543            .into_iter()
544            .zip_debug_eq(object_ids.iter())
545            .filter_map(|(obj, id)| {
546                let Some(o) = obj else {
547                    return Some(Err::<ObjectID, SuiError>(
548                        SuiErrorKind::UserInputError {
549                            error: UserInputError::ObjectNotFound {
550                                object_id: *id,
551                                version: None,
552                            },
553                        }
554                        .into(),
555                    ));
556                };
557                if o.is_immutable() {
558                    Some(Ok(*id))
559                } else {
560                    None
561                }
562            })
563            .collect::<SuiResult<Vec<ObjectID>>>()
564    }
565
566    #[instrument(
567        name = "ValidatorService::handle_submit_transaction",
568        level = "error",
569        skip_all,
570        err(level = "debug")
571    )]
572    async fn handle_submit_transaction(
573        &self,
574        request: tonic::Request<RawSubmitTxRequest>,
575    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
576        let Self {
577            state,
578            consensus_adapter,
579            metrics,
580            traffic_controller: _,
581            client_id_source,
582            gasless_limiter: _,
583        } = self.clone();
584
585        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
586            self.get_client_ip_addr(&request, client_id_source)
587        } else {
588            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
589        };
590
591        let inner = request.into_inner();
592        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
593
594        let next_epoch = start_epoch + 1;
595        let mut max_retries = 1;
596
597        loop {
598            let res = self
599                .handle_submit_transaction_inner(
600                    &state,
601                    &consensus_adapter,
602                    &metrics,
603                    &inner,
604                    submitter_client_addr,
605                )
606                .await;
607            match res {
608                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
609                Err(err) => {
610                    if max_retries > 0
611                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
612                    {
613                        max_retries -= 1;
614
615                        debug!(
616                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
617                        );
618
619                        if let Ok(Ok(new_epoch)) =
620                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
621                        {
622                            assert_reachable!("retry submission at epoch end");
623                            if new_epoch == next_epoch {
624                                continue;
625                            }
626
627                            debug_fatal!(
628                                "expected epoch {} after reconfiguration. got {}",
629                                next_epoch,
630                                new_epoch
631                            );
632                        }
633                    }
634                    return Err(err.into());
635                }
636            }
637        }
638    }
639
640    async fn handle_submit_transaction_inner(
641        &self,
642        state: &AuthorityState,
643        consensus_adapter: &ConsensusAdapter,
644        metrics: &ValidatorServiceMetrics,
645        request: &RawSubmitTxRequest,
646        submitter_client_addr: Option<IpAddr>,
647    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
648        let epoch_store = state.load_epoch_store_one_call_per_task();
649        if !epoch_store.protocol_config().mysticeti_fastpath() {
650            return Err(SuiErrorKind::UnsupportedFeatureError {
651                error: "Mysticeti fastpath".to_string(),
652            }
653            .into());
654        }
655
656        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
657            SuiErrorKind::GrpcMessageDeserializeError {
658                type_info: "RawSubmitTxRequest.submit_type".to_string(),
659                error: e.to_string(),
660            }
661        })?;
662
663        let is_ping_request = submit_type == SubmitTxType::Ping;
664        if is_ping_request {
665            fp_ensure!(
666                request.transactions.is_empty(),
667                SuiErrorKind::InvalidRequest(format!(
668                    "Ping request cannot contain {} transactions",
669                    request.transactions.len()
670                ))
671                .into()
672            );
673        } else {
674            // Ensure default and soft bundle requests contain at least one transaction.
675            fp_ensure!(
676                !request.transactions.is_empty(),
677                SuiErrorKind::InvalidRequest(
678                    "At least one transaction needs to be submitted".to_string(),
679                )
680                .into()
681            );
682        }
683
684        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
685        // if they use the same gas price. But this is only done with best effort.
686        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
687        // other transactions in the same bundle.
688        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
689
690        let max_num_transactions = if is_soft_bundle_request {
691            // Soft bundle cannot contain too many transactions.
692            // Otherwise it is hard to include all of them in a single block.
693            epoch_store.protocol_config().max_soft_bundle_size()
694        } else {
695            // Still enforce a limit even when transactions do not need to be in the same block.
696            epoch_store
697                .protocol_config()
698                .max_num_transactions_in_block()
699        };
700        fp_ensure!(
701            request.transactions.len() <= max_num_transactions as usize,
702            SuiErrorKind::InvalidRequest(format!(
703                "Too many transactions in request: {} vs {}",
704                request.transactions.len(),
705                max_num_transactions
706            ))
707            .into()
708        );
709
710        // Transaction digests.
711        let mut tx_digests = Vec::with_capacity(request.transactions.len());
712        // Transactions to submit to consensus.
713        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
714        // Indexes of transactions above in the request transactions.
715        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
716        // Results corresponding to each transaction in the request.
717        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
718        // Total size of all transactions in the request.
719        let mut total_size_bytes = 0;
720        // Traffic control spam weight to use for the transaction.
721        let mut spam_weight = Weight::zero();
722
723        let req_type = if is_ping_request {
724            "ping"
725        } else if request.transactions.len() == 1 {
726            "single_transaction"
727        } else if is_soft_bundle_request {
728            "soft_bundle"
729        } else {
730            "batch"
731        };
732
733        let _handle_tx_metrics_guard = metrics
734            .handle_submit_transaction_latency
735            .with_label_values(&[req_type])
736            .start_timer();
737
738        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
739            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
740                Ok(txn) => txn,
741                Err(e) => {
742                    // Ok to fail the request when any transaction is invalid.
743                    return Err(SuiErrorKind::TransactionDeserializationError {
744                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
745                    }
746                    .into());
747                }
748            };
749
750            // Ok to fail the request when any transaction is invalid.
751            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
752
753            if transaction
754                .data()
755                .transaction_data()
756                .is_gasless_transaction()
757            {
758                // Gasless transactions count for traffic control, since they have no economic cost.
759                spam_weight = Weight::one();
760            }
761
762            let overload_check_res = state.check_system_overload(
763                consensus_adapter,
764                transaction.data(),
765                state.check_system_overload_at_signing(),
766            );
767            if let Err(error) = overload_check_res {
768                metrics
769                    .num_rejected_tx_during_overload
770                    .with_label_values(&[error.as_ref()])
771                    .inc();
772                results[idx] = Some(SubmitTxResult::Rejected { error });
773                continue;
774            }
775
776            if transaction
777                .data()
778                .transaction_data()
779                .is_gasless_transaction()
780                && !self
781                    .gasless_limiter
782                    .try_acquire(epoch_store.protocol_config())
783            {
784                metrics.gasless_rate_limited_count.inc();
785                results[idx] = Some(SubmitTxResult::Rejected {
786                    error: SuiErrorKind::ValidatorOverloadedRetryAfter {
787                        retry_after_secs: 1,
788                    }
789                    .into(),
790                });
791                continue;
792            }
793
794            // Ok to fail the request when any signature is invalid.
795            let verified_transaction = {
796                let _metrics_guard = metrics.tx_verification_latency.start_timer();
797                if epoch_store.protocol_config().address_aliases() {
798                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
799                        Ok(tx) => tx,
800                        Err(e) => {
801                            metrics.signature_errors.inc();
802                            return Err(e);
803                        }
804                    }
805                } else {
806                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
807                        Ok(tx) => tx,
808                        Err(e) => {
809                            metrics.signature_errors.inc();
810                            return Err(e);
811                        }
812                    }
813                }
814            };
815
816            let tx_digest = *verified_transaction.tx().digest();
817            debug!(
818                ?tx_digest,
819                "handle_submit_transaction: verified transaction"
820            );
821
822            // Check if the transaction has executed, before checking input objects
823            // which could have been consumed.
824            if let Some(effects) = state
825                .get_transaction_cache_reader()
826                .get_executed_effects(&tx_digest)
827            {
828                let effects_digest = effects.digest();
829                if let Ok(executed_data) = self.complete_executed_data(effects).await {
830                    let executed_result = SubmitTxResult::Executed {
831                        effects_digest,
832                        details: Some(executed_data),
833                    };
834                    results[idx] = Some(executed_result);
835                    debug!(?tx_digest, "handle_submit_transaction: already executed");
836                    continue;
837                }
838            }
839
840            if self
841                .state
842                .get_transaction_cache_reader()
843                .transaction_executed_in_last_epoch(&tx_digest, epoch_store.epoch())
844            {
845                results[idx] = Some(SubmitTxResult::Rejected {
846                    error: UserInputError::TransactionAlreadyExecuted { digest: tx_digest }.into(),
847                });
848                debug!(
849                    ?tx_digest,
850                    "handle_submit_transaction: transaction already executed in previous epoch"
851                );
852                continue;
853            }
854
855            debug!(
856                ?tx_digest,
857                "handle_submit_transaction: waiting for fastpath dependency objects"
858            );
859            if !state
860                .wait_for_fastpath_dependency_objects(
861                    verified_transaction.tx(),
862                    epoch_store.epoch(),
863                )
864                .await?
865            {
866                debug!(
867                    ?tx_digest,
868                    "fastpath input objects are still unavailable after waiting"
869                );
870            }
871
872            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
873                Ok(_) => { /* continue processing */ }
874                Err(e) => {
875                    // Check if transaction has been executed while being validated.
876                    // This is an edge case so checking executed effects twice is acceptable.
877                    if let Some(effects) = state
878                        .get_transaction_cache_reader()
879                        .get_executed_effects(&tx_digest)
880                    {
881                        let effects_digest = effects.digest();
882                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
883                            let executed_result = SubmitTxResult::Executed {
884                                effects_digest,
885                                details: Some(executed_data),
886                            };
887                            results[idx] = Some(executed_result);
888                            continue;
889                        }
890                    }
891
892                    // When the transaction has not been executed, record the error for the transaction.
893                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
894                    metrics
895                        .submission_rejected_transactions
896                        .with_label_values(&[e.to_variant_name()])
897                        .inc();
898                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
899                    continue;
900                }
901            }
902
903            // Create claims with aliases and / or immutable objects.
904            let mut claims = vec![];
905
906            let immutable_object_ids = self
907                .collect_immutable_object_ids(verified_transaction.tx(), state)
908                .await?;
909            if !immutable_object_ids.is_empty() {
910                claims.push(TransactionClaim::ImmutableInputObjects(
911                    immutable_object_ids,
912                ));
913            }
914
915            let (tx, aliases) = verified_transaction.into_inner();
916            if epoch_store.protocol_config().address_aliases() {
917                if epoch_store
918                    .protocol_config()
919                    .fix_checkpoint_signature_mapping()
920                {
921                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
922                } else {
923                    let v1_aliases: Vec<_> = tx
924                        .data()
925                        .intent_message()
926                        .value
927                        .required_signers()
928                        .into_iter()
929                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
930                        .collect();
931                    #[allow(deprecated)]
932                    claims.push(TransactionClaim::AddressAliases(
933                        nonempty::NonEmpty::from_vec(v1_aliases)
934                            .expect("must have at least one required_signer"),
935                    ));
936                }
937            }
938
939            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
940
941            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
942                &state.name,
943                tx_with_claims,
944            ));
945
946            transaction_indexes.push(idx);
947            tx_digests.push(tx_digest);
948            total_size_bytes += tx_size;
949        }
950
951        if consensus_transactions.is_empty() && !is_ping_request {
952            return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
953        }
954
        // Set the max byte size of a soft bundle to half of the consensus max-transactions-in-block size.
        // We do this to account for serialization overhead and to ensure that the soft bundle is not too large
        // when it is submitted via consensus.
958        let max_transaction_bytes = if is_soft_bundle_request {
959            epoch_store
960                .protocol_config()
961                .consensus_max_transactions_in_block_bytes()
962                / 2
963        } else {
964            epoch_store
965                .protocol_config()
966                .consensus_max_transactions_in_block_bytes()
967        };
968        fp_ensure!(
969            total_size_bytes <= max_transaction_bytes as usize,
970            SuiErrorKind::UserInputError {
971                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
972                    size: total_size_bytes,
973                    limit: max_transaction_bytes,
974                },
975            }
976            .into()
977        );
978
979        metrics
980            .handle_submit_transaction_bytes
981            .with_label_values(&[req_type])
982            .observe(total_size_bytes as f64);
983        metrics
984            .handle_submit_transaction_batch_size
985            .with_label_values(&[req_type])
986            .observe(consensus_transactions.len() as f64);
987
988        let _latency_metric_guard = metrics
989            .handle_submit_transaction_consensus_latency
990            .with_label_values(&[req_type])
991            .start_timer();
992
993        let consensus_positions = if is_soft_bundle_request || is_ping_request {
            // We only allow `consensus_transactions` to be empty for ping requests; this is how downstream components should, and do, treat it.
            // In any other case an empty `consensus_transactions` vector is an invalid state, and we should never have reached this point.
996            assert!(
997                is_ping_request || !consensus_transactions.is_empty(),
998                "A valid soft bundle must have at least one transaction"
999            );
1000            debug!(
1001                "handle_submit_transaction: submitting consensus transactions ({}): {}",
1002                req_type,
1003                consensus_transactions
1004                    .iter()
1005                    .map(|t| t.local_display())
1006                    .join(", ")
1007            );
1008            self.handle_submit_to_consensus_for_position(
1009                consensus_transactions,
1010                &epoch_store,
1011                submitter_client_addr,
1012            )
1013            .await?
1014        } else {
1015            let futures = consensus_transactions.into_iter().map(|t| {
1016                debug!(
1017                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
1018                    req_type,
1019                    t.local_display(),
1020                );
1021                self.handle_submit_to_consensus_for_position(
1022                    vec![t],
1023                    &epoch_store,
1024                    submitter_client_addr,
1025                )
1026            });
1027            future::try_join_all(futures)
1028                .await?
1029                .into_iter()
1030                .flatten()
1031                .collect()
1032        };
1033
1034        if is_ping_request {
1035            // For ping requests, return the special consensus position.
1036            assert_eq!(consensus_positions.len(), 1);
1037            results.push(Some(SubmitTxResult::Submitted {
1038                consensus_position: consensus_positions[0],
1039            }));
1040        } else {
1041            // Otherwise, return the consensus position for each transaction.
1042            for ((idx, tx_digest), consensus_position) in transaction_indexes
1043                .into_iter()
1044                .zip_debug_eq(tx_digests)
1045                .zip_debug_eq(consensus_positions)
1046            {
1047                debug!(
1048                    ?tx_digest,
1049                    "handle_submit_transaction: submitted consensus transaction at {}",
1050                    consensus_position,
1051                );
1052                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1053            }
1054        }
1055
1056        Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1057    }
1058
1059    fn try_from_submit_tx_response(
1060        results: Vec<Option<SubmitTxResult>>,
1061    ) -> Result<RawSubmitTxResponse, SuiError> {
1062        let mut raw_results = Vec::new();
1063        for (i, result) in results.into_iter().enumerate() {
1064            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1065                error: format!("Missing transaction result at {}", i),
1066            })?;
1067            let raw_result = result.try_into()?;
1068            raw_results.push(raw_result);
1069        }
1070        Ok(RawSubmitTxResponse {
1071            results: raw_results,
1072        })
1073    }
1074
1075    #[instrument(
1076        name = "ValidatorService::handle_submit_to_consensus_for_position",
1077        level = "debug",
1078        skip_all,
1079        err(level = "debug")
1080    )]
1081    async fn handle_submit_to_consensus_for_position(
1082        &self,
1083        // Empty when this is a ping request.
1084        consensus_transactions: Vec<ConsensusTransaction>,
1085        epoch_store: &Arc<AuthorityPerEpochStore>,
1086        submitter_client_addr: Option<IpAddr>,
1087    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1088        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1089
1090        {
1091            // code block within reconfiguration lock
1092            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1093            if !reconfiguration_lock.should_accept_user_certs() {
1094                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1095                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1096            }
1097
1098            // Submit to consensus and wait for position, we do not check if tx
1099            // has been processed by consensus already as this method is called
1100            // to get back a consensus position.
1101            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1102
1103            self.consensus_adapter.submit_batch(
1104                &consensus_transactions,
1105                Some(&reconfiguration_lock),
1106                epoch_store,
1107                Some(tx_consensus_positions),
1108                submitter_client_addr,
1109            )?;
1110        }
1111
1112        Ok(rx_consensus_positions.await.map_err(|e| {
1113            SuiErrorKind::FailedToSubmitToConsensus(format!(
1114                "Failed to get consensus position: {e}"
1115            ))
1116        })?)
1117    }
1118
1119    async fn collect_effects_data(
1120        &self,
1121        effects: &TransactionEffects,
1122        include_events: bool,
1123        include_input_objects: bool,
1124        include_output_objects: bool,
1125    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1126        let events = if include_events && effects.events_digest().is_some() {
1127            Some(
1128                self.state
1129                    .get_transaction_events(effects.transaction_digest())?,
1130            )
1131        } else {
1132            None
1133        };
1134
1135        let input_objects = if include_input_objects {
1136            self.state.get_transaction_input_objects(effects)?
1137        } else {
1138            vec![]
1139        };
1140
1141        let output_objects = if include_output_objects {
1142            self.state.get_transaction_output_objects(effects)?
1143        } else {
1144            vec![]
1145        };
1146
1147        Ok((events, input_objects, output_objects))
1148    }
1149}
1150
/// Result type shared by the `*_impl` service handlers: on success, the tonic
/// response paired with the traffic-control `Weight` for the request; on
/// failure, a `tonic::Status`.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1152
1153impl ValidatorService {
    /// Forwards the raw submit-transaction request to
    /// `handle_submit_transaction` and returns its result unchanged.
    async fn handle_submit_transaction_impl(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
        self.handle_submit_transaction(request).await
    }
1160
1161    async fn wait_for_effects_impl(
1162        &self,
1163        request: tonic::Request<RawWaitForEffectsRequest>,
1164    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1165        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1166        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1167        let response = timeout(
1168            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1169            Duration::from_secs(20),
1170            epoch_store
1171                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1172                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1173        )
1174        .await
1175        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1176        .try_into()?;
1177        Ok((tonic::Response::new(response), Weight::zero()))
1178    }
1179
1180    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
1181    async fn wait_for_effects_response(
1182        &self,
1183        request: WaitForEffectsRequest,
1184        epoch_store: &Arc<AuthorityPerEpochStore>,
1185    ) -> SuiResult<WaitForEffectsResponse> {
1186        if request.ping_type.is_some() {
1187            return timeout(
1188                Duration::from_secs(10),
1189                self.ping_response(request, epoch_store),
1190            )
1191            .await
1192            .map_err(|_| SuiErrorKind::TimeoutError)?;
1193        }
1194
1195        let Some(tx_digest) = request.transaction_digest else {
1196            return Err(SuiErrorKind::InvalidRequest(
1197                "Transaction digest is required for wait for effects requests".to_string(),
1198            )
1199            .into());
1200        };
1201        let tx_digests = [tx_digest];
1202
1203        // When consensus_position is provided, also watch the consensus status cache
1204        // so rejected/dropped transactions get a timely response instead of waiting
1205        // forever for effects that will never be produced.
1206        let consensus_status_future = async {
1207            let consensus_position = match request.consensus_position {
1208                Some(pos) => pos,
1209                None => return futures::future::pending().await,
1210            };
1211            let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().ok_or(
1212                SuiErrorKind::UnsupportedFeatureError {
1213                    error: "Consensus tx status cache".to_string(),
1214                },
1215            )?;
1216            consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1217            match consensus_tx_status_cache
1218                .notify_read_transaction_status(consensus_position)
1219                .await
1220            {
1221                NotifyReadConsensusTxStatusResult::Status(
1222                    ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1223                ) => Ok(WaitForEffectsResponse::Rejected {
1224                    error: epoch_store.get_rejection_vote_reason(consensus_position),
1225                }),
1226                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1227                    // Effects will be produced — yield to let the effects future win.
1228                    futures::future::pending().await
1229                }
1230                NotifyReadConsensusTxStatusResult::Expired(round) => {
1231                    Ok(WaitForEffectsResponse::Expired {
1232                        epoch: epoch_store.epoch(),
1233                        round: Some(round),
1234                    })
1235                }
1236            }
1237        };
1238
1239        tokio::select! {
1240            effects_result = self.state
1241                .get_transaction_cache_reader()
1242                .notify_read_executed_effects_may_fail(
1243                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1244                    &tx_digests,
1245                ) => {
1246                let effects = effects_result?.pop().unwrap();
1247                let effects_digest = effects.digest();
1248                let details = if request.include_details {
1249                    Some(self.complete_executed_data(effects).await?)
1250                } else {
1251                    None
1252                };
1253                Ok(WaitForEffectsResponse::Executed {
1254                    effects_digest,
1255                    details,
1256                })
1257            }
1258            status_response = consensus_status_future => {
1259                status_response
1260            }
1261        }
1262    }
1263
1264    #[instrument(level = "error", skip_all, err(level = "debug"))]
1265    async fn ping_response(
1266        &self,
1267        request: WaitForEffectsRequest,
1268        epoch_store: &Arc<AuthorityPerEpochStore>,
1269    ) -> SuiResult<WaitForEffectsResponse> {
1270        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1271            return Err(SuiErrorKind::UnsupportedFeatureError {
1272                error: "Mysticeti fastpath".to_string(),
1273            }
1274            .into());
1275        };
1276
1277        let Some(consensus_position) = request.consensus_position else {
1278            return Err(SuiErrorKind::InvalidRequest(
1279                "Consensus position is required for Ping requests".to_string(),
1280            )
1281            .into());
1282        };
1283
1284        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1285        let Some(ping) = request.ping_type else {
1286            return Err(SuiErrorKind::InvalidRequest(
1287                "Ping type is required for ping requests".to_string(),
1288            )
1289            .into());
1290        };
1291
1292        let _metrics_guard = self
1293            .metrics
1294            .handle_wait_for_effects_ping_latency
1295            .with_label_values(&[ping.as_str()])
1296            .start_timer();
1297
1298        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1299
1300        let details = if request.include_details {
1301            Some(Box::new(ExecutedData::default()))
1302        } else {
1303            None
1304        };
1305
1306        let status = consensus_tx_status_cache
1307            .notify_read_transaction_status(consensus_position)
1308            .await;
1309        match status {
1310            NotifyReadConsensusTxStatusResult::Status(status) => match status {
1311                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1312                    Ok(WaitForEffectsResponse::Rejected {
1313                        error: epoch_store.get_rejection_vote_reason(consensus_position),
1314                    })
1315                }
1316                ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1317                    effects_digest: TransactionEffectsDigest::ZERO,
1318                    details,
1319                }),
1320            },
1321            NotifyReadConsensusTxStatusResult::Expired(round) => {
1322                Ok(WaitForEffectsResponse::Expired {
1323                    epoch: epoch_store.epoch(),
1324                    round: Some(round),
1325                })
1326            }
1327        }
1328    }
1329
1330    async fn complete_executed_data(
1331        &self,
1332        effects: TransactionEffects,
1333    ) -> SuiResult<Box<ExecutedData>> {
1334        let (events, input_objects, output_objects) = self
1335            .collect_effects_data(
1336                &effects, /* include_events */ true, /* include_input_objects */ true,
1337                /* include_output_objects */ true,
1338            )
1339            .await?;
1340        Ok(Box::new(ExecutedData {
1341            effects,
1342            events,
1343            input_objects,
1344            output_objects,
1345        }))
1346    }
1347
1348    async fn object_info_impl(
1349        &self,
1350        request: tonic::Request<ObjectInfoRequest>,
1351    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1352        let request = request.into_inner();
1353        let response = self.state.handle_object_info_request(request).await?;
1354        Ok((tonic::Response::new(response), Weight::one()))
1355    }
1356
1357    async fn transaction_info_impl(
1358        &self,
1359        request: tonic::Request<TransactionInfoRequest>,
1360    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1361        let request = request.into_inner();
1362        let response = self.state.handle_transaction_info_request(request).await?;
1363        Ok((tonic::Response::new(response), Weight::one()))
1364    }
1365
1366    async fn checkpoint_impl(
1367        &self,
1368        request: tonic::Request<CheckpointRequest>,
1369    ) -> WrappedServiceResponse<CheckpointResponse> {
1370        let request = request.into_inner();
1371        let response = self.state.handle_checkpoint_request(&request)?;
1372        Ok((tonic::Response::new(response), Weight::one()))
1373    }
1374
1375    async fn checkpoint_v2_impl(
1376        &self,
1377        request: tonic::Request<CheckpointRequestV2>,
1378    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1379        let request = request.into_inner();
1380        let response = self.state.handle_checkpoint_request_v2(&request)?;
1381        Ok((tonic::Response::new(response), Weight::one()))
1382    }
1383
1384    async fn get_system_state_object_impl(
1385        &self,
1386        _request: tonic::Request<SystemStateRequest>,
1387    ) -> WrappedServiceResponse<SuiSystemState> {
1388        let response = self
1389            .state
1390            .get_object_cache_reader()
1391            .get_sui_system_state_object_unsafe()?;
1392        Ok((tonic::Response::new(response), Weight::one()))
1393    }
1394
1395    async fn validator_health_impl(
1396        &self,
1397        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1398    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1399        let state = &self.state;
1400
1401        // Get epoch store once for both metrics
1402        let epoch_store = state.load_epoch_store_one_call_per_task();
1403
1404        // Get in-flight execution transactions from execution scheduler
1405        let num_inflight_execution_transactions =
1406            state.execution_scheduler().num_pending_certificates() as u64;
1407
1408        // Get in-flight consensus transactions from consensus adapter
1409        let num_inflight_consensus_transactions =
1410            self.consensus_adapter.num_inflight_transactions();
1411
1412        // Get last committed leader round from epoch store
1413        let last_committed_leader_round = epoch_store
1414            .consensus_tx_status_cache
1415            .as_ref()
1416            .and_then(|cache| cache.get_last_committed_leader_round())
1417            .unwrap_or(0);
1418
1419        // Get last locally built checkpoint sequence
1420        let last_locally_built_checkpoint = epoch_store
1421            .last_built_checkpoint_summary()
1422            .ok()
1423            .flatten()
1424            .map(|(_, summary)| summary.sequence_number)
1425            .unwrap_or(0);
1426
1427        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1428            num_inflight_consensus_transactions,
1429            num_inflight_execution_transactions,
1430            last_locally_built_checkpoint,
1431            last_committed_leader_round,
1432        };
1433
1434        let raw_response = typed_response
1435            .try_into()
1436            .map_err(|e: sui_types::error::SuiError| {
1437                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1438            })?;
1439
1440        Ok((tonic::Response::new(raw_response), Weight::one()))
1441    }
1442
1443    fn get_client_ip_addr<T>(
1444        &self,
1445        request: &tonic::Request<T>,
1446        source: &ClientIdSource,
1447    ) -> Option<IpAddr> {
1448        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1449
1450        if let Some(header) = forwarded_header {
1451            let num_hops = header
1452                .to_str()
1453                .map(|h| h.split(',').count().saturating_sub(1))
1454                .unwrap_or(0);
1455
1456            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1457        }
1458
1459        match source {
1460            ClientIdSource::SocketAddr => {
1461                let socket_addr: Option<SocketAddr> = request.remote_addr();
1462
1463                // We will hit this case if the IO type used does not
1464                // implement Connected or when using a unix domain socket.
1465                // TODO: once we have confirmed that no legitimate traffic
1466                // is hitting this case, we should reject such requests that
1467                // hit this case.
1468                if let Some(socket_addr) = socket_addr {
1469                    Some(socket_addr.ip())
1470                } else {
1471                    if cfg!(msim) {
1472                        // Ignore the error from simtests.
1473                    } else if cfg!(test) {
1474                        panic!("Failed to get remote address from request");
1475                    } else {
1476                        self.metrics.connection_ip_not_found.inc();
1477                        error!("Failed to get remote address from request");
1478                    }
1479                    None
1480                }
1481            }
1482            ClientIdSource::XForwardedFor(num_hops) => {
1483                let do_header_parse = |op: &MetadataValue<Ascii>| {
1484                    match op.to_str() {
1485                        Ok(header_val) => {
1486                            let header_contents =
1487                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1488                            if *num_hops == 0 {
1489                                error!(
1490                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1491                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1492                                    to this node. Skipping traffic controller request handling.",
1493                                    header_contents,
1494                                );
1495                                return None;
1496                            }
1497                            let contents_len = header_contents.len();
1498                            if contents_len < *num_hops {
1499                                error!(
1500                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1501                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1502                                    `client-id-source` in the node config.",
1503                                    header_contents, contents_len, num_hops, contents_len,
1504                                );
1505                                self.metrics.client_id_source_config_mismatch.inc();
1506                                return None;
1507                            }
1508                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1509                            else {
1510                                error!(
1511                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1512                                    Expected at least {} values. Skipping traffic controller request handling.",
1513                                    header_contents, contents_len, num_hops, contents_len,
1514                                );
1515                                return None;
1516                            };
1517                            parse_ip(client_ip).or_else(|| {
1518                                self.metrics.forwarded_header_parse_error.inc();
1519                                None
1520                            })
1521                        }
1522                        Err(e) => {
1523                            // TODO: once we have confirmed that no legitimate traffic
1524                            // is hitting this case, we should reject such requests that
1525                            // hit this case.
1526                            self.metrics.forwarded_header_invalid.inc();
1527                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1528                            None
1529                        }
1530                    }
1531                };
1532                if let Some(op) = request.metadata().get("x-forwarded-for") {
1533                    do_header_parse(op)
1534                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1535                    do_header_parse(op)
1536                } else {
1537                    self.metrics.forwarded_header_not_included.inc();
1538                    error!(
1539                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1540                    );
1541                    None
1542                }
1543            }
1544        }
1545    }
1546
1547    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1548        if let Some(traffic_controller) = &self.traffic_controller {
1549            if !traffic_controller.check(&client, &None).await {
1550                // Entity in blocklist
1551                Err(tonic::Status::from_error(
1552                    SuiErrorKind::TooManyRequests.into(),
1553                ))
1554            } else {
1555                Ok(())
1556            }
1557        } else {
1558            Ok(())
1559        }
1560    }
1561
1562    fn handle_traffic_resp<T>(
1563        &self,
1564        client: Option<IpAddr>,
1565        wrapped_response: WrappedServiceResponse<T>,
1566    ) -> Result<tonic::Response<T>, tonic::Status> {
1567        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1568            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1569            Err(status) => (
1570                Some(SuiError::from(status.clone())),
1571                Weight::zero(),
1572                Err(status.clone()),
1573            ),
1574        };
1575
1576        if let Some(traffic_controller) = self.traffic_controller.clone() {
1577            traffic_controller.tally(TrafficTally {
1578                direct: client,
1579                through_fullnode: None,
1580                error_info: error.map(|e| {
1581                    let error_type = String::from(e.clone().as_ref());
1582                    let error_weight = normalize(e);
1583                    (error_weight, error_type)
1584                }),
1585                spam_weight,
1586                timestamp: SystemTime::now(),
1587            })
1588        }
1589        unwrapped_response
1590    }
1591}
1592
1593// TODO: refine error matching here
1594fn normalize(err: SuiError) -> Weight {
1595    match err.as_inner() {
1596        SuiErrorKind::UserInputError {
1597            error: UserInputError::IncorrectUserSignature { .. },
1598        } => Weight::one(),
1599        SuiErrorKind::InvalidSignature { .. }
1600        | SuiErrorKind::SignerSignatureAbsent { .. }
1601        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1602        | SuiErrorKind::IncorrectSigner { .. }
1603        | SuiErrorKind::UnknownSigner { .. }
1604        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1605        _ => Weight::zero(),
1606    }
1607}
1608
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments: `$self` is the service value, `$func_name` the async handler
/// method to invoke on it, and `$request` the request passed to that handler.
///
/// Expansion, in order:
/// 1. If `$self.client_id_source` is `None`, the handler is awaited and its
///    result is returned directly (with the spam weight dropped); no traffic
///    control is applied.
/// 2. Otherwise the client address is resolved with `get_client_ip_addr` and
///    `handle_traffic_req` is awaited; its error, if any, is returned early
///    via `?` without invoking the handler.
/// 3. The handler is awaited and its result is passed through
///    `handle_traffic_resp`, whose value is the value of the expansion.
///
/// The expansion contains `return` and `?`, so it must be used inside an
/// async function or block whose output type matches the handler's unwrapped
/// result.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // Traffic control is bypassed entirely when no client id source is configured.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if the client is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1629
1630#[async_trait]
1631impl Validator for ValidatorService {
1632    async fn submit_transaction(
1633        &self,
1634        request: tonic::Request<RawSubmitTxRequest>,
1635    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1636        let validator_service = self.clone();
1637
1638        // Spawns a task which handles the transaction. The task will unconditionally continue
1639        // processing in the event that the client connection is dropped.
1640        spawn_monitored_task!(async move {
1641            // NB: traffic tally wrapping handled within the task rather than on task exit
1642            // to prevent an attacker from subverting traffic control by severing the connection
1643            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1644        })
1645        .await
1646        .unwrap()
1647    }
1648
1649    async fn wait_for_effects(
1650        &self,
1651        request: tonic::Request<RawWaitForEffectsRequest>,
1652    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1653        handle_with_decoration!(self, wait_for_effects_impl, request)
1654    }
1655
1656    async fn object_info(
1657        &self,
1658        request: tonic::Request<ObjectInfoRequest>,
1659    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1660        handle_with_decoration!(self, object_info_impl, request)
1661    }
1662
1663    async fn transaction_info(
1664        &self,
1665        request: tonic::Request<TransactionInfoRequest>,
1666    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1667        handle_with_decoration!(self, transaction_info_impl, request)
1668    }
1669
1670    async fn checkpoint(
1671        &self,
1672        request: tonic::Request<CheckpointRequest>,
1673    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1674        handle_with_decoration!(self, checkpoint_impl, request)
1675    }
1676
1677    async fn checkpoint_v2(
1678        &self,
1679        request: tonic::Request<CheckpointRequestV2>,
1680    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1681        handle_with_decoration!(self, checkpoint_v2_impl, request)
1682    }
1683
1684    async fn get_system_state_object(
1685        &self,
1686        request: tonic::Request<SystemStateRequest>,
1687    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1688        handle_with_decoration!(self, get_system_state_object_impl, request)
1689    }
1690
1691    async fn validator_health(
1692        &self,
1693        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1694    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1695    {
1696        handle_with_decoration!(self, validator_health_impl, request)
1697    }
1698}