sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14    register_gauge_with_registry, register_histogram_vec_with_registry,
15    register_histogram_with_registry, register_int_counter_vec_with_registry,
16    register_int_counter_with_registry,
17};
18use std::{
19    cmp::Ordering,
20    future::Future,
21    io,
22    net::{IpAddr, SocketAddr},
23    pin::Pin,
24    sync::Arc,
25    time::{Duration, SystemTime},
26};
27use sui_network::{
28    api::{Validator, ValidatorServer},
29    tonic,
30    validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::message_envelope::Message;
34use sui_types::messages_consensus::ConsensusPosition;
35use sui_types::messages_consensus::ConsensusTransaction;
36use sui_types::messages_grpc::{
37    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
38    TransactionInfoRequest, TransactionInfoResponse,
39};
40use sui_types::multiaddr::Multiaddr;
41use sui_types::object::Object;
42use sui_types::sui_system_state::SuiSystemState;
43use sui_types::traffic_control::{ClientIdSource, Weight};
44use sui_types::{
45    base_types::ObjectID,
46    digests::{TransactionDigest, TransactionEffectsDigest},
47    error::{SuiErrorKind, UserInputError},
48};
49use sui_types::{
50    effects::TransactionEffects,
51    messages_grpc::{
52        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
53        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
54    },
55};
56use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
57use sui_types::{error::*, transaction::*};
58use sui_types::{
59    fp_ensure,
60    messages_checkpoint::{
61        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
62    },
63};
64use tokio::sync::oneshot;
65use tokio::time::timeout;
66use tonic::metadata::{Ascii, MetadataValue};
67use tracing::{debug, error, info, instrument};
68
69use crate::consensus_adapter::ConnectionMonitorStatusForTests;
70use crate::{
71    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
72    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
73    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
74};
75use crate::{
76    authority::{
77        authority_per_epoch_store::AuthorityPerEpochStore,
78        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
79    },
80    checkpoints::CheckpointStore,
81    mysticeti_adapter::LazyMysticetiClient,
82    transaction_outputs::TransactionOutputs,
83};
84use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
85use sui_types::messages_grpc::PingType;
86
87#[cfg(test)]
88#[path = "unit_tests/server_tests.rs"]
89mod server_tests;
90
91#[cfg(test)]
92#[path = "unit_tests/wait_for_effects_tests.rs"]
93mod wait_for_effects_tests;
94
95#[cfg(test)]
96#[path = "unit_tests/submit_transaction_tests.rs"]
97mod submit_transaction_tests;
98
/// Handle to a spawned validator network server.
///
/// Owns the underlying `sui_network` server instance.
pub struct AuthorityServerHandle {
    /// The underlying server this handle wraps.
    server_handle: sui_network::validator::server::Server,
}
102
103impl AuthorityServerHandle {
104    pub async fn join(self) -> Result<(), io::Error> {
105        self.server_handle.handle().wait_for_shutdown().await;
106        Ok(())
107    }
108
109    pub async fn kill(self) -> Result<(), io::Error> {
110        self.server_handle.handle().shutdown().await;
111        Ok(())
112    }
113
114    pub fn address(&self) -> &Multiaddr {
115        self.server_handle.local_addr()
116    }
117}
118
/// Bundles everything needed to spawn a validator gRPC server in tests:
/// a bind address, the authority state, a consensus adapter and service metrics.
pub struct AuthorityServer {
    /// Address used by `spawn_for_test` as the bind address.
    address: Multiaddr,
    /// Shared authority state handed to the validator service when spawning.
    pub state: Arc<AuthorityState>,
    /// Consensus adapter handed to the validator service when spawning.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the validator service when spawning.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
125
126impl AuthorityServer {
127    pub fn new_for_test_with_consensus_adapter(
128        state: Arc<AuthorityState>,
129        consensus_adapter: Arc<ConsensusAdapter>,
130    ) -> Self {
131        let address = new_local_tcp_address_for_testing();
132        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
133
134        Self {
135            address,
136            state,
137            consensus_adapter,
138            metrics,
139        }
140    }
141
142    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
143        let consensus_adapter = Arc::new(ConsensusAdapter::new(
144            Arc::new(LazyMysticetiClient::new()),
145            CheckpointStore::new_for_tests(),
146            state.name,
147            Arc::new(ConnectionMonitorStatusForTests {}),
148            100_000,
149            100_000,
150            None,
151            None,
152            ConsensusAdapterMetrics::new_test(),
153            state.epoch_store_for_testing().protocol_config().clone(),
154        ));
155        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
156    }
157
158    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
159        let address = self.address.clone();
160        self.spawn_with_bind_address_for_test(address).await
161    }
162
163    pub async fn spawn_with_bind_address_for_test(
164        self,
165        address: Multiaddr,
166    ) -> Result<AuthorityServerHandle, io::Error> {
167        let tls_config = sui_tls::create_rustls_server_config(
168            self.state.config.network_key_pair().copy().private(),
169            SUI_TLS_SERVER_NAME.to_string(),
170        );
171        let config = mysten_network::config::Config::new();
172        let server = sui_network::validator::server::ServerBuilder::from_config(
173            &config,
174            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
175        )
176        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
177            self.state,
178            self.consensus_adapter,
179            self.metrics,
180        )))
181        .bind(&address, Some(tls_config))
182        .await
183        .unwrap();
184        let local_addr = server.local_addr().to_owned();
185        info!("Listening to traffic on {local_addr}");
186        let handle = AuthorityServerHandle {
187            server_handle: server,
188        };
189        Ok(handle)
190    }
191}
192
/// Prometheus metrics recorded by the validator service.
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back a
    /// local acknowledgement (execution and finalization time not included).
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction through consensus, labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transaction certificates rejected during epoch transition.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was unexpectedly missing from a request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
}
223
224impl ValidatorServiceMetrics {
225    pub fn new(registry: &Registry) -> Self {
226        Self {
227            signature_errors: register_int_counter_with_registry!(
228                "total_signature_errors",
229                "Number of transaction signature errors",
230                registry,
231            )
232            .unwrap(),
233            tx_verification_latency: register_histogram_with_registry!(
234                "validator_service_tx_verification_latency",
235                "Latency of verifying a transaction",
236                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
237                registry,
238            )
239            .unwrap(),
240            cert_verification_latency: register_histogram_with_registry!(
241                "validator_service_cert_verification_latency",
242                "Latency of verifying a certificate",
243                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
244                registry,
245            )
246            .unwrap(),
247            consensus_latency: register_histogram_with_registry!(
248                "validator_service_consensus_latency",
249                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
250                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
251                registry,
252            )
253            .unwrap(),
254            handle_transaction_latency: register_histogram_with_registry!(
255                "validator_service_handle_transaction_latency",
256                "Latency of handling a transaction",
257                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
258                registry,
259            )
260            .unwrap(),
261            handle_certificate_consensus_latency: register_histogram_with_registry!(
262                "validator_service_handle_certificate_consensus_latency",
263                "Latency of handling a consensus transaction certificate",
264                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
265                registry,
266            )
267            .unwrap(),
268            submit_certificate_consensus_latency: register_histogram_with_registry!(
269                "validator_service_submit_certificate_consensus_latency",
270                "Latency of submit_certificate RPC handler",
271                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
272                registry,
273            )
274            .unwrap(),
275            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
276                "validator_service_handle_certificate_non_consensus_latency",
277                "Latency of handling a non-consensus transaction certificate",
278                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
279                registry,
280            )
281            .unwrap(),
282            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
283                "validator_service_handle_soft_bundle_certificates_consensus_latency",
284                "Latency of handling a consensus soft bundle",
285                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
286                registry,
287            )
288            .unwrap(),
289            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
290                "handle_soft_bundle_certificates_count",
291                "The number of certificates included in a soft bundle",
292                mysten_metrics::COUNT_BUCKETS.to_vec(),
293                registry,
294            )
295            .unwrap(),
296            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
297                "handle_soft_bundle_certificates_size_bytes",
298                "The size of soft bundle in bytes",
299                mysten_metrics::BYTES_BUCKETS.to_vec(),
300                registry,
301            )
302            .unwrap(),
303            handle_transaction_consensus_latency: register_histogram_with_registry!(
304                "validator_service_handle_transaction_consensus_latency",
305                "Latency of handling a user transaction sent through consensus",
306                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
307                registry,
308            )
309            .unwrap(),
310            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
311                "validator_service_submit_transaction_consensus_latency",
312                "Latency of submitting a user transaction sent through consensus",
313                &["req_type"],
314                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
315                registry,
316            )
317            .unwrap(),
318            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
319                "validator_service_submit_transaction_latency",
320                "Latency of submit transaction handler",
321                &["req_type"],
322                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
323                registry,
324            )
325            .unwrap(),
326            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
327                "validator_service_handle_wait_for_effects_ping_latency",
328                "Latency of handling a ping request for wait_for_effects",
329                &["req_type"],
330                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
331                registry,
332            )
333            .unwrap(),
334            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
335                "validator_service_submit_transaction_bytes",
336                "The size of transactions in the submit transaction request",
337                &["req_type"],
338                mysten_metrics::BYTES_BUCKETS.to_vec(),
339                registry,
340            )
341            .unwrap(),
342            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
343                "validator_service_submit_transaction_batch_size",
344                "The number of transactions in the submit transaction request",
345                &["req_type"],
346                mysten_metrics::COUNT_BUCKETS.to_vec(),
347                registry,
348            )
349            .unwrap(),
350            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
351                "validator_service_num_rejected_cert_in_epoch_boundary",
352                "Number of rejected transaction certificate during epoch transitioning",
353                registry,
354            )
355            .unwrap(),
356            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
357                "validator_service_num_rejected_tx_during_overload",
358                "Number of rejected transaction due to system overload",
359                &["error_type"],
360                registry,
361            )
362            .unwrap(),
363            submission_rejected_transactions: register_int_counter_vec_with_registry!(
364                "validator_service_submission_rejected_transactions",
365                "Number of transactions rejected during submission",
366                &["reason"],
367                registry,
368            )
369            .unwrap(),
370            connection_ip_not_found: register_int_counter_with_registry!(
371                "validator_service_connection_ip_not_found",
372                "Number of times connection IP was not extractable from request",
373                registry,
374            )
375            .unwrap(),
376            forwarded_header_parse_error: register_int_counter_with_registry!(
377                "validator_service_forwarded_header_parse_error",
378                "Number of times x-forwarded-for header could not be parsed",
379                registry,
380            )
381            .unwrap(),
382            forwarded_header_invalid: register_int_counter_with_registry!(
383                "validator_service_forwarded_header_invalid",
384                "Number of times x-forwarded-for header was invalid",
385                registry,
386            )
387            .unwrap(),
388            forwarded_header_not_included: register_int_counter_with_registry!(
389                "validator_service_forwarded_header_not_included",
390                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
391                registry,
392            )
393            .unwrap(),
394            client_id_source_config_mismatch: register_int_counter_with_registry!(
395                "validator_service_client_id_source_config_mismatch",
396                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
397                registry,
398            )
399            .unwrap(),
400            x_forwarded_for_num_hops: register_gauge_with_registry!(
401                "validator_service_x_forwarded_for_num_hops",
402                "Number of hops in x-forwarded-for header",
403                registry,
404            )
405            .unwrap(),
406        }
407    }
408
409    pub fn new_for_tests() -> Self {
410        let registry = Registry::new();
411        Self::new(&registry)
412    }
413}
414
/// The validator service: holds the authority state, consensus adapter,
/// metrics and optional traffic-control configuration used by its handlers.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state.
    state: Arc<AuthorityState>,
    /// Adapter passed to the system overload check on the authority state.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the service handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Optional traffic controller; `new` clones it from the authority state,
    /// `new_for_tests` leaves it unset.
    traffic_controller: Option<Arc<TrafficController>>,
    /// Optional source used when resolving a client's IP address; when unset,
    /// `handle_submit_transaction` falls back to `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
}
423
424impl ValidatorService {
425    pub fn new(
426        state: Arc<AuthorityState>,
427        consensus_adapter: Arc<ConsensusAdapter>,
428        validator_metrics: Arc<ValidatorServiceMetrics>,
429        client_id_source: Option<ClientIdSource>,
430    ) -> Self {
431        let traffic_controller = state.traffic_controller.clone();
432        Self {
433            state,
434            consensus_adapter,
435            metrics: validator_metrics,
436            traffic_controller,
437            client_id_source,
438        }
439    }
440
441    pub fn new_for_tests(
442        state: Arc<AuthorityState>,
443        consensus_adapter: Arc<ConsensusAdapter>,
444        metrics: Arc<ValidatorServiceMetrics>,
445    ) -> Self {
446        Self {
447            state,
448            consensus_adapter,
449            metrics,
450            traffic_controller: None,
451            client_id_source: None,
452        }
453    }
454
    /// Returns a reference to the shared authority state held by this service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
458
459    /// Test method that performs transaction validation without going through gRPC.
460    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
461        let epoch_store = self.state.load_epoch_store_one_call_per_task();
462
463        // Validity check (basic structural validation)
464        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
465
466        // Signature verification
467        let transaction = epoch_store
468            .verify_transaction_require_no_aliases(transaction)?
469            .into_tx();
470
471        // Validate the transaction
472        self.state
473            .handle_vote_transaction(&epoch_store, transaction)?;
474
475        Ok(())
476    }
477
478    /// Test method that performs transaction validation with overload checking.
479    /// Used for testing validator overload behavior.
480    pub fn handle_transaction_for_testing_with_overload_check(
481        &self,
482        transaction: Transaction,
483    ) -> SuiResult<()> {
484        let epoch_store = self.state.load_epoch_store_one_call_per_task();
485
486        // Validity check (basic structural validation)
487        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
488
489        // Check system overload
490        self.state.check_system_overload(
491            self.consensus_adapter.as_ref(),
492            transaction.data(),
493            self.state.check_system_overload_at_signing(),
494        )?;
495
496        // Signature verification
497        let transaction = epoch_store
498            .verify_transaction_require_no_aliases(transaction)?
499            .into_tx();
500
501        // Validate the transaction
502        self.state
503            .handle_vote_transaction(&epoch_store, transaction)?;
504
505        Ok(())
506    }
507
508    /// Collect the IDs of input objects that are immutable.
509    /// This is used to create the ImmutableInputObjects claim for consensus messages.
510    async fn collect_immutable_object_ids(
511        &self,
512        tx: &VerifiedTransaction,
513        state: &AuthorityState,
514    ) -> SuiResult<Vec<ObjectID>> {
515        let input_objects = tx.data().transaction_data().input_objects()?;
516
517        // Collect object IDs from ImmOrOwnedMoveObject inputs
518        let object_ids: Vec<ObjectID> = input_objects
519            .iter()
520            .filter_map(|obj| match obj {
521                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
522                _ => None,
523            })
524            .collect();
525        if object_ids.is_empty() {
526            return Ok(vec![]);
527        }
528
529        // Load objects from cache and filter to immutable ones
530        let objects = state.get_object_cache_reader().get_objects(&object_ids);
531
532        // All objects should be found, since owned input objects have been validated to exist.
533        objects
534            .into_iter()
535            .zip(object_ids.iter())
536            .filter_map(|(obj, id)| {
537                let Some(o) = obj else {
538                    return Some(Err::<ObjectID, SuiError>(
539                        SuiErrorKind::UserInputError {
540                            error: UserInputError::ObjectNotFound {
541                                object_id: *id,
542                                version: None,
543                            },
544                        }
545                        .into(),
546                    ));
547                };
548                if o.is_immutable() {
549                    Some(Ok(*id))
550                } else {
551                    None
552                }
553            })
554            .collect::<SuiResult<Vec<ObjectID>>>()
555    }
556
557    #[instrument(
558        name = "ValidatorService::handle_submit_transaction",
559        level = "error",
560        skip_all,
561        err(level = "debug")
562    )]
563    async fn handle_submit_transaction(
564        &self,
565        request: tonic::Request<RawSubmitTxRequest>,
566    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
567        let Self {
568            state,
569            consensus_adapter,
570            metrics,
571            traffic_controller: _,
572            client_id_source,
573        } = self.clone();
574
575        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
576            self.get_client_ip_addr(&request, client_id_source)
577        } else {
578            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
579        };
580
581        let inner = request.into_inner();
582        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
583
584        let next_epoch = start_epoch + 1;
585        let mut max_retries = 1;
586
587        loop {
588            let res = self
589                .handle_submit_transaction_inner(
590                    &state,
591                    &consensus_adapter,
592                    &metrics,
593                    &inner,
594                    submitter_client_addr,
595                )
596                .await;
597            match res {
598                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
599                Err(err) => {
600                    if max_retries > 0
601                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
602                    {
603                        max_retries -= 1;
604
605                        debug!(
606                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
607                        );
608
609                        if let Ok(Ok(new_epoch)) =
610                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
611                        {
612                            assert_reachable!("retry submission at epoch end");
613                            if new_epoch == next_epoch {
614                                continue;
615                            }
616
617                            debug_fatal!(
618                                "expected epoch {} after reconfiguration. got {}",
619                                next_epoch,
620                                new_epoch
621                            );
622                        }
623                    }
624                    return Err(err.into());
625                }
626            }
627        }
628    }
629
630    async fn handle_submit_transaction_inner(
631        &self,
632        state: &AuthorityState,
633        consensus_adapter: &ConsensusAdapter,
634        metrics: &ValidatorServiceMetrics,
635        request: &RawSubmitTxRequest,
636        submitter_client_addr: Option<IpAddr>,
637    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
638        let epoch_store = state.load_epoch_store_one_call_per_task();
639        if !epoch_store.protocol_config().mysticeti_fastpath() {
640            return Err(SuiErrorKind::UnsupportedFeatureError {
641                error: "Mysticeti fastpath".to_string(),
642            }
643            .into());
644        }
645
646        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
647            SuiErrorKind::GrpcMessageDeserializeError {
648                type_info: "RawSubmitTxRequest.submit_type".to_string(),
649                error: e.to_string(),
650            }
651        })?;
652
653        let is_ping_request = submit_type == SubmitTxType::Ping;
654        if is_ping_request {
655            fp_ensure!(
656                request.transactions.is_empty(),
657                SuiErrorKind::InvalidRequest(format!(
658                    "Ping request cannot contain {} transactions",
659                    request.transactions.len()
660                ))
661                .into()
662            );
663        } else {
664            // Ensure default and soft bundle requests contain at least one transaction.
665            fp_ensure!(
666                !request.transactions.is_empty(),
667                SuiErrorKind::InvalidRequest(
668                    "At least one transaction needs to be submitted".to_string(),
669                )
670                .into()
671            );
672        }
673
674        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
675        // if they use the same gas price. But this is only done with best effort.
676        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
677        // other transactions in the same bundle.
678        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
679
680        let max_num_transactions = if is_soft_bundle_request {
681            // Soft bundle cannot contain too many transactions.
682            // Otherwise it is hard to include all of them in a single block.
683            epoch_store.protocol_config().max_soft_bundle_size()
684        } else {
685            // Still enforce a limit even when transactions do not need to be in the same block.
686            epoch_store
687                .protocol_config()
688                .max_num_transactions_in_block()
689        };
690        fp_ensure!(
691            request.transactions.len() <= max_num_transactions as usize,
692            SuiErrorKind::InvalidRequest(format!(
693                "Too many transactions in request: {} vs {}",
694                request.transactions.len(),
695                max_num_transactions
696            ))
697            .into()
698        );
699
700        // Transaction digests.
701        let mut tx_digests = Vec::with_capacity(request.transactions.len());
702        // Transactions to submit to consensus.
703        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
704        // Indexes of transactions above in the request transactions.
705        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
706        // Results corresponding to each transaction in the request.
707        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
708        // Total size of all transactions in the request.
709        let mut total_size_bytes = 0;
710
711        let req_type = if is_ping_request {
712            "ping"
713        } else if request.transactions.len() == 1 {
714            "single_transaction"
715        } else if is_soft_bundle_request {
716            "soft_bundle"
717        } else {
718            "batch"
719        };
720
721        let _handle_tx_metrics_guard = metrics
722            .handle_submit_transaction_latency
723            .with_label_values(&[req_type])
724            .start_timer();
725
726        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
727            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
728                Ok(txn) => txn,
729                Err(e) => {
730                    // Ok to fail the request when any transaction is invalid.
731                    return Err(SuiErrorKind::TransactionDeserializationError {
732                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
733                    }
734                    .into());
735                }
736            };
737
738            // Ok to fail the request when any transaction is invalid.
739            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
740
741            let overload_check_res = state.check_system_overload(
742                consensus_adapter,
743                transaction.data(),
744                state.check_system_overload_at_signing(),
745            );
746            if let Err(error) = overload_check_res {
747                metrics
748                    .num_rejected_tx_during_overload
749                    .with_label_values(&[error.as_ref()])
750                    .inc();
751                results[idx] = Some(SubmitTxResult::Rejected { error });
752                continue;
753            }
754
755            // Ok to fail the request when any signature is invalid.
756            let verified_transaction = {
757                let _metrics_guard = metrics.tx_verification_latency.start_timer();
758                if epoch_store.protocol_config().address_aliases() {
759                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
760                        Ok(tx) => tx,
761                        Err(e) => {
762                            metrics.signature_errors.inc();
763                            return Err(e);
764                        }
765                    }
766                } else {
767                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
768                        Ok(tx) => tx,
769                        Err(e) => {
770                            metrics.signature_errors.inc();
771                            return Err(e);
772                        }
773                    }
774                }
775            };
776
777            let tx_digest = verified_transaction.tx().digest();
778            tx_digests.push(*tx_digest);
779
780            debug!(
781                ?tx_digest,
782                "handle_submit_transaction: verified transaction"
783            );
784
785            // Check if the transaction has executed, before checking input objects
786            // which could have been consumed.
787            if let Some(effects) = state
788                .get_transaction_cache_reader()
789                .get_executed_effects(tx_digest)
790            {
791                let effects_digest = effects.digest();
792                if let Ok(executed_data) = self.complete_executed_data(effects, None).await {
793                    let executed_result = SubmitTxResult::Executed {
794                        effects_digest,
795                        details: Some(executed_data),
796                        fast_path: false,
797                    };
798                    results[idx] = Some(executed_result);
799                    debug!(?tx_digest, "handle_submit_transaction: already executed");
800                    continue;
801                }
802            }
803
804            if self
805                .state
806                .get_transaction_cache_reader()
807                .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
808            {
809                results[idx] = Some(SubmitTxResult::Rejected {
810                    error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
811                });
812                debug!(
813                    ?tx_digest,
814                    "handle_submit_transaction: transaction already executed in previous epoch"
815                );
816                continue;
817            }
818
819            debug!(
820                ?tx_digest,
821                "handle_submit_transaction: waiting for fastpath dependency objects"
822            );
823            if !state
824                .wait_for_fastpath_dependency_objects(
825                    verified_transaction.tx(),
826                    epoch_store.epoch(),
827                )
828                .await?
829            {
830                debug!(
831                    ?tx_digest,
832                    "fastpath input objects are still unavailable after waiting"
833                );
834            }
835
836            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
837                Ok(_) => { /* continue processing */ }
838                Err(e) => {
839                    // Check if transaction has been executed while being validated.
840                    // This is an edge case so checking executed effects twice is acceptable.
841                    if let Some(effects) = state
842                        .get_transaction_cache_reader()
843                        .get_executed_effects(tx_digest)
844                    {
845                        let effects_digest = effects.digest();
846                        if let Ok(executed_data) = self.complete_executed_data(effects, None).await
847                        {
848                            let executed_result = SubmitTxResult::Executed {
849                                effects_digest,
850                                details: Some(executed_data),
851                                fast_path: false,
852                            };
853                            results[idx] = Some(executed_result);
854                            continue;
855                        }
856                    }
857
858                    // When the transaction has not been executed, record the error for the transaction.
859                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
860                    metrics
861                        .submission_rejected_transactions
862                        .with_label_values(&[e.to_variant_name()])
863                        .inc();
864                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
865                    continue;
866                }
867            }
868
869            // Create claims with aliases and / or immutable objects.
870            if epoch_store.protocol_config().address_aliases()
871                || epoch_store.protocol_config().disable_preconsensus_locking()
872            {
873                let mut claims = vec![];
874
875                if epoch_store.protocol_config().disable_preconsensus_locking() {
876                    let immutable_object_ids = self
877                        .collect_immutable_object_ids(verified_transaction.tx(), state)
878                        .await?;
879                    if !immutable_object_ids.is_empty() {
880                        claims.push(TransactionClaim::ImmutableInputObjects(
881                            immutable_object_ids,
882                        ));
883                    }
884                }
885
886                let (tx, aliases) = verified_transaction.into_inner();
887                if epoch_store.protocol_config().address_aliases() {
888                    claims.push(TransactionClaim::AddressAliases(aliases));
889                }
890
891                let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
892
893                consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
894                    &state.name,
895                    tx_with_claims,
896                ));
897            } else {
898                consensus_transactions.push(ConsensusTransaction::new_user_transaction_message(
899                    &state.name,
900                    verified_transaction.into_tx().into(),
901                ));
902            }
903            transaction_indexes.push(idx);
904            total_size_bytes += tx_size;
905        }
906
907        if consensus_transactions.is_empty() && !is_ping_request {
908            return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
909        }
910
        // Set the max bytes size of the soft bundle to be half of the consensus max transactions in block size.
        // We do this to account for serialization overheads and to ensure that the soft bundle is not too large
        // when it is submitted to consensus.
914        let max_transaction_bytes = if is_soft_bundle_request {
915            epoch_store
916                .protocol_config()
917                .consensus_max_transactions_in_block_bytes()
918                / 2
919        } else {
920            epoch_store
921                .protocol_config()
922                .consensus_max_transactions_in_block_bytes()
923        };
924        fp_ensure!(
925            total_size_bytes <= max_transaction_bytes as usize,
926            SuiErrorKind::UserInputError {
927                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
928                    size: total_size_bytes,
929                    limit: max_transaction_bytes,
930                },
931            }
932            .into()
933        );
934
935        metrics
936            .handle_submit_transaction_bytes
937            .with_label_values(&[req_type])
938            .observe(total_size_bytes as f64);
939        metrics
940            .handle_submit_transaction_batch_size
941            .with_label_values(&[req_type])
942            .observe(consensus_transactions.len() as f64);
943
944        let _latency_metric_guard = metrics
945            .handle_submit_transaction_consensus_latency
946            .with_label_values(&[req_type])
947            .start_timer();
948
949        let consensus_positions = if is_soft_bundle_request || is_ping_request {
            // We only allow `consensus_transactions` to be empty for ping requests. This is how it should be, and is, treated by the downstream components.
            // For any other case, an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
952            assert!(
953                is_ping_request || !consensus_transactions.is_empty(),
954                "A valid soft bundle must have at least one transaction"
955            );
956            debug!(
957                "handle_submit_transaction: submitting consensus transactions ({}): {}",
958                req_type,
959                consensus_transactions
960                    .iter()
961                    .map(|t| t.local_display())
962                    .join(", ")
963            );
964            self.handle_submit_to_consensus_for_position(
965                consensus_transactions,
966                &epoch_store,
967                submitter_client_addr,
968            )
969            .await?
970        } else {
971            let futures = consensus_transactions.into_iter().map(|t| {
972                debug!(
973                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
974                    req_type,
975                    t.local_display(),
976                );
977                self.handle_submit_to_consensus_for_position(
978                    vec![t],
979                    &epoch_store,
980                    submitter_client_addr,
981                )
982            });
983            future::try_join_all(futures)
984                .await?
985                .into_iter()
986                .flatten()
987                .collect()
988        };
989
990        if is_ping_request {
991            // For ping requests, return the special consensus position.
992            assert_eq!(consensus_positions.len(), 1);
993            results.push(Some(SubmitTxResult::Submitted {
994                consensus_position: consensus_positions[0],
995            }));
996        } else {
997            // Otherwise, return the consensus position for each transaction.
998            for ((idx, tx_digest), consensus_position) in transaction_indexes
999                .into_iter()
1000                .zip(tx_digests)
1001                .zip(consensus_positions)
1002            {
1003                debug!(
1004                    ?tx_digest,
1005                    "handle_submit_transaction: submitted consensus transaction at {}",
1006                    consensus_position,
1007                );
1008                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1009            }
1010        }
1011
1012        Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1013    }
1014
1015    fn try_from_submit_tx_response(
1016        results: Vec<Option<SubmitTxResult>>,
1017    ) -> Result<RawSubmitTxResponse, SuiError> {
1018        let mut raw_results = Vec::new();
1019        for (i, result) in results.into_iter().enumerate() {
1020            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1021                error: format!("Missing transaction result at {}", i),
1022            })?;
1023            let raw_result = result.try_into()?;
1024            raw_results.push(raw_result);
1025        }
1026        Ok(RawSubmitTxResponse {
1027            results: raw_results,
1028        })
1029    }
1030
1031    #[instrument(
1032        name = "ValidatorService::handle_submit_to_consensus_for_position",
1033        level = "debug",
1034        skip_all,
1035        err(level = "debug")
1036    )]
1037    async fn handle_submit_to_consensus_for_position(
1038        &self,
1039        // Empty when this is a ping request.
1040        consensus_transactions: Vec<ConsensusTransaction>,
1041        epoch_store: &Arc<AuthorityPerEpochStore>,
1042        submitter_client_addr: Option<IpAddr>,
1043    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1044        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1045
1046        {
1047            // code block within reconfiguration lock
1048            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1049            if !reconfiguration_lock.should_accept_user_certs() {
1050                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1051                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1052            }
1053
1054            // Submit to consensus and wait for position, we do not check if tx
1055            // has been processed by consensus already as this method is called
1056            // to get back a consensus position.
1057            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1058
1059            self.consensus_adapter.submit_batch(
1060                &consensus_transactions,
1061                Some(&reconfiguration_lock),
1062                epoch_store,
1063                Some(tx_consensus_positions),
1064                submitter_client_addr,
1065            )?;
1066        }
1067
1068        Ok(rx_consensus_positions.await.map_err(|e| {
1069            SuiErrorKind::FailedToSubmitToConsensus(format!(
1070                "Failed to get consensus position: {e}"
1071            ))
1072        })?)
1073    }
1074
1075    async fn collect_effects_data(
1076        &self,
1077        effects: &TransactionEffects,
1078        include_events: bool,
1079        include_input_objects: bool,
1080        include_output_objects: bool,
1081        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1082    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1083        let events = if include_events && effects.events_digest().is_some() {
1084            if let Some(fastpath_outputs) = &fastpath_outputs {
1085                Some(fastpath_outputs.events.clone())
1086            } else {
1087                Some(
1088                    self.state
1089                        .get_transaction_events(effects.transaction_digest())?,
1090                )
1091            }
1092        } else {
1093            None
1094        };
1095
1096        let input_objects = if include_input_objects {
1097            self.state.get_transaction_input_objects(effects)?
1098        } else {
1099            vec![]
1100        };
1101
1102        let output_objects = if include_output_objects {
1103            if let Some(fastpath_outputs) = &fastpath_outputs {
1104                fastpath_outputs.written.values().cloned().collect()
1105            } else {
1106                self.state.get_transaction_output_objects(effects)?
1107            }
1108        } else {
1109            vec![]
1110        };
1111
1112        Ok((events, input_objects, output_objects))
1113    }
1114}
1115
1116type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1117
1118impl ValidatorService {
1119    async fn handle_submit_transaction_impl(
1120        &self,
1121        request: tonic::Request<RawSubmitTxRequest>,
1122    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
1123        self.handle_submit_transaction(request).await
1124    }
1125
1126    async fn wait_for_effects_impl(
1127        &self,
1128        request: tonic::Request<RawWaitForEffectsRequest>,
1129    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1130        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1131        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1132        let response = timeout(
1133            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1134            Duration::from_secs(20),
1135            epoch_store
1136                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1137                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1138        )
1139        .await
1140        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1141        .try_into()?;
1142        Ok((tonic::Response::new(response), Weight::zero()))
1143    }
1144
1145    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position, fast_path_effects = tracing::field::Empty))]
1146    async fn wait_for_effects_response(
1147        &self,
1148        request: WaitForEffectsRequest,
1149        epoch_store: &Arc<AuthorityPerEpochStore>,
1150    ) -> SuiResult<WaitForEffectsResponse> {
1151        if request.ping_type.is_some() {
1152            return timeout(
1153                Duration::from_secs(10),
1154                self.ping_response(request, epoch_store),
1155            )
1156            .await
1157            .map_err(|_| SuiErrorKind::TimeoutError)?;
1158        }
1159
1160        let Some(tx_digest) = request.transaction_digest else {
1161            return Err(SuiErrorKind::InvalidRequest(
1162                "Transaction digest is required for wait for effects requests".to_string(),
1163            )
1164            .into());
1165        };
1166        let tx_digests = [tx_digest];
1167
1168        let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1169            if let Some(consensus_position) = request.consensus_position {
1170                Box::pin(self.wait_for_fastpath_effects(
1171                    consensus_position,
1172                    &tx_digests,
1173                    request.include_details,
1174                    epoch_store,
1175                ))
1176            } else {
1177                Box::pin(futures::future::pending())
1178            };
1179
1180        tokio::select! {
1181            // Ensure that finalized effects are always prioritized.
1182            biased;
1183            // We always wait for effects regardless of consensus position via
1184            // notify_read_executed_effects. This is safe because we have separated
1185            // mysticeti fastpath outputs to a separate dirty cache
1186            // UncommittedData::fastpath_transaction_outputs that will only get flushed
1187            // once finalized. So the output of notify_read_executed_effects is
1188            // guaranteed to be finalized effects or effects from QD execution.
1189            mut effects = self.state
1190                .get_transaction_cache_reader()
1191                .notify_read_executed_effects(
1192                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1193                    &tx_digests,
1194                ) => {
1195                tracing::Span::current().record("fast_path_effects", false);
1196                let effects = effects.pop().unwrap();
1197                let details = if request.include_details {
1198                    Some(self.complete_executed_data(effects.clone(), None).await?)
1199                } else {
1200                    None
1201                };
1202
1203                Ok(WaitForEffectsResponse::Executed {
1204                    effects_digest: effects.digest(),
1205                    details,
1206                    fast_path: false,
1207                })
1208            }
1209
1210            fastpath_response = fastpath_effects_future => {
1211                tracing::Span::current().record("fast_path_effects", true);
1212                fastpath_response
1213            }
1214        }
1215    }
1216
1217    #[instrument(level = "error", skip_all, err(level = "debug"))]
1218    async fn ping_response(
1219        &self,
1220        request: WaitForEffectsRequest,
1221        epoch_store: &Arc<AuthorityPerEpochStore>,
1222    ) -> SuiResult<WaitForEffectsResponse> {
1223        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1224            return Err(SuiErrorKind::UnsupportedFeatureError {
1225                error: "Mysticeti fastpath".to_string(),
1226            }
1227            .into());
1228        };
1229
1230        let Some(consensus_position) = request.consensus_position else {
1231            return Err(SuiErrorKind::InvalidRequest(
1232                "Consensus position is required for Ping requests".to_string(),
1233            )
1234            .into());
1235        };
1236
1237        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1238        let Some(ping) = request.ping_type else {
1239            return Err(SuiErrorKind::InvalidRequest(
1240                "Ping type is required for ping requests".to_string(),
1241            )
1242            .into());
1243        };
1244
1245        let _metrics_guard = self
1246            .metrics
1247            .handle_wait_for_effects_ping_latency
1248            .with_label_values(&[ping.as_str()])
1249            .start_timer();
1250
1251        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1252
1253        let mut last_status = None;
1254        let details = if request.include_details {
1255            Some(Box::new(ExecutedData::default()))
1256        } else {
1257            None
1258        };
1259
1260        loop {
1261            let status = consensus_tx_status_cache
1262                .notify_read_transaction_status_change(consensus_position, last_status)
1263                .await;
1264            match status {
1265                NotifyReadConsensusTxStatusResult::Status(status) => match status {
1266                    ConsensusTxStatus::FastpathCertified => {
1267                        // If the request is for consensus, we need to wait for the transaction to be finalised via Consensus.
1268                        if ping == PingType::Consensus {
1269                            last_status = Some(status);
1270                            continue;
1271                        }
1272                        return Ok(WaitForEffectsResponse::Executed {
1273                            effects_digest: TransactionEffectsDigest::ZERO,
1274                            details,
1275                            fast_path: true,
1276                        });
1277                    }
1278                    ConsensusTxStatus::Rejected => {
1279                        return Ok(WaitForEffectsResponse::Rejected { error: None });
1280                    }
1281                    ConsensusTxStatus::Finalized => {
1282                        return Ok(WaitForEffectsResponse::Executed {
1283                            effects_digest: TransactionEffectsDigest::ZERO,
1284                            details,
1285                            fast_path: false,
1286                        });
1287                    }
1288                    ConsensusTxStatus::Dropped => {
1289                        // Transaction was dropped post-consensus, currently only due to invalid owned object inputs..
1290                        // Fetch the detailed error (e.g., ObjectLockConflict) from the rejection reason cache.
1291                        return Ok(WaitForEffectsResponse::Rejected {
1292                            error: epoch_store.get_rejection_vote_reason(consensus_position),
1293                        });
1294                    }
1295                },
1296                NotifyReadConsensusTxStatusResult::Expired(round) => {
1297                    return Ok(WaitForEffectsResponse::Expired {
1298                        epoch: epoch_store.epoch(),
1299                        round: Some(round),
1300                    });
1301                }
1302            }
1303        }
1304    }
1305
1306    #[instrument(level = "error", skip_all, err(level = "debug"))]
1307    async fn wait_for_fastpath_effects(
1308        &self,
1309        consensus_position: ConsensusPosition,
1310        tx_digests: &[TransactionDigest],
1311        include_details: bool,
1312        epoch_store: &Arc<AuthorityPerEpochStore>,
1313    ) -> SuiResult<WaitForEffectsResponse> {
1314        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1315            return Err(SuiErrorKind::UnsupportedFeatureError {
1316                error: "Mysticeti fastpath".to_string(),
1317            }
1318            .into());
1319        };
1320
1321        let local_epoch = epoch_store.epoch();
1322        match consensus_position.epoch.cmp(&local_epoch) {
1323            Ordering::Less => {
1324                // Ask TransactionDriver to retry submitting the transaction and get a new ConsensusPosition,
1325                // if response from this validator is desired.
1326                let response = WaitForEffectsResponse::Expired {
1327                    epoch: local_epoch,
1328                    round: None,
1329                };
1330                return Ok(response);
1331            }
1332            Ordering::Greater => {
1333                // Ask TransactionDriver to retry this RPC until the validator's epoch catches up.
1334                return Err(SuiErrorKind::WrongEpoch {
1335                    expected_epoch: local_epoch,
1336                    actual_epoch: consensus_position.epoch,
1337                }
1338                .into());
1339            }
1340            Ordering::Equal => {
1341                // The validator's epoch is the same as the epoch of the transaction.
1342                // We can proceed with the normal flow.
1343            }
1344        };
1345
1346        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1347
1348        let mut current_status = None;
1349        loop {
1350            tokio::select! {
1351                status_result = consensus_tx_status_cache
1352                    .notify_read_transaction_status_change(consensus_position, current_status) => {
1353                    match status_result {
1354                        NotifyReadConsensusTxStatusResult::Status(new_status) => {
1355                            match new_status {
1356                                ConsensusTxStatus::Rejected => {
1357                                    return Ok(WaitForEffectsResponse::Rejected {
1358                                        error: epoch_store.get_rejection_vote_reason(
1359                                            consensus_position
1360                                        )
1361                                    });
1362                                }
1363                                ConsensusTxStatus::FastpathCertified => {
1364                                    current_status = Some(new_status);
1365                                    continue;
1366                                }
1367                                ConsensusTxStatus::Finalized => {
1368                                    current_status = Some(new_status);
1369                                    continue;
1370                                }
1371                                ConsensusTxStatus::Dropped => {
1372                                    // Transaction was dropped post-consensus, currently only due to invalid owned object inputs.
1373                                    // Fetch the detailed error from the rejection reason cache.
1374                                    return Ok(WaitForEffectsResponse::Rejected {
1375                                        error: epoch_store
1376                                            .get_rejection_vote_reason(consensus_position),
1377                                    });
1378                                }
1379                            }
1380                        }
1381                        NotifyReadConsensusTxStatusResult::Expired(round) => {
1382                            return Ok(WaitForEffectsResponse::Expired {
1383                                epoch: epoch_store.epoch(),
1384                                round: Some(round),
1385                            });
1386                        }
1387                    }
1388                }
1389
1390                mut outputs = self.state.get_transaction_cache_reader().notify_read_fastpath_transaction_outputs(tx_digests),
1391                    if current_status == Some(ConsensusTxStatus::FastpathCertified) || current_status == Some(ConsensusTxStatus::Finalized) => {
1392                    let outputs = outputs.pop().unwrap();
1393                    let effects = outputs.effects.clone();
1394
1395                    let details = if include_details {
1396                        Some(self.complete_executed_data(effects.clone(), Some(outputs)).await?)
1397                    } else {
1398                        None
1399                    };
1400
1401                    return Ok(WaitForEffectsResponse::Executed {
1402                        effects_digest: effects.digest(),
1403                        details,
1404                        fast_path: current_status == Some(ConsensusTxStatus::FastpathCertified),
1405                    });
1406                }
1407            }
1408        }
1409    }
1410
1411    async fn complete_executed_data(
1412        &self,
1413        effects: TransactionEffects,
1414        fastpath_outputs: Option<Arc<TransactionOutputs>>,
1415    ) -> SuiResult<Box<ExecutedData>> {
1416        let (events, input_objects, output_objects) = self
1417            .collect_effects_data(
1418                &effects,
1419                /* include_events */ true,
1420                /* include_input_objects */ true,
1421                /* include_output_objects */ true,
1422                fastpath_outputs,
1423            )
1424            .await?;
1425        Ok(Box::new(ExecutedData {
1426            effects,
1427            events,
1428            input_objects,
1429            output_objects,
1430        }))
1431    }
1432
1433    async fn object_info_impl(
1434        &self,
1435        request: tonic::Request<ObjectInfoRequest>,
1436    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1437        let request = request.into_inner();
1438        let response = self.state.handle_object_info_request(request).await?;
1439        Ok((tonic::Response::new(response), Weight::one()))
1440    }
1441
1442    async fn transaction_info_impl(
1443        &self,
1444        request: tonic::Request<TransactionInfoRequest>,
1445    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1446        let request = request.into_inner();
1447        let response = self.state.handle_transaction_info_request(request).await?;
1448        Ok((tonic::Response::new(response), Weight::one()))
1449    }
1450
1451    async fn checkpoint_impl(
1452        &self,
1453        request: tonic::Request<CheckpointRequest>,
1454    ) -> WrappedServiceResponse<CheckpointResponse> {
1455        let request = request.into_inner();
1456        let response = self.state.handle_checkpoint_request(&request)?;
1457        Ok((tonic::Response::new(response), Weight::one()))
1458    }
1459
1460    async fn checkpoint_v2_impl(
1461        &self,
1462        request: tonic::Request<CheckpointRequestV2>,
1463    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1464        let request = request.into_inner();
1465        let response = self.state.handle_checkpoint_request_v2(&request)?;
1466        Ok((tonic::Response::new(response), Weight::one()))
1467    }
1468
1469    async fn get_system_state_object_impl(
1470        &self,
1471        _request: tonic::Request<SystemStateRequest>,
1472    ) -> WrappedServiceResponse<SuiSystemState> {
1473        let response = self
1474            .state
1475            .get_object_cache_reader()
1476            .get_sui_system_state_object_unsafe()?;
1477        Ok((tonic::Response::new(response), Weight::one()))
1478    }
1479
1480    async fn validator_health_impl(
1481        &self,
1482        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1483    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1484        let state = &self.state;
1485
1486        // Get epoch store once for both metrics
1487        let epoch_store = state.load_epoch_store_one_call_per_task();
1488
1489        // Get in-flight execution transactions from execution scheduler
1490        let num_inflight_execution_transactions =
1491            state.execution_scheduler().num_pending_certificates() as u64;
1492
1493        // Get in-flight consensus transactions from consensus adapter
1494        let num_inflight_consensus_transactions =
1495            self.consensus_adapter.num_inflight_transactions();
1496
1497        // Get last committed leader round from epoch store
1498        let last_committed_leader_round = epoch_store
1499            .consensus_tx_status_cache
1500            .as_ref()
1501            .and_then(|cache| cache.get_last_committed_leader_round())
1502            .unwrap_or(0);
1503
1504        // Get last locally built checkpoint sequence
1505        let last_locally_built_checkpoint = epoch_store
1506            .last_built_checkpoint_summary()
1507            .ok()
1508            .flatten()
1509            .map(|(_, summary)| summary.sequence_number)
1510            .unwrap_or(0);
1511
1512        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1513            num_inflight_consensus_transactions,
1514            num_inflight_execution_transactions,
1515            last_locally_built_checkpoint,
1516            last_committed_leader_round,
1517        };
1518
1519        let raw_response = typed_response
1520            .try_into()
1521            .map_err(|e: sui_types::error::SuiError| {
1522                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1523            })?;
1524
1525        Ok((tonic::Response::new(raw_response), Weight::one()))
1526    }
1527
1528    fn get_client_ip_addr<T>(
1529        &self,
1530        request: &tonic::Request<T>,
1531        source: &ClientIdSource,
1532    ) -> Option<IpAddr> {
1533        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1534
1535        if let Some(header) = forwarded_header {
1536            let num_hops = header
1537                .to_str()
1538                .map(|h| h.split(',').count().saturating_sub(1))
1539                .unwrap_or(0);
1540
1541            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1542        }
1543
1544        match source {
1545            ClientIdSource::SocketAddr => {
1546                let socket_addr: Option<SocketAddr> = request.remote_addr();
1547
1548                // We will hit this case if the IO type used does not
1549                // implement Connected or when using a unix domain socket.
1550                // TODO: once we have confirmed that no legitimate traffic
1551                // is hitting this case, we should reject such requests that
1552                // hit this case.
1553                if let Some(socket_addr) = socket_addr {
1554                    Some(socket_addr.ip())
1555                } else {
1556                    if cfg!(msim) {
1557                        // Ignore the error from simtests.
1558                    } else if cfg!(test) {
1559                        panic!("Failed to get remote address from request");
1560                    } else {
1561                        self.metrics.connection_ip_not_found.inc();
1562                        error!("Failed to get remote address from request");
1563                    }
1564                    None
1565                }
1566            }
1567            ClientIdSource::XForwardedFor(num_hops) => {
1568                let do_header_parse = |op: &MetadataValue<Ascii>| {
1569                    match op.to_str() {
1570                        Ok(header_val) => {
1571                            let header_contents =
1572                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1573                            if *num_hops == 0 {
1574                                error!(
1575                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1576                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1577                                    to this node. Skipping traffic controller request handling.",
1578                                    header_contents,
1579                                );
1580                                return None;
1581                            }
1582                            let contents_len = header_contents.len();
1583                            if contents_len < *num_hops {
1584                                error!(
1585                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1586                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1587                                    `client-id-source` in the node config.",
1588                                    header_contents, contents_len, num_hops, contents_len,
1589                                );
1590                                self.metrics.client_id_source_config_mismatch.inc();
1591                                return None;
1592                            }
1593                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1594                            else {
1595                                error!(
1596                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1597                                    Expected at least {} values. Skipping traffic controller request handling.",
1598                                    header_contents, contents_len, num_hops, contents_len,
1599                                );
1600                                return None;
1601                            };
1602                            parse_ip(client_ip).or_else(|| {
1603                                self.metrics.forwarded_header_parse_error.inc();
1604                                None
1605                            })
1606                        }
1607                        Err(e) => {
1608                            // TODO: once we have confirmed that no legitimate traffic
1609                            // is hitting this case, we should reject such requests that
1610                            // hit this case.
1611                            self.metrics.forwarded_header_invalid.inc();
1612                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1613                            None
1614                        }
1615                    }
1616                };
1617                if let Some(op) = request.metadata().get("x-forwarded-for") {
1618                    do_header_parse(op)
1619                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1620                    do_header_parse(op)
1621                } else {
1622                    self.metrics.forwarded_header_not_included.inc();
1623                    error!(
1624                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1625                    );
1626                    None
1627                }
1628            }
1629        }
1630    }
1631
1632    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1633        if let Some(traffic_controller) = &self.traffic_controller {
1634            if !traffic_controller.check(&client, &None).await {
1635                // Entity in blocklist
1636                Err(tonic::Status::from_error(
1637                    SuiErrorKind::TooManyRequests.into(),
1638                ))
1639            } else {
1640                Ok(())
1641            }
1642        } else {
1643            Ok(())
1644        }
1645    }
1646
1647    fn handle_traffic_resp<T>(
1648        &self,
1649        client: Option<IpAddr>,
1650        wrapped_response: WrappedServiceResponse<T>,
1651    ) -> Result<tonic::Response<T>, tonic::Status> {
1652        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1653            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1654            Err(status) => (
1655                Some(SuiError::from(status.clone())),
1656                Weight::zero(),
1657                Err(status.clone()),
1658            ),
1659        };
1660
1661        if let Some(traffic_controller) = self.traffic_controller.clone() {
1662            traffic_controller.tally(TrafficTally {
1663                direct: client,
1664                through_fullnode: None,
1665                error_info: error.map(|e| {
1666                    let error_type = String::from(e.clone().as_ref());
1667                    let error_weight = normalize(e);
1668                    (error_weight, error_type)
1669                }),
1670                spam_weight,
1671                timestamp: SystemTime::now(),
1672            })
1673        }
1674        unwrapped_response
1675    }
1676}
1677
1678// TODO: refine error matching here
1679fn normalize(err: SuiError) -> Weight {
1680    match err.as_inner() {
1681        SuiErrorKind::UserInputError {
1682            error: UserInputError::IncorrectUserSignature { .. },
1683        } => Weight::one(),
1684        SuiErrorKind::InvalidSignature { .. }
1685        | SuiErrorKind::SignerSignatureAbsent { .. }
1686        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1687        | SuiErrorKind::IncorrectSigner { .. }
1688        | SuiErrorKind::UnknownSigner { .. }
1689        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1690        _ => Weight::zero(),
1691    }
1692}
1693
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Expands to an expression that calls `$self.$func_name($request)` and yields
/// `Result<tonic::Response<_>, tonic::Status>`:
///
/// * If `$self.client_id_source` is `None`, the handler is called directly and
///   its spam weight is dropped; no traffic control is applied.
/// * Otherwise the client IP is resolved with `get_client_ip_addr`, the request
///   is gated by `handle_traffic_req`, and the handler's outcome is tallied by
///   `handle_traffic_resp`.
///
/// The expansion contains `return` and `?`, so it must be used as the tail of a
/// function or async block whose output is that `Result` type.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // No client-id source configured: skip traffic control entirely.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if the client IP is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1714
// gRPC-facing `Validator` implementation. Every endpoint forwards to the
// matching `*_impl` method through `handle_with_decoration!`, which applies the
// traffic-control pre-check and post-tally when a client-id source is configured.
#[async_trait]
impl Validator for ValidatorService {
    /// Handles a transaction submission inside a separately spawned task.
    ///
    /// The service is cloned into the task so the work does not borrow `self`.
    /// The join result is unwrapped, so a failure to join the spawned task
    /// panics here.
    async fn submit_transaction(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
        let validator_service = self.clone();

        // Spawns a task which handles the transaction. The task will unconditionally continue
        // processing in the event that the client connection is dropped.
        spawn_monitored_task!(async move {
            // NB: traffic tally wrapping handled within the task rather than on task exit
            // to prevent an attacker from subverting traffic control by severing the connection
            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
        })
        .await
        .unwrap()
    }

    /// Forwards to `wait_for_effects_impl` through `handle_with_decoration!`.
    async fn wait_for_effects(
        &self,
        request: tonic::Request<RawWaitForEffectsRequest>,
    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
        handle_with_decoration!(self, wait_for_effects_impl, request)
    }

    /// Forwards to `object_info_impl` through `handle_with_decoration!`.
    async fn object_info(
        &self,
        request: tonic::Request<ObjectInfoRequest>,
    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, object_info_impl, request)
    }

    /// Forwards to `transaction_info_impl` through `handle_with_decoration!`.
    async fn transaction_info(
        &self,
        request: tonic::Request<TransactionInfoRequest>,
    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
        handle_with_decoration!(self, transaction_info_impl, request)
    }

    /// Forwards to `checkpoint_impl` through `handle_with_decoration!`.
    async fn checkpoint(
        &self,
        request: tonic::Request<CheckpointRequest>,
    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_impl, request)
    }

    /// Forwards to `checkpoint_v2_impl` through `handle_with_decoration!`.
    async fn checkpoint_v2(
        &self,
        request: tonic::Request<CheckpointRequestV2>,
    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
        handle_with_decoration!(self, checkpoint_v2_impl, request)
    }

    /// Forwards to `get_system_state_object_impl` through `handle_with_decoration!`.
    async fn get_system_state_object(
        &self,
        request: tonic::Request<SystemStateRequest>,
    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
        handle_with_decoration!(self, get_system_state_object_impl, request)
    }

    /// Forwards to `validator_health_impl` through `handle_with_decoration!`.
    async fn validator_health(
        &self,
        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
    {
        handle_with_decoration!(self, validator_health_impl, request)
    }
}