sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use nonempty::NonEmpty;
13use prometheus::{
14    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
15    register_gauge_with_registry, register_histogram_vec_with_registry,
16    register_histogram_with_registry, register_int_counter_vec_with_registry,
17    register_int_counter_with_registry,
18};
19use std::{
20    cmp::Ordering,
21    future::Future,
22    io,
23    net::{IpAddr, SocketAddr},
24    pin::Pin,
25    sync::Arc,
26    time::{Duration, SystemTime},
27};
28use sui_network::{
29    api::{Validator, ValidatorServer},
30    tonic,
31    validator::server::SUI_TLS_SERVER_NAME,
32};
33use sui_types::effects::TransactionEffectsAPI;
34use sui_types::message_envelope::Message;
35use sui_types::messages_consensus::ConsensusPosition;
36use sui_types::messages_consensus::ConsensusTransaction;
37use sui_types::messages_grpc::{
38    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
39    TransactionInfoRequest, TransactionInfoResponse,
40};
41use sui_types::multiaddr::Multiaddr;
42use sui_types::object::Object;
43use sui_types::sui_system_state::SuiSystemState;
44use sui_types::traffic_control::{ClientIdSource, Weight};
45use sui_types::{
46    base_types::ObjectID,
47    digests::{TransactionDigest, TransactionEffectsDigest},
48    error::{SuiErrorKind, UserInputError},
49};
50use sui_types::{
51    effects::TransactionEffects,
52    messages_grpc::{
53        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
54        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
55    },
56};
57use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
58use sui_types::{error::*, transaction::*};
59use sui_types::{
60    fp_ensure,
61    messages_checkpoint::{
62        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
63    },
64};
65use tokio::sync::oneshot;
66use tokio::time::timeout;
67use tonic::metadata::{Ascii, MetadataValue};
68use tracing::{debug, error, info, instrument};
69
70use crate::consensus_adapter::ConnectionMonitorStatusForTests;
71use crate::{
72    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
73    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
74    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
75};
76use crate::{
77    authority::{
78        authority_per_epoch_store::AuthorityPerEpochStore,
79        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
80    },
81    checkpoints::CheckpointStore,
82    mysticeti_adapter::LazyMysticetiClient,
83    transaction_outputs::TransactionOutputs,
84};
85use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
86use sui_types::messages_grpc::PingType;
87
88#[cfg(test)]
89#[path = "unit_tests/server_tests.rs"]
90mod server_tests;
91
92#[cfg(test)]
93#[path = "unit_tests/wait_for_effects_tests.rs"]
94mod wait_for_effects_tests;
95
96#[cfg(test)]
97#[path = "unit_tests/submit_transaction_tests.rs"]
98mod submit_transaction_tests;
99
100pub struct AuthorityServerHandle {
101    server_handle: sui_network::validator::server::Server,
102}
103
104impl AuthorityServerHandle {
105    pub async fn join(self) -> Result<(), io::Error> {
106        self.server_handle.handle().wait_for_shutdown().await;
107        Ok(())
108    }
109
110    pub async fn kill(self) -> Result<(), io::Error> {
111        self.server_handle.handle().shutdown().await;
112        Ok(())
113    }
114
115    pub fn address(&self) -> &Multiaddr {
116        self.server_handle.local_addr()
117    }
118}
119
120pub struct AuthorityServer {
121    address: Multiaddr,
122    pub state: Arc<AuthorityState>,
123    consensus_adapter: Arc<ConsensusAdapter>,
124    pub metrics: Arc<ValidatorServiceMetrics>,
125}
126
127impl AuthorityServer {
128    pub fn new_for_test_with_consensus_adapter(
129        state: Arc<AuthorityState>,
130        consensus_adapter: Arc<ConsensusAdapter>,
131    ) -> Self {
132        let address = new_local_tcp_address_for_testing();
133        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
134
135        Self {
136            address,
137            state,
138            consensus_adapter,
139            metrics,
140        }
141    }
142
143    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
144        let consensus_adapter = Arc::new(ConsensusAdapter::new(
145            Arc::new(LazyMysticetiClient::new()),
146            CheckpointStore::new_for_tests(),
147            state.name,
148            Arc::new(ConnectionMonitorStatusForTests {}),
149            100_000,
150            100_000,
151            None,
152            None,
153            ConsensusAdapterMetrics::new_test(),
154            state.epoch_store_for_testing().protocol_config().clone(),
155        ));
156        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
157    }
158
159    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
160        let address = self.address.clone();
161        self.spawn_with_bind_address_for_test(address).await
162    }
163
164    pub async fn spawn_with_bind_address_for_test(
165        self,
166        address: Multiaddr,
167    ) -> Result<AuthorityServerHandle, io::Error> {
168        let tls_config = sui_tls::create_rustls_server_config(
169            self.state.config.network_key_pair().copy().private(),
170            SUI_TLS_SERVER_NAME.to_string(),
171        );
172        let config = mysten_network::config::Config::new();
173        let server = sui_network::validator::server::ServerBuilder::from_config(
174            &config,
175            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
176        )
177        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
178            self.state,
179            self.consensus_adapter,
180            self.metrics,
181        )))
182        .bind(&address, Some(tls_config))
183        .await
184        .unwrap();
185        let local_addr = server.local_addr().to_owned();
186        info!("Listening to traffic on {local_addr}");
187        let handle = AuthorityServerHandle {
188            server_handle: server,
189        };
190        Ok(handle)
191    }
192}
193
194pub struct ValidatorServiceMetrics {
195    pub signature_errors: IntCounter,
196    pub tx_verification_latency: Histogram,
197    pub cert_verification_latency: Histogram,
198    pub consensus_latency: Histogram,
199    pub handle_transaction_latency: Histogram,
200    pub submit_certificate_consensus_latency: Histogram,
201    pub handle_certificate_consensus_latency: Histogram,
202    pub handle_certificate_non_consensus_latency: Histogram,
203    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
204    pub handle_soft_bundle_certificates_count: Histogram,
205    pub handle_soft_bundle_certificates_size_bytes: Histogram,
206    pub handle_transaction_consensus_latency: Histogram,
207    pub handle_submit_transaction_consensus_latency: HistogramVec,
208    pub handle_wait_for_effects_ping_latency: HistogramVec,
209
210    handle_submit_transaction_latency: HistogramVec,
211    handle_submit_transaction_bytes: HistogramVec,
212    handle_submit_transaction_batch_size: HistogramVec,
213
214    num_rejected_cert_in_epoch_boundary: IntCounter,
215    num_rejected_tx_during_overload: IntCounterVec,
216    submission_rejected_transactions: IntCounterVec,
217    connection_ip_not_found: IntCounter,
218    forwarded_header_parse_error: IntCounter,
219    forwarded_header_invalid: IntCounter,
220    forwarded_header_not_included: IntCounter,
221    client_id_source_config_mismatch: IntCounter,
222    x_forwarded_for_num_hops: Gauge,
223}
224
225impl ValidatorServiceMetrics {
226    pub fn new(registry: &Registry) -> Self {
227        Self {
228            signature_errors: register_int_counter_with_registry!(
229                "total_signature_errors",
230                "Number of transaction signature errors",
231                registry,
232            )
233            .unwrap(),
234            tx_verification_latency: register_histogram_with_registry!(
235                "validator_service_tx_verification_latency",
236                "Latency of verifying a transaction",
237                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
238                registry,
239            )
240            .unwrap(),
241            cert_verification_latency: register_histogram_with_registry!(
242                "validator_service_cert_verification_latency",
243                "Latency of verifying a certificate",
244                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
245                registry,
246            )
247            .unwrap(),
248            consensus_latency: register_histogram_with_registry!(
249                "validator_service_consensus_latency",
250                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
251                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
252                registry,
253            )
254            .unwrap(),
255            handle_transaction_latency: register_histogram_with_registry!(
256                "validator_service_handle_transaction_latency",
257                "Latency of handling a transaction",
258                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
259                registry,
260            )
261            .unwrap(),
262            handle_certificate_consensus_latency: register_histogram_with_registry!(
263                "validator_service_handle_certificate_consensus_latency",
264                "Latency of handling a consensus transaction certificate",
265                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
266                registry,
267            )
268            .unwrap(),
269            submit_certificate_consensus_latency: register_histogram_with_registry!(
270                "validator_service_submit_certificate_consensus_latency",
271                "Latency of submit_certificate RPC handler",
272                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
273                registry,
274            )
275            .unwrap(),
276            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
277                "validator_service_handle_certificate_non_consensus_latency",
278                "Latency of handling a non-consensus transaction certificate",
279                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
280                registry,
281            )
282            .unwrap(),
283            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
284                "validator_service_handle_soft_bundle_certificates_consensus_latency",
285                "Latency of handling a consensus soft bundle",
286                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
287                registry,
288            )
289            .unwrap(),
290            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
291                "handle_soft_bundle_certificates_count",
292                "The number of certificates included in a soft bundle",
293                mysten_metrics::COUNT_BUCKETS.to_vec(),
294                registry,
295            )
296            .unwrap(),
297            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
298                "handle_soft_bundle_certificates_size_bytes",
299                "The size of soft bundle in bytes",
300                mysten_metrics::BYTES_BUCKETS.to_vec(),
301                registry,
302            )
303            .unwrap(),
304            handle_transaction_consensus_latency: register_histogram_with_registry!(
305                "validator_service_handle_transaction_consensus_latency",
306                "Latency of handling a user transaction sent through consensus",
307                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
308                registry,
309            )
310            .unwrap(),
311            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
312                "validator_service_submit_transaction_consensus_latency",
313                "Latency of submitting a user transaction sent through consensus",
314                &["req_type"],
315                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
316                registry,
317            )
318            .unwrap(),
319            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
320                "validator_service_submit_transaction_latency",
321                "Latency of submit transaction handler",
322                &["req_type"],
323                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
324                registry,
325            )
326            .unwrap(),
327            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
328                "validator_service_handle_wait_for_effects_ping_latency",
329                "Latency of handling a ping request for wait_for_effects",
330                &["req_type"],
331                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
332                registry,
333            )
334            .unwrap(),
335            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
336                "validator_service_submit_transaction_bytes",
337                "The size of transactions in the submit transaction request",
338                &["req_type"],
339                mysten_metrics::BYTES_BUCKETS.to_vec(),
340                registry,
341            )
342            .unwrap(),
343            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
344                "validator_service_submit_transaction_batch_size",
345                "The number of transactions in the submit transaction request",
346                &["req_type"],
347                mysten_metrics::COUNT_BUCKETS.to_vec(),
348                registry,
349            )
350            .unwrap(),
351            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
352                "validator_service_num_rejected_cert_in_epoch_boundary",
353                "Number of rejected transaction certificate during epoch transitioning",
354                registry,
355            )
356            .unwrap(),
357            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
358                "validator_service_num_rejected_tx_during_overload",
359                "Number of rejected transaction due to system overload",
360                &["error_type"],
361                registry,
362            )
363            .unwrap(),
364            submission_rejected_transactions: register_int_counter_vec_with_registry!(
365                "validator_service_submission_rejected_transactions",
366                "Number of transactions rejected during submission",
367                &["reason"],
368                registry,
369            )
370            .unwrap(),
371            connection_ip_not_found: register_int_counter_with_registry!(
372                "validator_service_connection_ip_not_found",
373                "Number of times connection IP was not extractable from request",
374                registry,
375            )
376            .unwrap(),
377            forwarded_header_parse_error: register_int_counter_with_registry!(
378                "validator_service_forwarded_header_parse_error",
379                "Number of times x-forwarded-for header could not be parsed",
380                registry,
381            )
382            .unwrap(),
383            forwarded_header_invalid: register_int_counter_with_registry!(
384                "validator_service_forwarded_header_invalid",
385                "Number of times x-forwarded-for header was invalid",
386                registry,
387            )
388            .unwrap(),
389            forwarded_header_not_included: register_int_counter_with_registry!(
390                "validator_service_forwarded_header_not_included",
391                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
392                registry,
393            )
394            .unwrap(),
395            client_id_source_config_mismatch: register_int_counter_with_registry!(
396                "validator_service_client_id_source_config_mismatch",
397                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
398                registry,
399            )
400            .unwrap(),
401            x_forwarded_for_num_hops: register_gauge_with_registry!(
402                "validator_service_x_forwarded_for_num_hops",
403                "Number of hops in x-forwarded-for header",
404                registry,
405            )
406            .unwrap(),
407        }
408    }
409
410    pub fn new_for_tests() -> Self {
411        let registry = Registry::new();
412        Self::new(&registry)
413    }
414}
415
416#[derive(Clone)]
417pub struct ValidatorService {
418    state: Arc<AuthorityState>,
419    consensus_adapter: Arc<ConsensusAdapter>,
420    metrics: Arc<ValidatorServiceMetrics>,
421    traffic_controller: Option<Arc<TrafficController>>,
422    client_id_source: Option<ClientIdSource>,
423}
424
425impl ValidatorService {
426    pub fn new(
427        state: Arc<AuthorityState>,
428        consensus_adapter: Arc<ConsensusAdapter>,
429        validator_metrics: Arc<ValidatorServiceMetrics>,
430        client_id_source: Option<ClientIdSource>,
431    ) -> Self {
432        let traffic_controller = state.traffic_controller.clone();
433        Self {
434            state,
435            consensus_adapter,
436            metrics: validator_metrics,
437            traffic_controller,
438            client_id_source,
439        }
440    }
441
442    pub fn new_for_tests(
443        state: Arc<AuthorityState>,
444        consensus_adapter: Arc<ConsensusAdapter>,
445        metrics: Arc<ValidatorServiceMetrics>,
446    ) -> Self {
447        Self {
448            state,
449            consensus_adapter,
450            metrics,
451            traffic_controller: None,
452            client_id_source: None,
453        }
454    }
455
456    pub fn validator_state(&self) -> &Arc<AuthorityState> {
457        &self.state
458    }
459
460    /// Test method that performs transaction validation without going through gRPC.
461    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
462        let epoch_store = self.state.load_epoch_store_one_call_per_task();
463
464        // Validity check (basic structural validation)
465        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
466
467        // Signature verification
468        let transaction = epoch_store
469            .verify_transaction_require_no_aliases(transaction)?
470            .into_tx();
471
472        // Validate the transaction
473        self.state
474            .handle_vote_transaction(&epoch_store, transaction)?;
475
476        Ok(())
477    }
478
479    /// Test method that performs transaction validation with overload checking.
480    /// Used for testing validator overload behavior.
481    pub fn handle_transaction_for_testing_with_overload_check(
482        &self,
483        transaction: Transaction,
484    ) -> SuiResult<()> {
485        let epoch_store = self.state.load_epoch_store_one_call_per_task();
486
487        // Validity check (basic structural validation)
488        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
489
490        // Check system overload
491        self.state.check_system_overload(
492            self.consensus_adapter.as_ref(),
493            transaction.data(),
494            self.state.check_system_overload_at_signing(),
495        )?;
496
497        // Signature verification
498        let transaction = epoch_store
499            .verify_transaction_require_no_aliases(transaction)?
500            .into_tx();
501
502        // Validate the transaction
503        self.state
504            .handle_vote_transaction(&epoch_store, transaction)?;
505
506        Ok(())
507    }
508
509    /// Collect the IDs of input objects that are immutable.
510    /// This is used to create the ImmutableInputObjects claim for consensus messages.
511    async fn collect_immutable_object_ids(
512        &self,
513        tx: &VerifiedTransaction,
514        state: &AuthorityState,
515    ) -> SuiResult<Vec<ObjectID>> {
516        let input_objects = tx.data().transaction_data().input_objects()?;
517
518        // Collect object IDs from ImmOrOwnedMoveObject inputs
519        let object_ids: Vec<ObjectID> = input_objects
520            .iter()
521            .filter_map(|obj| match obj {
522                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
523                _ => None,
524            })
525            .collect();
526        if object_ids.is_empty() {
527            return Ok(vec![]);
528        }
529
530        // Load objects from cache and filter to immutable ones
531        let objects = state.get_object_cache_reader().get_objects(&object_ids);
532
533        // All objects should be found, since owned input objects have been validated to exist.
534        objects
535            .into_iter()
536            .zip(object_ids.iter())
537            .filter_map(|(obj, id)| {
538                let Some(o) = obj else {
539                    return Some(Err::<ObjectID, SuiError>(
540                        SuiErrorKind::UserInputError {
541                            error: UserInputError::ObjectNotFound {
542                                object_id: *id,
543                                version: None,
544                            },
545                        }
546                        .into(),
547                    ));
548                };
549                if o.is_immutable() {
550                    Some(Ok(*id))
551                } else {
552                    None
553                }
554            })
555            .collect::<SuiResult<Vec<ObjectID>>>()
556    }
557
558    #[instrument(
559        name = "ValidatorService::handle_submit_transaction",
560        level = "error",
561        skip_all,
562        err(level = "debug")
563    )]
564    async fn handle_submit_transaction(
565        &self,
566        request: tonic::Request<RawSubmitTxRequest>,
567    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
568        let Self {
569            state,
570            consensus_adapter,
571            metrics,
572            traffic_controller: _,
573            client_id_source,
574        } = self.clone();
575
576        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
577            self.get_client_ip_addr(&request, client_id_source)
578        } else {
579            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
580        };
581
582        let inner = request.into_inner();
583        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
584
585        let next_epoch = start_epoch + 1;
586        let mut max_retries = 1;
587
588        loop {
589            let res = self
590                .handle_submit_transaction_inner(
591                    &state,
592                    &consensus_adapter,
593                    &metrics,
594                    &inner,
595                    submitter_client_addr,
596                )
597                .await;
598            match res {
599                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
600                Err(err) => {
601                    if max_retries > 0
602                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
603                    {
604                        max_retries -= 1;
605
606                        debug!(
607                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
608                        );
609
610                        if let Ok(Ok(new_epoch)) =
611                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
612                        {
613                            assert_reachable!("retry submission at epoch end");
614                            if new_epoch == next_epoch {
615                                continue;
616                            }
617
618                            debug_fatal!(
619                                "expected epoch {} after reconfiguration. got {}",
620                                next_epoch,
621                                new_epoch
622                            );
623                        }
624                    }
625                    return Err(err.into());
626                }
627            }
628        }
629    }
630
631    async fn handle_submit_transaction_inner(
632        &self,
633        state: &AuthorityState,
634        consensus_adapter: &ConsensusAdapter,
635        metrics: &ValidatorServiceMetrics,
636        request: &RawSubmitTxRequest,
637        submitter_client_addr: Option<IpAddr>,
638    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
639        let epoch_store = state.load_epoch_store_one_call_per_task();
640        if !epoch_store.protocol_config().mysticeti_fastpath() {
641            return Err(SuiErrorKind::UnsupportedFeatureError {
642                error: "Mysticeti fastpath".to_string(),
643            }
644            .into());
645        }
646
647        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
648            SuiErrorKind::GrpcMessageDeserializeError {
649                type_info: "RawSubmitTxRequest.submit_type".to_string(),
650                error: e.to_string(),
651            }
652        })?;
653
654        let is_ping_request = submit_type == SubmitTxType::Ping;
655        if is_ping_request {
656            fp_ensure!(
657                request.transactions.is_empty(),
658                SuiErrorKind::InvalidRequest(format!(
659                    "Ping request cannot contain {} transactions",
660                    request.transactions.len()
661                ))
662                .into()
663            );
664        } else {
665            // Ensure default and soft bundle requests contain at least one transaction.
666            fp_ensure!(
667                !request.transactions.is_empty(),
668                SuiErrorKind::InvalidRequest(
669                    "At least one transaction needs to be submitted".to_string(),
670                )
671                .into()
672            );
673        }
674
675        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
676        // if they use the same gas price. But this is only done with best effort.
677        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
678        // other transactions in the same bundle.
679        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
680
681        let max_num_transactions = if is_soft_bundle_request {
682            // Soft bundle cannot contain too many transactions.
683            // Otherwise it is hard to include all of them in a single block.
684            epoch_store.protocol_config().max_soft_bundle_size()
685        } else {
686            // Still enforce a limit even when transactions do not need to be in the same block.
687            epoch_store
688                .protocol_config()
689                .max_num_transactions_in_block()
690        };
691        fp_ensure!(
692            request.transactions.len() <= max_num_transactions as usize,
693            SuiErrorKind::InvalidRequest(format!(
694                "Too many transactions in request: {} vs {}",
695                request.transactions.len(),
696                max_num_transactions
697            ))
698            .into()
699        );
700
701        // Transaction digests.
702        let mut tx_digests = Vec::with_capacity(request.transactions.len());
703        // Transactions to submit to consensus.
704        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
705        // Indexes of transactions above in the request transactions.
706        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
707        // Results corresponding to each transaction in the request.
708        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
709        // Total size of all transactions in the request.
710        let mut total_size_bytes = 0;
711
712        let req_type = if is_ping_request {
713            "ping"
714        } else if request.transactions.len() == 1 {
715            "single_transaction"
716        } else if is_soft_bundle_request {
717            "soft_bundle"
718        } else {
719            "batch"
720        };
721
722        let _handle_tx_metrics_guard = metrics
723            .handle_submit_transaction_latency
724            .with_label_values(&[req_type])
725            .start_timer();
726
727        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
728            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
729                Ok(txn) => txn,
730                Err(e) => {
731                    // Ok to fail the request when any transaction is invalid.
732                    return Err(SuiErrorKind::TransactionDeserializationError {
733                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
734                    }
735                    .into());
736                }
737            };
738
739            // Ok to fail the request when any transaction is invalid.
740            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
741
742            let overload_check_res = state.check_system_overload(
743                consensus_adapter,
744                transaction.data(),
745                state.check_system_overload_at_signing(),
746            );
747            if let Err(error) = overload_check_res {
748                metrics
749                    .num_rejected_tx_during_overload
750                    .with_label_values(&[error.as_ref()])
751                    .inc();
752                results[idx] = Some(SubmitTxResult::Rejected { error });
753                continue;
754            }
755
756            // Ok to fail the request when any signature is invalid.
757            let verified_transaction = {
758                let _metrics_guard = metrics.tx_verification_latency.start_timer();
759                if epoch_store.protocol_config().address_aliases() {
760                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
761                        Ok(tx) => tx,
762                        Err(e) => {
763                            metrics.signature_errors.inc();
764                            return Err(e);
765                        }
766                    }
767                } else {
768                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
769                        Ok(tx) => tx,
770                        Err(e) => {
771                            metrics.signature_errors.inc();
772                            return Err(e);
773                        }
774                    }
775                }
776            };
777
778            let tx_digest = verified_transaction.tx().digest();
779            tx_digests.push(*tx_digest);
780
781            debug!(
782                ?tx_digest,
783                "handle_submit_transaction: verified transaction"
784            );
785
786            // Check if the transaction has executed, before checking input objects
787            // which could have been consumed.
788            if let Some(effects) = state
789                .get_transaction_cache_reader()
790                .get_executed_effects(tx_digest)
791            {
792                let effects_digest = effects.digest();
793                if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
794                    let executed_result = SubmitTxResult::Executed {
795                        effects_digest,
796                        details: Some(executed_data),
797                        fast_path: false,
798                    };
799                    results[idx] = Some(executed_result);
800                    debug!(?tx_digest, "handle_submit_transaction: already executed");
801                    continue;
802                }
803            }
804
805            if self
806                .state
807                .get_transaction_cache_reader()
808                .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
809            {
810                results[idx] = Some(SubmitTxResult::Rejected {
811                    error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
812                });
813                debug!(
814                    ?tx_digest,
815                    "handle_submit_transaction: transaction already executed in previous epoch"
816                );
817                continue;
818            }
819
820            debug!(
821                ?tx_digest,
822                "handle_submit_transaction: waiting for fastpath dependency objects"
823            );
824            if !state
825                .wait_for_fastpath_dependency_objects(
826                    verified_transaction.tx(),
827                    epoch_store.epoch(),
828                )
829                .await?
830            {
831                debug!(
832                    ?tx_digest,
833                    "fastpath input objects are still unavailable after waiting"
834                );
835            }
836
837            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
838                Ok(_) => { /* continue processing */ }
839                Err(e) => {
840                    // Check if transaction has been executed while being validated.
841                    // This is an edge case so checking executed effects twice is acceptable.
842                    if let Some(effects) = state
843                        .get_transaction_cache_reader()
844                        .get_executed_effects(tx_digest)
845                    {
846                        let effects_digest = effects.digest();
847                        if let Ok(executed_data) = self.complete_executed_data(effects, None).await
848                        {
849                            let executed_result = SubmitTxResult::Executed {
850                                effects_digest,
851                                details: Some(executed_data),
852                                fast_path: false,
853                            };
854                            results[idx] = Some(executed_result);
855                            continue;
856                        }
857                    }
858
859                    // When the transaction has not been executed, record the error for the transaction.
860                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
861                    metrics
862                        .submission_rejected_transactions
863                        .with_label_values(&[e.to_variant_name()])
864                        .inc();
865                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
866                    continue;
867                }
868            }
869
870            // Create claims with aliases and / or immutable objects.
871            if epoch_store.protocol_config().address_aliases()
872                || epoch_store.protocol_config().disable_preconsensus_locking()
873            {
874                let mut claims = vec![];
875
876                if epoch_store.protocol_config().disable_preconsensus_locking() {
877                    let immutable_object_ids = self
878                        .collect_immutable_object_ids(verified_transaction.tx(), state)
879                        .await?;
880                    if !immutable_object_ids.is_empty() {
881                        claims.push(TransactionClaim::ImmutableInputObjects(
882                            immutable_object_ids,
883                        ));
884                    }
885                }
886
887                let (tx, aliases) = verified_transaction.into_inner();
888                if epoch_store.protocol_config().address_aliases() {
889                    if epoch_store
890                        .protocol_config()
891                        .fix_checkpoint_signature_mapping()
892                    {
893                        claims.push(TransactionClaim::AddressAliasesV2(aliases));
894                    } else {
895                        let v1_aliases: Vec<_> = tx
896                            .data()
897                            .intent_message()
898                            .value
899                            .required_signers()
900                            .into_iter()
901                            .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
902                            .collect();
903                        #[allow(deprecated)]
904                        claims.push(TransactionClaim::AddressAliases(
905                            NonEmpty::from_vec(v1_aliases)
906                                .expect("must have at least one required_signer"),
907                        ));
908                    }
909                }
910
911                let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
912
913                consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
914                    &state.name,
915                    tx_with_claims,
916                ));
917            } else {
918                consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
919                    &state.name,
920                    verified_transaction.into_tx().into(),
921                ));
922            }
923            transaction_indexes.push(idx);
924            total_size_bytes += tx_size;
925        }
926
927        if consensus_transactions.is_empty() && !is_ping_request {
928            return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
929        }
930
931        // Set the max bytes size of the soft bundle to be half of the consensus max transactions in block size.
932        // We do this to account for serialization overheads and to ensure that the soft bundle is not too large
933        // when is attempted to be posted via consensus.
934        let max_transaction_bytes = if is_soft_bundle_request {
935            epoch_store
936                .protocol_config()
937                .consensus_max_transactions_in_block_bytes()
938                / 2
939        } else {
940            epoch_store
941                .protocol_config()
942                .consensus_max_transactions_in_block_bytes()
943        };
944        fp_ensure!(
945            total_size_bytes <= max_transaction_bytes as usize,
946            SuiErrorKind::UserInputError {
947                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
948                    size: total_size_bytes,
949                    limit: max_transaction_bytes,
950                },
951            }
952            .into()
953        );
954
955        metrics
956            .handle_submit_transaction_bytes
957            .with_label_values(&[req_type])
958            .observe(total_size_bytes as f64);
959        metrics
960            .handle_submit_transaction_batch_size
961            .with_label_values(&[req_type])
962            .observe(consensus_transactions.len() as f64);
963
964        let _latency_metric_guard = metrics
965            .handle_submit_transaction_consensus_latency
966            .with_label_values(&[req_type])
967            .start_timer();
968
969        let consensus_positions = if is_soft_bundle_request || is_ping_request {
970            // We only allow the `consensus_transactions` to be empty for ping requests. This is how it should and is be treated from the downstream components.
971            // For any other case, having an empty `consensus_transactions` vector is an invalid state and we should have never reached at this point.
972            assert!(
973                is_ping_request || !consensus_transactions.is_empty(),
974                "A valid soft bundle must have at least one transaction"
975            );
976            debug!(
977                "handle_submit_transaction: submitting consensus transactions ({}): {}",
978                req_type,
979                consensus_transactions
980                    .iter()
981                    .map(|t| t.local_display())
982                    .join(", ")
983            );
984            self.handle_submit_to_consensus_for_position(
985                consensus_transactions,
986                &epoch_store,
987                submitter_client_addr,
988            )
989            .await?
990        } else {
991            let futures = consensus_transactions.into_iter().map(|t| {
992                debug!(
993                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
994                    req_type,
995                    t.local_display(),
996                );
997                self.handle_submit_to_consensus_for_position(
998                    vec![t],
999                    &epoch_store,
1000                    submitter_client_addr,
1001                )
1002            });
1003            future::try_join_all(futures)
1004                .await?
1005                .into_iter()
1006                .flatten()
1007                .collect()
1008        };
1009
1010        if is_ping_request {
1011            // For ping requests, return the special consensus position.
1012            assert_eq!(consensus_positions.len(), 1);
1013            results.push(Some(SubmitTxResult::Submitted {
1014                consensus_position: consensus_positions[0],
1015            }));
1016        } else {
1017            // Otherwise, return the consensus position for each transaction.
1018            for ((idx, tx_digest), consensus_position) in transaction_indexes
1019                .into_iter()
1020                .zip(tx_digests)
1021                .zip(consensus_positions)
1022            {
1023                debug!(
1024                    ?tx_digest,
1025                    "handle_submit_transaction: submitted consensus transaction at {}",
1026                    consensus_position,
1027                );
1028                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1029            }
1030        }
1031
1032        Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1033    }
1034
1035    fn try_from_submit_tx_response(
1036        results: Vec<Option<SubmitTxResult>>,
1037    ) -> Result<RawSubmitTxResponse, SuiError> {
1038        let mut raw_results = Vec::new();
1039        for (i, result) in results.into_iter().enumerate() {
1040            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1041                error: format!("Missing transaction result at {}", i),
1042            })?;
1043            let raw_result = result.try_into()?;
1044            raw_results.push(raw_result);
1045        }
1046        Ok(RawSubmitTxResponse {
1047            results: raw_results,
1048        })
1049    }
1050
1051    #[instrument(
1052        name = "ValidatorService::handle_submit_to_consensus_for_position",
1053        level = "debug",
1054        skip_all,
1055        err(level = "debug")
1056    )]
1057    async fn handle_submit_to_consensus_for_position(
1058        &self,
1059        // Empty when this is a ping request.
1060        consensus_transactions: Vec<ConsensusTransaction>,
1061        epoch_store: &Arc<AuthorityPerEpochStore>,
1062        submitter_client_addr: Option<IpAddr>,
1063    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1064        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1065
1066        {
1067            // code block within reconfiguration lock
1068            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1069            if !reconfiguration_lock.should_accept_user_certs() {
1070                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1071                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1072            }
1073
1074            // Submit to consensus and wait for position, we do not check if tx
1075            // has been processed by consensus already as this method is called
1076            // to get back a consensus position.
1077            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1078
1079            self.consensus_adapter.submit_batch(
1080                &consensus_transactions,
1081                Some(&reconfiguration_lock),
1082                epoch_store,
1083                Some(tx_consensus_positions),
1084                submitter_client_addr,
1085            )?;
1086        }
1087
1088        Ok(rx_consensus_positions.await.map_err(|e| {
1089            SuiErrorKind::FailedToSubmitToConsensus(format!(
1090                "Failed to get consensus position: {e}"
1091            ))
1092        })?)
1093    }
1094
1095    async fn collect_effects_data(
1096        &self,
1097        effects: &TransactionEffects,
1098        include_events: bool,
1099        include_input_objects: bool,
1100        include_output_objects: bool,
1101        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1102    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1103        let events = if include_events && effects.events_digest().is_some() {
1104            if let Some(fastpath_outputs) = &fastpath_outputs {
1105                Some(fastpath_outputs.events.clone())
1106            } else {
1107                Some(
1108                    self.state
1109                        .get_transaction_events(effects.transaction_digest())?,
1110                )
1111            }
1112        } else {
1113            None
1114        };
1115
1116        let input_objects = if include_input_objects {
1117            self.state.get_transaction_input_objects(effects)?
1118        } else {
1119            vec![]
1120        };
1121
1122        let output_objects = if include_output_objects {
1123            if let Some(fastpath_outputs) = &fastpath_outputs {
1124                fastpath_outputs.written.values().cloned().collect()
1125            } else {
1126                self.state.get_transaction_output_objects(effects)?
1127            }
1128        } else {
1129            vec![]
1130        };
1131
1132        Ok((events, input_objects, output_objects))
1133    }
1134}
1135
1136type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1137
1138impl ValidatorService {
1139    async fn handle_submit_transaction_impl(
1140        &self,
1141        request: tonic::Request<RawSubmitTxRequest>,
1142    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1143        self.handle_submit_transaction(request).await
1144    }
1145
1146    async fn wait_for_effects_impl(
1147        &self,
1148        request: tonic::Request<RawWaitForEffectsRequest>,
1149    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1150        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1151        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1152        let response = timeout(
1153            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1154            Duration::from_secs(20),
1155            epoch_store
1156                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1157                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1158        )
1159        .await
1160        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1161        .try_into()?;
1162        Ok((tonic::Response::new(response), Weight::zero()))
1163    }
1164
1165    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1166    async fn wait_for_effects_response(
1167        &self,
1168        request: WaitForEffectsRequest,
1169        epoch_store: &Arc<AuthorityPerEpochStore>,
1170    ) -> SuiResult<WaitForEffectsResponse> {
1171        if request.ping_type.is_some() {
1172            return timeout(
1173                Duration::from_secs(10),
1174                self.ping_response(request, epoch_store),
1175            )
1176            .await
1177            .map_err(|_| SuiErrorKind::TimeoutError)?;
1178        }
1179
1180        let Some(tx_digest) = request.transaction_digest else {
1181            return Err(SuiErrorKind::InvalidRequest(
1182                "Transaction digest is required for wait for effects requests".to_string(),
1183            )
1184            .into());
1185        };
1186        let tx_digests = [tx_digest];
1187
1188        let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1189            if let Some(consensus_position) = request.consensus_position {
1190                Box::pin(self.wait_for_fastpath_effects(
1191                    consensus_position,
1192                    &tx_digests,
1193                    request.include_details,
1194                    epoch_store,
1195                ))
1196            } else {
1197                Box::pin(futures::future::pending())
1198            };
1199
1200        tokio::select! {
1201            // Ensure that finalized effects are always prioritized.
1202            biased;
1203            // We always wait for effects regardless of consensus position via
1204            // notify_read_executed_effects. This is safe because we have separated
1205            // mysticeti fastpath outputs to a separate dirty cache
1206            // UncommittedData::fastpath_transaction_outputs that will only get flushed
1207            // once finalized. So the output of notify_read_executed_effects is
1208            // guaranteed to be finalized effects or effects from QD execution.
1209            mut effects = self.state
1210                .get_transaction_cache_reader()
1211                .notify_read_executed_effects(
1212                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1213                    &tx_digests,
1214                ) => {
1215                tracing::Span::current().record("fast_path_effects", false);
1216                let effects = effects.pop().unwrap();
1217                let details = if request.include_details {
1218                    Some(self.complete_executed_data(effects.clone(), None).await?)
1219                } else {
1220                    None
1221                };
1222
1223                Ok(WaitForEffectsResponse::Executed {
1224                    effects_digest: effects.digest(),
1225                    details,
1226                    fast_path: false,
1227                })
1228            }
1229
1230            fastpath_response = fastpath_effects_future => {
1231                tracing::Span::current().record("fast_path_effects", true);
1232                fastpath_response
1233            }
1234        }
1235    }
1236
1237    #[instrument(level = "error", skip_all, err(level = "debug"))]
1238    async fn ping_response(
1239        &self,
1240        request: WaitForEffectsRequest,
1241        epoch_store: &Arc<AuthorityPerEpochStore>,
1242    ) -> SuiResult<WaitForEffectsResponse> {
1243        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1244            return Err(SuiErrorKind::UnsupportedFeatureError {
1245                error: "Mysticeti fastpath".to_string(),
1246            }
1247            .into());
1248        };
1249
1250        let Some(consensus_position) = request.consensus_position else {
1251            return Err(SuiErrorKind::InvalidRequest(
1252                "Consensus position is required for Ping requests".to_string(),
1253            )
1254            .into());
1255        };
1256
1257        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1258        let Some(ping) = request.ping_type else {
1259            return Err(SuiErrorKind::InvalidRequest(
1260                "Ping type is required for ping requests".to_string(),
1261            )
1262            .into());
1263        };
1264
1265        let _metrics_guard = self
1266            .metrics
1267            .handle_wait_for_effects_ping_latency
1268            .with_label_values(&[ping.as_str()])
1269            .start_timer();
1270
1271        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1272
1273        let mut last_status = None;
1274        let details = if request.include_details {
1275            Some(Box::new(ExecutedData::default()))
1276        } else {
1277            None
1278        };
1279
1280        loop {
1281            let status = consensus_tx_status_cache
1282                .notify_read_transaction_status_change(consensus_position, last_status)
1283                .await;
1284            match status {
1285                NotifyReadConsensusTxStatusResult::Status(status) => match status {
1286                    ConsensusTxStatus::FastpathCertified => {
1287                        // If the request is for consensus, we need to wait for the transaction to be finalised via Consensus.
1288                        if ping == PingType::Consensus {
1289                            last_status = Some(status);
1290                            continue;
1291                        }
1292                        return Ok(WaitForEffectsResponse::Executed {
1293                            effects_digest: TransactionEffectsDigest::ZERO,
1294                            details,
1295                            fast_path: true,
1296                        });
1297                    }
1298                    ConsensusTxStatus::Rejected => {
1299                        return Ok(WaitForEffectsResponse::Rejected { error: None });
1300                    }
1301                    ConsensusTxStatus::Finalized => {
1302                        return Ok(WaitForEffectsResponse::Executed {
1303                            effects_digest: TransactionEffectsDigest::ZERO,
1304                            details,
1305                            fast_path: false,
1306                        });
1307                    }
1308                    ConsensusTxStatus::Dropped => {
1309                        // Transaction was dropped post-consensus, currently only due to invalid owned object inputs..
1310                        // Fetch the detailed error (e.g., ObjectLockConflict) from the rejection reason cache.
1311                        return Ok(WaitForEffectsResponse::Rejected {
1312                            error: epoch_store.get_rejection_vote_reason(consensus_position),
1313                        });
1314                    }
1315                },
1316                NotifyReadConsensusTxStatusResult::Expired(round) => {
1317                    return Ok(WaitForEffectsResponse::Expired {
1318                        epoch: epoch_store.epoch(),
1319                        round: Some(round),
1320                    });
1321                }
1322            }
1323        }
1324    }
1325
1326    #[instrument(level = "error", skip_all, err(level = "debug"))]
1327    async fn wait_for_fastpath_effects(
1328        &self,
1329        consensus_position: ConsensusPosition,
1330        tx_digests: &[TransactionDigest],
1331        include_details: bool,
1332        epoch_store: &Arc<AuthorityPerEpochStore>,
1333    ) -> SuiResult<WaitForEffectsResponse> {
1334        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1335            return Err(SuiErrorKind::UnsupportedFeatureError {
1336                error: "Mysticeti fastpath".to_string(),
1337            }
1338            .into());
1339        };
1340
1341        let local_epoch = epoch_store.epoch();
1342        match consensus_position.epoch.cmp(&local_epoch) {
1343            Ordering::Less => {
1344                // Ask TransactionDriver to retry submitting the transaction and get a new ConsensusPosition,
1345                // if response from this validator is desired.
1346                let response = WaitForEffectsResponse::Expired {
1347                    epoch: local_epoch,
1348                    round: None,
1349                };
1350                return Ok(response);
1351            }
1352            Ordering::Greater => {
1353                // Ask TransactionDriver to retry this RPC until the validator's epoch catches up.
1354                return Err(SuiErrorKind::WrongEpoch {
1355                    expected_epoch: local_epoch,
1356                    actual_epoch: consensus_position.epoch,
1357                }
1358                .into());
1359            }
1360            Ordering::Equal => {
1361                // The validator's epoch is the same as the epoch of the transaction.
1362                // We can proceed with the normal flow.
1363            }
1364        };
1365
1366        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1367
1368        let mut current_status = None;
1369        loop {
1370            tokio::select! {
1371                status_result = consensus_tx_status_cache
1372                    .notify_read_transaction_status_change(consensus_position, current_status) => {
1373                    match status_result {
1374                        NotifyReadConsensusTxStatusResult::Status(new_status) => {
1375                            match new_status {
1376                                ConsensusTxStatus::Rejected => {
1377                                    return Ok(WaitForEffectsResponse::Rejected {
1378                                        error: epoch_store.get_rejection_vote_reason(
1379                                            consensus_position
1380                                        )
1381                                    });
1382                                }
1383                                ConsensusTxStatus::FastpathCertified => {
1384                                    current_status = Some(new_status);
1385                                    continue;
1386                                }
1387                                ConsensusTxStatus::Finalized => {
1388                                    current_status = Some(new_status);
1389                                    continue;
1390                                }
1391                                ConsensusTxStatus::Dropped => {
1392                                    // Transaction was dropped post-consensus, currently only due to invalid owned object inputs.
1393                                    // Fetch the detailed error from the rejection reason cache.
1394                                    return Ok(WaitForEffectsResponse::Rejected {
1395                                        error: epoch_store
1396                                            .get_rejection_vote_reason(consensus_position),
1397                                    });
1398                                }
1399                            }
1400                        }
1401                        NotifyReadConsensusTxStatusResult::Expired(round) => {
1402                            return Ok(WaitForEffectsResponse::Expired {
1403                                epoch: epoch_store.epoch(),
1404                                round: Some(round),
1405                            });
1406                        }
1407                    }
1408                }
1409
1410                mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1411                    if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1412                    let outputs = outputs.pop().unwrap();
1413                    let effects = outputs.effects.clone();
1414
1415                    let details = if include_details {
1416                        Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1417                    } else {
1418                        None
1419                    };
1420
1421                    return Ok(WaitForEffectsResponse::Executed {
1422                        effects_digest: effects.digest(),
1423                        details,
1424                        fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1425                    });
1426                }
1427            }
1428        }
1429    }
1430
1431    async fn complete_executed_data(
1432        &self,
1433        effects: TransactionEffects,
1434        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1435    ) -> SuiResult<Box<ExecutedData>> {
1436        let (events, input_objects, output_objects) = self
1437            .collect_effects_data(
1438                &effects,
1439                /* include_events */ true,
1440                /* include_input_objects */ true,
1441                /* include_output_objects */ true,
1442                fastpath_outputs,
1443            )
1444            .await?;
1445        Ok(Box::new(ExecutedData {
1446            effects,
1447            events,
1448            input_objects,
1449            output_objects,
1450        }))
1451    }
1452
1453    async fn object_info_impl(
1454        &self,
1455        request: tonic::Request<ObjectInfoRequest>,
1456    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1457        let request = request.into_inner();
1458        let response = self.state.handle_object_info_request(request).await?;
1459        Ok((tonic::Response::new(response), Weight::one()))
1460    }
1461
1462    async fn transaction_info_impl(
1463        &self,
1464        request: tonic::Request<TransactionInfoRequest>,
1465    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1466        let request = request.into_inner();
1467        let response = self.state.handle_transaction_info_request(request).await?;
1468        Ok((tonic::Response::new(response), Weight::one()))
1469    }
1470
1471    async fn checkpoint_impl(
1472        &self,
1473        request: tonic::Request<CheckpointRequest>,
1474    ) -> WrappedServiceResponse<CheckpointResponse> {
1475        let request = request.into_inner();
1476        let response = self.state.handle_checkpoint_request(&request)?;
1477        Ok((tonic::Response::new(response), Weight::one()))
1478    }
1479
1480    async fn checkpoint_v2_impl(
1481        &self,
1482        request: tonic::Request<CheckpointRequestV2>,
1483    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1484        let request = request.into_inner();
1485        let response = self.state.handle_checkpoint_request_v2(&request)?;
1486        Ok((tonic::Response::new(response), Weight::one()))
1487    }
1488
1489    async fn get_system_state_object_impl(
1490        &self,
1491        _request: tonic::Request<SystemStateRequest>,
1492    ) -> WrappedServiceResponse<SuiSystemState> {
1493        let response = self
1494            .state
1495            .get_object_cache_reader()
1496            .get_sui_system_state_object_unsafe()?;
1497        Ok((tonic::Response::new(response), Weight::one()))
1498    }
1499
1500    async fn validator_health_impl(
1501        &self,
1502        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1503    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1504        let state = &self.state;
1505
1506        // Get epoch store once for both metrics
1507        let epoch_store = state.load_epoch_store_one_call_per_task();
1508
1509        // Get in-flight execution transactions from execution scheduler
1510        let num_inflight_execution_transactions =
1511            state.execution_scheduler().num_pending_certificates() as u64;
1512
1513        // Get in-flight consensus transactions from consensus adapter
1514        let num_inflight_consensus_transactions =
1515            self.consensus_adapter.num_inflight_transactions();
1516
1517        // Get last committed leader round from epoch store
1518        let last_committed_leader_round = epoch_store
1519            .consensus_tx_status_cache
1520            .as_ref()
1521            .and_then(|cache| cache.get_last_committed_leader_round())
1522            .unwrap_or(0);
1523
1524        // Get last locally built checkpoint sequence
1525        let last_locally_built_checkpoint = epoch_store
1526            .last_built_checkpoint_summary()
1527            .ok()
1528            .flatten()
1529            .map(|(_, summary)| summary.sequence_number)
1530            .unwrap_or(0);
1531
1532        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1533            num_inflight_consensus_transactions,
1534            num_inflight_execution_transactions,
1535            last_locally_built_checkpoint,
1536            last_committed_leader_round,
1537        };
1538
1539        let raw_response = typed_response
1540            .try_into()
1541            .map_err(|e: sui_types::error::SuiError| {
1542                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1543            })?;
1544
1545        Ok((tonic::Response::new(raw_response), Weight::one()))
1546    }
1547
1548    fn get_client_ip_addr<T>(
1549        &self,
1550        request: &tonic::Request<T>,
1551        source: &ClientIdSource,
1552    ) -> Option<IpAddr> {
1553        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1554
1555        if let Some(header) = forwarded_header {
1556            let num_hops = header
1557                .to_str()
1558                .map(|h| h.split(',').count().saturating_sub(1))
1559                .unwrap_or(0);
1560
1561            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1562        }
1563
1564        match source {
1565            ClientIdSource::SocketAddr => {
1566                let socket_addr: Option<SocketAddr> = request.remote_addr();
1567
1568                // We will hit this case if the IO type used does not
1569                // implement Connected or when using a unix domain socket.
1570                // TODO: once we have confirmed that no legitimate traffic
1571                // is hitting this case, we should reject such requests that
1572                // hit this case.
1573                if let Some(socket_addr) = socket_addr {
1574                    Some(socket_addr.ip())
1575                } else {
1576                    if cfg!(msim) {
1577                        // Ignore the error from simtests.
1578                    } else if cfg!(test) {
1579                        panic!("Failed to get remote address from request");
1580                    } else {
1581                        self.metrics.connection_ip_not_found.inc();
1582                        error!("Failed to get remote address from request");
1583                    }
1584                    None
1585                }
1586            }
1587            ClientIdSource::XForwardedFor(num_hops) => {
1588                let do_header_parse = |op: &MetadataValue<Ascii>| {
1589                    match op.to_str() {
1590                        Ok(header_val) => {
1591                            let header_contents =
1592                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1593                            if *num_hops == 0 {
1594                                error!(
1595                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1596                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1597                                    to this node. Skipping traffic controller request handling.",
1598                                    header_contents,
1599                                );
1600                                return None;
1601                            }
1602                            let contents_len = header_contents.len();
1603                            if contents_len < *num_hops {
1604                                error!(
1605                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1606                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1607                                    `client-id-source` in the node config.",
1608                                    header_contents, contents_len, num_hops, contents_len,
1609                                );
1610                                self.metrics.client_id_source_config_mismatch.inc();
1611                                return None;
1612                            }
1613                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1614                            else {
1615                                error!(
1616                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1617                                    Expected at least {} values. Skipping traffic controller request handling.",
1618                                    header_contents, contents_len, num_hops, contents_len,
1619                                );
1620                                return None;
1621                            };
1622                            parse_ip(client_ip).or_else(|| {
1623                                self.metrics.forwarded_header_parse_error.inc();
1624                                None
1625                            })
1626                        }
1627                        Err(e) => {
1628                            // TODO: once we have confirmed that no legitimate traffic
1629                            // is hitting this case, we should reject such requests that
1630                            // hit this case.
1631                            self.metrics.forwarded_header_invalid.inc();
1632                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1633                            None
1634                        }
1635                    }
1636                };
1637                if let Some(op) = request.metadata().get("x-forwarded-for") {
1638                    do_header_parse(op)
1639                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1640                    do_header_parse(op)
1641                } else {
1642                    self.metrics.forwarded_header_not_included.inc();
1643                    error!(
1644                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1645                    );
1646                    None
1647                }
1648            }
1649        }
1650    }
1651
1652    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1653        if let Some(traffic_controller) = &self.traffic_controller {
1654            if !traffic_controller.check(&client, &None).await {
1655                // Entity in blocklist
1656                Err(tonic::Status::from_error(
1657                    SuiErrorKind::TooManyRequests.into(),
1658                ))
1659            } else {
1660                Ok(())
1661            }
1662        } else {
1663            Ok(())
1664        }
1665    }
1666
1667    fn handle_traffic_resp<T>(
1668        &self,
1669        client: Option<IpAddr>,
1670        wrapped_response: WrappedServiceResponse<T>,
1671    ) -> Result<tonic::Response<T>, tonic::Status> {
1672        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1673            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1674            Err(status) => (
1675                Some(SuiError::from(status.clone())),
1676                Weight::zero(),
1677                Err(status.clone()),
1678            ),
1679        };
1680
1681        if let Some(traffic_controller) = self.traffic_controller.clone() {
1682            traffic_controller.tally(TrafficTally {
1683                direct: client,
1684                through_fullnode: None,
1685                error_info: error.map(|e| {
1686                    let error_type = String::from(e.clone().as_ref());
1687                    let error_weight = normalize(e);
1688                    (error_weight, error_type)
1689                }),
1690                spam_weight,
1691                timestamp: SystemTime::now(),
1692            })
1693        }
1694        unwrapped_response
1695    }
1696}
1697
1698// TODO: refine error matching here
1699fn normalize(err: SuiError) -> Weight {
1700    match err.as_inner() {
1701        SuiErrorKind::UserInputError {
1702            error: UserInputError::IncorrectUserSignature { .. },
1703        } => Weight::one(),
1704        SuiErrorKind::InvalidSignature { .. }
1705        | SuiErrorKind::SignerSignatureAbsent { .. }
1706        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1707        | SuiErrorKind::IncorrectSigner { .. }
1708        | SuiErrorKind::UnknownSigner { .. }
1709        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1710        _ => Weight::zero(),
1711    }
1712}
1713
1714/// Implements generic pre- and post-processing. Since this is on the critical
1715/// path, any heavy lifting should be done in a separate non-blocking task
1716/// unless it is necessary to override the return value.
1717#[macro_export]
1718macro_rules! handle_with_decoration {
1719    ($self:ident, $func_name:ident, $request:ident) => {{
1720        if $self.client_id_source.is_none() {
1721            return $self.$func_name($request).await.map(|(result, _)| result);
1722        }
1723
1724        let client = $self.get_client_ip_addr(&$request, $self.client_id_source.as_ref().unwrap());
1725
1726        // check if either IP is blocked, in which case return early
1727        $self.handle_traffic_req(client.clone()).await?;
1728
1729        // handle traffic tallying
1730        let wrapped_response = $self.$func_name($request).await;
1731        $self.handle_traffic_resp(client, wrapped_response)
1732    }};
1733}
1734
1735#[async_trait]
1736impl Validator for ValidatorService {
1737    async fn submit_transaction(
1738        &self,
1739        request: tonic::Request<RawSubmitTxRequest>,
1740    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1741        let validator_service = self.clone();
1742
1743        // Spawns a task which handles the transaction. The task will unconditionally continue
1744        // processing in the event that the client connection is dropped.
1745        spawn_monitored_task!(async move {
1746            // NB: traffic tally wrapping handled within the task rather than on task exit
1747            // to prevent an attacker from subverting traffic control by severing the connection
1748            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1749        })
1750        .await
1751        .unwrap()
1752    }
1753
1754    async fn wait_for_effects(
1755        &self,
1756        request: tonic::Request<RawWaitForEffectsRequest>,
1757    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1758        handle_with_decoration!(self, wait_for_effects_impl, request)
1759    }
1760
1761    async fn object_info(
1762        &self,
1763        request: tonic::Request<ObjectInfoRequest>,
1764    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1765        handle_with_decoration!(self, object_info_impl, request)
1766    }
1767
1768    async fn transaction_info(
1769        &self,
1770        request: tonic::Request<TransactionInfoRequest>,
1771    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1772        handle_with_decoration!(self, transaction_info_impl, request)
1773    }
1774
1775    async fn checkpoint(
1776        &self,
1777        request: tonic::Request<CheckpointRequest>,
1778    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1779        handle_with_decoration!(self, checkpoint_impl, request)
1780    }
1781
1782    async fn checkpoint_v2(
1783        &self,
1784        request: tonic::Request<CheckpointRequestV2>,
1785    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1786        handle_with_decoration!(self, checkpoint_v2_impl, request)
1787    }
1788
1789    async fn get_system_state_object(
1790        &self,
1791        request: tonic::Request<SystemStateRequest>,
1792    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1793        handle_with_decoration!(self, get_system_state_object_impl, request)
1794    }
1795
1796    async fn validator_health(
1797        &self,
1798        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1799    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1800    {
1801        handle_with_decoration!(self, validator_health_impl, request)
1802    }
1803}