sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14    register_gauge_with_registry, register_histogram_vec_with_registry,
15    register_histogram_with_registry, register_int_counter_vec_with_registry,
16    register_int_counter_with_registry,
17};
18use std::{
19    cmp::Ordering,
20    future::Future,
21    io,
22    net::{IpAddr, SocketAddr},
23    pin::Pin,
24    sync::Arc,
25    time::{Duration, SystemTime},
26};
27use sui_network::{
28    api::{Validator, ValidatorServer},
29    tonic,
30    validator::server::SUI_TLS_SERVER_NAME,
31};
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::message_envelope::Message;
34use sui_types::messages_consensus::ConsensusPosition;
35use sui_types::messages_consensus::ConsensusTransaction;
36use sui_types::messages_grpc::{
37    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
38    TransactionInfoRequest, TransactionInfoResponse,
39};
40use sui_types::multiaddr::Multiaddr;
41use sui_types::object::Object;
42use sui_types::sui_system_state::SuiSystemState;
43use sui_types::traffic_control::{ClientIdSource, Weight};
44use sui_types::{
45    base_types::ObjectID,
46    digests::{TransactionDigest, TransactionEffectsDigest},
47    error::{SuiErrorKind, UserInputError},
48};
49use sui_types::{
50    effects::TransactionEffects,
51    messages_grpc::{
52        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
53        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
54    },
55};
56use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
57use sui_types::{error::*, transaction::*};
58use sui_types::{
59    fp_ensure,
60    messages_checkpoint::{
61        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
62    },
63};
64use tokio::sync::oneshot;
65use tokio::time::timeout;
66use tonic::metadata::{Ascii, MetadataValue};
67use tracing::{debug, error, info, instrument};
68
69use crate::consensus_adapter::ConnectionMonitorStatusForTests;
70use crate::{
71    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
72    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
73    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
74};
75use crate::{
76    authority::{
77        authority_per_epoch_store::AuthorityPerEpochStore,
78        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
79    },
80    checkpoints::CheckpointStore,
81    mysticeti_adapter::LazyMysticetiClient,
82};
83use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
84use sui_types::messages_grpc::PingType;
85
86#[cfg(test)]
87#[path = "unit_tests/server_tests.rs"]
88mod server_tests;
89
90#[cfg(test)]
91#[path = "unit_tests/wait_for_effects_tests.rs"]
92mod wait_for_effects_tests;
93
94#[cfg(test)]
95#[path = "unit_tests/submit_transaction_tests.rs"]
96mod submit_transaction_tests;
97
98pub struct AuthorityServerHandle {
99    server_handle: sui_network::validator::server::Server,
100}
101
102impl AuthorityServerHandle {
103    pub async fn join(self) -> Result<(), io::Error> {
104        self.server_handle.handle().wait_for_shutdown().await;
105        Ok(())
106    }
107
108    pub async fn kill(self) -> Result<(), io::Error> {
109        self.server_handle.handle().shutdown().await;
110        Ok(())
111    }
112
113    pub fn address(&self) -> &Multiaddr {
114        self.server_handle.local_addr()
115    }
116}
117
118pub struct AuthorityServer {
119    address: Multiaddr,
120    pub state: Arc<AuthorityState>,
121    consensus_adapter: Arc<ConsensusAdapter>,
122    pub metrics: Arc<ValidatorServiceMetrics>,
123}
124
125impl AuthorityServer {
126    pub fn new_for_test_with_consensus_adapter(
127        state: Arc<AuthorityState>,
128        consensus_adapter: Arc<ConsensusAdapter>,
129    ) -> Self {
130        let address = new_local_tcp_address_for_testing();
131        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
132
133        Self {
134            address,
135            state,
136            consensus_adapter,
137            metrics,
138        }
139    }
140
141    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
142        let consensus_adapter = Arc::new(ConsensusAdapter::new(
143            Arc::new(LazyMysticetiClient::new()),
144            CheckpointStore::new_for_tests(),
145            state.name,
146            Arc::new(ConnectionMonitorStatusForTests {}),
147            100_000,
148            100_000,
149            None,
150            None,
151            ConsensusAdapterMetrics::new_test(),
152            state.epoch_store_for_testing().protocol_config().clone(),
153        ));
154        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
155    }
156
157    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
158        let address = self.address.clone();
159        self.spawn_with_bind_address_for_test(address).await
160    }
161
162    pub async fn spawn_with_bind_address_for_test(
163        self,
164        address: Multiaddr,
165    ) -> Result<AuthorityServerHandle, io::Error> {
166        let tls_config = sui_tls::create_rustls_server_config(
167            self.state.config.network_key_pair().copy().private(),
168            SUI_TLS_SERVER_NAME.to_string(),
169        );
170        let config = mysten_network::config::Config::new();
171        let server = sui_network::validator::server::ServerBuilder::from_config(
172            &config,
173            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
174        )
175        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
176            self.state,
177            self.consensus_adapter,
178            self.metrics,
179        )))
180        .bind(&address, Some(tls_config))
181        .await
182        .unwrap();
183        let local_addr = server.local_addr().to_owned();
184        info!("Listening to traffic on {local_addr}");
185        let handle = AuthorityServerHandle {
186            server_handle: server,
187        };
188        Ok(handle)
189    }
190}
191
192pub struct ValidatorServiceMetrics {
193    pub signature_errors: IntCounter,
194    pub tx_verification_latency: Histogram,
195    pub cert_verification_latency: Histogram,
196    pub consensus_latency: Histogram,
197    pub handle_transaction_latency: Histogram,
198    pub submit_certificate_consensus_latency: Histogram,
199    pub handle_certificate_consensus_latency: Histogram,
200    pub handle_certificate_non_consensus_latency: Histogram,
201    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
202    pub handle_soft_bundle_certificates_count: Histogram,
203    pub handle_soft_bundle_certificates_size_bytes: Histogram,
204    pub handle_transaction_consensus_latency: Histogram,
205    pub handle_submit_transaction_consensus_latency: HistogramVec,
206    pub handle_wait_for_effects_ping_latency: HistogramVec,
207
208    handle_submit_transaction_latency: HistogramVec,
209    handle_submit_transaction_bytes: HistogramVec,
210    handle_submit_transaction_batch_size: HistogramVec,
211
212    num_rejected_cert_in_epoch_boundary: IntCounter,
213    num_rejected_tx_during_overload: IntCounterVec,
214    submission_rejected_transactions: IntCounterVec,
215    connection_ip_not_found: IntCounter,
216    forwarded_header_parse_error: IntCounter,
217    forwarded_header_invalid: IntCounter,
218    forwarded_header_not_included: IntCounter,
219    client_id_source_config_mismatch: IntCounter,
220    x_forwarded_for_num_hops: Gauge,
221}
222
223impl ValidatorServiceMetrics {
224    pub fn new(registry: &Registry) -> Self {
225        Self {
226            signature_errors: register_int_counter_with_registry!(
227                "total_signature_errors",
228                "Number of transaction signature errors",
229                registry,
230            )
231            .unwrap(),
232            tx_verification_latency: register_histogram_with_registry!(
233                "validator_service_tx_verification_latency",
234                "Latency of verifying a transaction",
235                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
236                registry,
237            )
238            .unwrap(),
239            cert_verification_latency: register_histogram_with_registry!(
240                "validator_service_cert_verification_latency",
241                "Latency of verifying a certificate",
242                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
243                registry,
244            )
245            .unwrap(),
246            consensus_latency: register_histogram_with_registry!(
247                "validator_service_consensus_latency",
248                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
249                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
250                registry,
251            )
252            .unwrap(),
253            handle_transaction_latency: register_histogram_with_registry!(
254                "validator_service_handle_transaction_latency",
255                "Latency of handling a transaction",
256                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
257                registry,
258            )
259            .unwrap(),
260            handle_certificate_consensus_latency: register_histogram_with_registry!(
261                "validator_service_handle_certificate_consensus_latency",
262                "Latency of handling a consensus transaction certificate",
263                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
264                registry,
265            )
266            .unwrap(),
267            submit_certificate_consensus_latency: register_histogram_with_registry!(
268                "validator_service_submit_certificate_consensus_latency",
269                "Latency of submit_certificate RPC handler",
270                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
271                registry,
272            )
273            .unwrap(),
274            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
275                "validator_service_handle_certificate_non_consensus_latency",
276                "Latency of handling a non-consensus transaction certificate",
277                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
278                registry,
279            )
280            .unwrap(),
281            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
282                "validator_service_handle_soft_bundle_certificates_consensus_latency",
283                "Latency of handling a consensus soft bundle",
284                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
285                registry,
286            )
287            .unwrap(),
288            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
289                "handle_soft_bundle_certificates_count",
290                "The number of certificates included in a soft bundle",
291                mysten_metrics::COUNT_BUCKETS.to_vec(),
292                registry,
293            )
294            .unwrap(),
295            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
296                "handle_soft_bundle_certificates_size_bytes",
297                "The size of soft bundle in bytes",
298                mysten_metrics::BYTES_BUCKETS.to_vec(),
299                registry,
300            )
301            .unwrap(),
302            handle_transaction_consensus_latency: register_histogram_with_registry!(
303                "validator_service_handle_transaction_consensus_latency",
304                "Latency of handling a user transaction sent through consensus",
305                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
306                registry,
307            )
308            .unwrap(),
309            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
310                "validator_service_submit_transaction_consensus_latency",
311                "Latency of submitting a user transaction sent through consensus",
312                &["req_type"],
313                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
314                registry,
315            )
316            .unwrap(),
317            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
318                "validator_service_submit_transaction_latency",
319                "Latency of submit transaction handler",
320                &["req_type"],
321                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
322                registry,
323            )
324            .unwrap(),
325            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
326                "validator_service_handle_wait_for_effects_ping_latency",
327                "Latency of handling a ping request for wait_for_effects",
328                &["req_type"],
329                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
330                registry,
331            )
332            .unwrap(),
333            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
334                "validator_service_submit_transaction_bytes",
335                "The size of transactions in the submit transaction request",
336                &["req_type"],
337                mysten_metrics::BYTES_BUCKETS.to_vec(),
338                registry,
339            )
340            .unwrap(),
341            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
342                "validator_service_submit_transaction_batch_size",
343                "The number of transactions in the submit transaction request",
344                &["req_type"],
345                mysten_metrics::COUNT_BUCKETS.to_vec(),
346                registry,
347            )
348            .unwrap(),
349            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
350                "validator_service_num_rejected_cert_in_epoch_boundary",
351                "Number of rejected transaction certificate during epoch transitioning",
352                registry,
353            )
354            .unwrap(),
355            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
356                "validator_service_num_rejected_tx_during_overload",
357                "Number of rejected transaction due to system overload",
358                &["error_type"],
359                registry,
360            )
361            .unwrap(),
362            submission_rejected_transactions: register_int_counter_vec_with_registry!(
363                "validator_service_submission_rejected_transactions",
364                "Number of transactions rejected during submission",
365                &["reason"],
366                registry,
367            )
368            .unwrap(),
369            connection_ip_not_found: register_int_counter_with_registry!(
370                "validator_service_connection_ip_not_found",
371                "Number of times connection IP was not extractable from request",
372                registry,
373            )
374            .unwrap(),
375            forwarded_header_parse_error: register_int_counter_with_registry!(
376                "validator_service_forwarded_header_parse_error",
377                "Number of times x-forwarded-for header could not be parsed",
378                registry,
379            )
380            .unwrap(),
381            forwarded_header_invalid: register_int_counter_with_registry!(
382                "validator_service_forwarded_header_invalid",
383                "Number of times x-forwarded-for header was invalid",
384                registry,
385            )
386            .unwrap(),
387            forwarded_header_not_included: register_int_counter_with_registry!(
388                "validator_service_forwarded_header_not_included",
389                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
390                registry,
391            )
392            .unwrap(),
393            client_id_source_config_mismatch: register_int_counter_with_registry!(
394                "validator_service_client_id_source_config_mismatch",
395                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
396                registry,
397            )
398            .unwrap(),
399            x_forwarded_for_num_hops: register_gauge_with_registry!(
400                "validator_service_x_forwarded_for_num_hops",
401                "Number of hops in x-forwarded-for header",
402                registry,
403            )
404            .unwrap(),
405        }
406    }
407
408    pub fn new_for_tests() -> Self {
409        let registry = Registry::new();
410        Self::new(&registry)
411    }
412}
413
414#[derive(Clone)]
415pub struct ValidatorService {
416    state: Arc<AuthorityState>,
417    consensus_adapter: Arc<ConsensusAdapter>,
418    metrics: Arc<ValidatorServiceMetrics>,
419    traffic_controller: Option<Arc<TrafficController>>,
420    client_id_source: Option<ClientIdSource>,
421}
422
423impl ValidatorService {
424    pub fn new(
425        state: Arc<AuthorityState>,
426        consensus_adapter: Arc<ConsensusAdapter>,
427        validator_metrics: Arc<ValidatorServiceMetrics>,
428        client_id_source: Option<ClientIdSource>,
429    ) -> Self {
430        let traffic_controller = state.traffic_controller.clone();
431        Self {
432            state,
433            consensus_adapter,
434            metrics: validator_metrics,
435            traffic_controller,
436            client_id_source,
437        }
438    }
439
440    pub fn new_for_tests(
441        state: Arc<AuthorityState>,
442        consensus_adapter: Arc<ConsensusAdapter>,
443        metrics: Arc<ValidatorServiceMetrics>,
444    ) -> Self {
445        Self {
446            state,
447            consensus_adapter,
448            metrics,
449            traffic_controller: None,
450            client_id_source: None,
451        }
452    }
453
454    pub fn validator_state(&self) -> &Arc<AuthorityState> {
455        &self.state
456    }
457
458    /// Test method that performs transaction validation without going through gRPC.
459    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
460        let epoch_store = self.state.load_epoch_store_one_call_per_task();
461
462        // Validity check (basic structural validation)
463        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
464
465        // Signature verification
466        let transaction = epoch_store
467            .verify_transaction_require_no_aliases(transaction)?
468            .into_tx();
469
470        // Validate the transaction
471        self.state
472            .handle_vote_transaction(&epoch_store, transaction)?;
473
474        Ok(())
475    }
476
477    /// Test method that performs transaction validation with overload checking.
478    /// Used for testing validator overload behavior.
479    pub fn handle_transaction_for_testing_with_overload_check(
480        &self,
481        transaction: Transaction,
482    ) -> SuiResult<()> {
483        let epoch_store = self.state.load_epoch_store_one_call_per_task();
484
485        // Validity check (basic structural validation)
486        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
487
488        // Check system overload
489        self.state.check_system_overload(
490            self.consensus_adapter.as_ref(),
491            transaction.data(),
492            self.state.check_system_overload_at_signing(),
493        )?;
494
495        // Signature verification
496        let transaction = epoch_store
497            .verify_transaction_require_no_aliases(transaction)?
498            .into_tx();
499
500        // Validate the transaction
501        self.state
502            .handle_vote_transaction(&epoch_store, transaction)?;
503
504        Ok(())
505    }
506
507    /// Collect the IDs of input objects that are immutable.
508    /// This is used to create the ImmutableInputObjects claim for consensus messages.
509    async fn collect_immutable_object_ids(
510        &self,
511        tx: &VerifiedTransaction,
512        state: &AuthorityState,
513    ) -> SuiResult<Vec<ObjectID>> {
514        let input_objects = tx.data().transaction_data().input_objects()?;
515
516        // Collect object IDs from ImmOrOwnedMoveObject inputs
517        let object_ids: Vec<ObjectID> = input_objects
518            .iter()
519            .filter_map(|obj| match obj {
520                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
521                _ => None,
522            })
523            .collect();
524        if object_ids.is_empty() {
525            return Ok(vec![]);
526        }
527
528        // Load objects from cache and filter to immutable ones
529        let objects = state.get_object_cache_reader().get_objects(&object_ids);
530
531        // All objects should be found, since owned input objects have been validated to exist.
532        objects
533            .into_iter()
534            .zip(object_ids.iter())
535            .filter_map(|(obj, id)| {
536                let Some(o) = obj else {
537                    return Some(Err::<ObjectID, SuiError>(
538                        SuiErrorKind::UserInputError {
539                            error: UserInputError::ObjectNotFound {
540                                object_id: *id,
541                                version: None,
542                            },
543                        }
544                        .into(),
545                    ));
546                };
547                if o.is_immutable() {
548                    Some(Ok(*id))
549                } else {
550                    None
551                }
552            })
553            .collect::<SuiResult<Vec<ObjectID>>>()
554    }
555
556    #[instrument(
557        name = "ValidatorService::handle_submit_transaction",
558        level = "error",
559        skip_all,
560        err(level = "debug")
561    )]
562    async fn handle_submit_transaction(
563        &self,
564        request: tonic::Request<RawSubmitTxRequest>,
565    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
566        let Self {
567            state,
568            consensus_adapter,
569            metrics,
570            traffic_controller: _,
571            client_id_source,
572        } = self.clone();
573
574        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
575            self.get_client_ip_addr(&request, client_id_source)
576        } else {
577            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
578        };
579
580        let inner = request.into_inner();
581        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
582
583        let next_epoch = start_epoch + 1;
584        let mut max_retries = 1;
585
586        loop {
587            let res = self
588                .handle_submit_transaction_inner(
589                    &state,
590                    &consensus_adapter,
591                    &metrics,
592                    &inner,
593                    submitter_client_addr,
594                )
595                .await;
596            match res {
597                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
598                Err(err) => {
599                    if max_retries > 0
600                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
601                    {
602                        max_retries -= 1;
603
604                        debug!(
605                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
606                        );
607
608                        if let Ok(Ok(new_epoch)) =
609                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
610                        {
611                            assert_reachable!("retry submission at epoch end");
612                            if new_epoch == next_epoch {
613                                continue;
614                            }
615
616                            debug_fatal!(
617                                "expected epoch {} after reconfiguration. got {}",
618                                next_epoch,
619                                new_epoch
620                            );
621                        }
622                    }
623                    return Err(err.into());
624                }
625            }
626        }
627    }
628
629    async fn handle_submit_transaction_inner(
630        &self,
631        state: &AuthorityState,
632        consensus_adapter: &ConsensusAdapter,
633        metrics: &ValidatorServiceMetrics,
634        request: &RawSubmitTxRequest,
635        submitter_client_addr: Option<IpAddr>,
636    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
637        let epoch_store = state.load_epoch_store_one_call_per_task();
638        if !epoch_store.protocol_config().mysticeti_fastpath() {
639            return Err(SuiErrorKind::UnsupportedFeatureError {
640                error: "Mysticeti fastpath".to_string(),
641            }
642            .into());
643        }
644
645        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
646            SuiErrorKind::GrpcMessageDeserializeError {
647                type_info: "RawSubmitTxRequest.submit_type".to_string(),
648                error: e.to_string(),
649            }
650        })?;
651
652        let is_ping_request = submit_type == SubmitTxType::Ping;
653        if is_ping_request {
654            fp_ensure!(
655                request.transactions.is_empty(),
656                SuiErrorKind::InvalidRequest(format!(
657                    "Ping request cannot contain {} transactions",
658                    request.transactions.len()
659                ))
660                .into()
661            );
662        } else {
663            // Ensure default and soft bundle requests contain at least one transaction.
664            fp_ensure!(
665                !request.transactions.is_empty(),
666                SuiErrorKind::InvalidRequest(
667                    "At least one transaction needs to be submitted".to_string(),
668                )
669                .into()
670            );
671        }
672
673        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
674        // if they use the same gas price. But this is only done with best effort.
675        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
676        // other transactions in the same bundle.
677        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
678
679        let max_num_transactions = if is_soft_bundle_request {
680            // Soft bundle cannot contain too many transactions.
681            // Otherwise it is hard to include all of them in a single block.
682            epoch_store.protocol_config().max_soft_bundle_size()
683        } else {
684            // Still enforce a limit even when transactions do not need to be in the same block.
685            epoch_store
686                .protocol_config()
687                .max_num_transactions_in_block()
688        };
689        fp_ensure!(
690            request.transactions.len() <= max_num_transactions as usize,
691            SuiErrorKind::InvalidRequest(format!(
692                "Too many transactions in request: {} vs {}",
693                request.transactions.len(),
694                max_num_transactions
695            ))
696            .into()
697        );
698
699        // Transaction digests.
700        let mut tx_digests = Vec::with_capacity(request.transactions.len());
701        // Transactions to submit to consensus.
702        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
703        // Indexes of transactions above in the request transactions.
704        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
705        // Results corresponding to each transaction in the request.
706        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
707        // Total size of all transactions in the request.
708        let mut total_size_bytes = 0;
709
710        let req_type = if is_ping_request {
711            "ping"
712        } else if request.transactions.len() == 1 {
713            "single_transaction"
714        } else if is_soft_bundle_request {
715            "soft_bundle"
716        } else {
717            "batch"
718        };
719
720        let _handle_tx_metrics_guard = metrics
721            .handle_submit_transaction_latency
722            .with_label_values(&[req_type])
723            .start_timer();
724
725        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
726            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
727                Ok(txn) => txn,
728                Err(e) => {
729                    // Ok to fail the request when any transaction is invalid.
730                    return Err(SuiErrorKind::TransactionDeserializationError {
731                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
732                    }
733                    .into());
734                }
735            };
736
737            // Ok to fail the request when any transaction is invalid.
738            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
739
740            let overload_check_res = state.check_system_overload(
741                consensus_adapter,
742                transaction.data(),
743                state.check_system_overload_at_signing(),
744            );
745            if let Err(error) = overload_check_res {
746                metrics
747                    .num_rejected_tx_during_overload
748                    .with_label_values(&[error.as_ref()])
749                    .inc();
750                results[idx] = Some(SubmitTxResult::Rejected { error });
751                continue;
752            }
753
754            // Ok to fail the request when any signature is invalid.
755            let verified_transaction = {
756                let _metrics_guard = metrics.tx_verification_latency.start_timer();
757                if epoch_store.protocol_config().address_aliases() {
758                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
759                        Ok(tx) => tx,
760                        Err(e) => {
761                            metrics.signature_errors.inc();
762                            return Err(e);
763                        }
764                    }
765                } else {
766                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
767                        Ok(tx) => tx,
768                        Err(e) => {
769                            metrics.signature_errors.inc();
770                            return Err(e);
771                        }
772                    }
773                }
774            };
775
776            let tx_digest = verified_transaction.tx().digest();
777            tx_digests.push(*tx_digest);
778
779            debug!(
780                ?tx_digest,
781                "handle_submit_transaction: verified transaction"
782            );
783
784            // Check if the transaction has executed, before checking input objects
785            // which could have been consumed.
786            if let Some(effects) = state
787                .get_transaction_cache_reader()
788                .get_executed_effects(tx_digest)
789            {
790                let effects_digest = effects.digest();
791                if let Ok(executed_data) = self.complete_executed_data(effects).await {
792                    let executed_result = SubmitTxResult::Executed {
793                        effects_digest,
794                        details: Some(executed_data),
795                        fast_path: false,
796                    };
797                    results[idx] = Some(executed_result);
798                    debug!(?tx_digest, "handle_submit_transaction: already executed");
799                    continue;
800                }
801            }
802
803            if self
804                .state
805                .get_transaction_cache_reader()
806                .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
807            {
808                results[idx] = Some(SubmitTxResult::Rejected {
809                    error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
810                });
811                debug!(
812                    ?tx_digest,
813                    "handle_submit_transaction: transaction already executed in previous epoch"
814                );
815                continue;
816            }
817
818            debug!(
819                ?tx_digest,
820                "handle_submit_transaction: waiting for fastpath dependency objects"
821            );
822            if !state
823                .wait_for_fastpath_dependency_objects(
824                    verified_transaction.tx(),
825                    epoch_store.epoch(),
826                )
827                .await?
828            {
829                debug!(
830                    ?tx_digest,
831                    "fastpath input objects are still unavailable after waiting"
832                );
833            }
834
835            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
836                Ok(_) => { /* continue processing */ }
837                Err(e) => {
838                    // Check if transaction has been executed while being validated.
839                    // This is an edge case so checking executed effects twice is acceptable.
840                    if let Some(effects) = state
841                        .get_transaction_cache_reader()
842                        .get_executed_effects(tx_digest)
843                    {
844                        let effects_digest = effects.digest();
845                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
846                            let executed_result = SubmitTxResult::Executed {
847                                effects_digest,
848                                details: Some(executed_data),
849                                fast_path: false,
850                            };
851                            results[idx] = Some(executed_result);
852                            continue;
853                        }
854                    }
855
856                    // When the transaction has not been executed, record the error for the transaction.
857                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
858                    metrics
859                        .submission_rejected_transactions
860                        .with_label_values(&[e.to_variant_name()])
861                        .inc();
862                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
863                    continue;
864                }
865            }
866
867            // Create claims with aliases and / or immutable objects.
868            let mut claims = vec![];
869
870            let immutable_object_ids = self
871                .collect_immutable_object_ids(verified_transaction.tx(), state)
872                .await?;
873            if !immutable_object_ids.is_empty() {
874                claims.push(TransactionClaim::ImmutableInputObjects(
875                    immutable_object_ids,
876                ));
877            }
878
879            let (tx, aliases) = verified_transaction.into_inner();
880            if epoch_store.protocol_config().address_aliases() {
881                if epoch_store
882                    .protocol_config()
883                    .fix_checkpoint_signature_mapping()
884                {
885                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
886                } else {
887                    let v1_aliases: Vec<_> = tx
888                        .data()
889                        .intent_message()
890                        .value
891                        .required_signers()
892                        .into_iter()
893                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
894                        .collect();
895                    #[allow(deprecated)]
896                    claims.push(TransactionClaim::AddressAliases(
897                        nonempty::NonEmpty::from_vec(v1_aliases)
898                            .expect("must have at least one required_signer"),
899                    ));
900                }
901            }
902
903            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
904
905            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
906                &state.name,
907                tx_with_claims,
908            ));
909
910            transaction_indexes.push(idx);
911            total_size_bytes += tx_size;
912        }
913
914        if consensus_transactions.is_empty() && !is_ping_request {
915            return Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()));
916        }
917
        // Set the max byte size of the soft bundle to half of the consensus max transactions in block size.
        // We do this to account for serialization overhead and to ensure that the soft bundle is not too large
        // when we attempt to post it via consensus.
921        let max_transaction_bytes = if is_soft_bundle_request {
922            epoch_store
923                .protocol_config()
924                .consensus_max_transactions_in_block_bytes()
925                / 2
926        } else {
927            epoch_store
928                .protocol_config()
929                .consensus_max_transactions_in_block_bytes()
930        };
931        fp_ensure!(
932            total_size_bytes <= max_transaction_bytes as usize,
933            SuiErrorKind::UserInputError {
934                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
935                    size: total_size_bytes,
936                    limit: max_transaction_bytes,
937                },
938            }
939            .into()
940        );
941
942        metrics
943            .handle_submit_transaction_bytes
944            .with_label_values(&[req_type])
945            .observe(total_size_bytes as f64);
946        metrics
947            .handle_submit_transaction_batch_size
948            .with_label_values(&[req_type])
949            .observe(consensus_transactions.len() as f64);
950
951        let _latency_metric_guard = metrics
952            .handle_submit_transaction_consensus_latency
953            .with_label_values(&[req_type])
954            .start_timer();
955
956        let consensus_positions = if is_soft_bundle_request || is_ping_request {
            // We only allow `consensus_transactions` to be empty for ping requests. This is how it should be, and is, treated by the downstream components.
            // For any other case, an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
959            assert!(
960                is_ping_request || !consensus_transactions.is_empty(),
961                "A valid soft bundle must have at least one transaction"
962            );
963            debug!(
964                "handle_submit_transaction: submitting consensus transactions ({}): {}",
965                req_type,
966                consensus_transactions
967                    .iter()
968                    .map(|t| t.local_display())
969                    .join(", ")
970            );
971            self.handle_submit_to_consensus_for_position(
972                consensus_transactions,
973                &epoch_store,
974                submitter_client_addr,
975            )
976            .await?
977        } else {
978            let futures = consensus_transactions.into_iter().map(|t| {
979                debug!(
980                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
981                    req_type,
982                    t.local_display(),
983                );
984                self.handle_submit_to_consensus_for_position(
985                    vec![t],
986                    &epoch_store,
987                    submitter_client_addr,
988                )
989            });
990            future::try_join_all(futures)
991                .await?
992                .into_iter()
993                .flatten()
994                .collect()
995        };
996
997        if is_ping_request {
998            // For ping requests, return the special consensus position.
999            assert_eq!(consensus_positions.len(), 1);
1000            results.push(Some(SubmitTxResult::Submitted {
1001                consensus_position: consensus_positions[0],
1002            }));
1003        } else {
1004            // Otherwise, return the consensus position for each transaction.
1005            for ((idx, tx_digest), consensus_position) in transaction_indexes
1006                .into_iter()
1007                .zip(tx_digests)
1008                .zip(consensus_positions)
1009            {
1010                debug!(
1011                    ?tx_digest,
1012                    "handle_submit_transaction: submitted consensus transaction at {}",
1013                    consensus_position,
1014                );
1015                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1016            }
1017        }
1018
1019        Ok((Self::try_from_submit_tx_response(results)?, Weight::zero()))
1020    }
1021
1022    fn try_from_submit_tx_response(
1023        results: Vec<Option<SubmitTxResult>>,
1024    ) -> Result<RawSubmitTxResponse, SuiError> {
1025        let mut raw_results = Vec::new();
1026        for (i, result) in results.into_iter().enumerate() {
1027            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1028                error: format!("Missing transaction result at {}", i),
1029            })?;
1030            let raw_result = result.try_into()?;
1031            raw_results.push(raw_result);
1032        }
1033        Ok(RawSubmitTxResponse {
1034            results: raw_results,
1035        })
1036    }
1037
1038    #[instrument(
1039        name = "ValidatorService::handle_submit_to_consensus_for_position",
1040        level = "debug",
1041        skip_all,
1042        err(level = "debug")
1043    )]
1044    async fn handle_submit_to_consensus_for_position(
1045        &self,
1046        // Empty when this is a ping request.
1047        consensus_transactions: Vec<ConsensusTransaction>,
1048        epoch_store: &Arc<AuthorityPerEpochStore>,
1049        submitter_client_addr: Option<IpAddr>,
1050    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1051        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1052
1053        {
1054            // code block within reconfiguration lock
1055            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1056            if !reconfiguration_lock.should_accept_user_certs() {
1057                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1058                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1059            }
1060
1061            // Submit to consensus and wait for position, we do not check if tx
1062            // has been processed by consensus already as this method is called
1063            // to get back a consensus position.
1064            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1065
1066            self.consensus_adapter.submit_batch(
1067                &consensus_transactions,
1068                Some(&reconfiguration_lock),
1069                epoch_store,
1070                Some(tx_consensus_positions),
1071                submitter_client_addr,
1072            )?;
1073        }
1074
1075        Ok(rx_consensus_positions.await.map_err(|e| {
1076            SuiErrorKind::FailedToSubmitToConsensus(format!(
1077                "Failed to get consensus position: {e}"
1078            ))
1079        })?)
1080    }
1081
1082    async fn collect_effects_data(
1083        &self,
1084        effects: &TransactionEffects,
1085        include_events: bool,
1086        include_input_objects: bool,
1087        include_output_objects: bool,
1088    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1089        let events = if include_events && effects.events_digest().is_some() {
1090            Some(
1091                self.state
1092                    .get_transaction_events(effects.transaction_digest())?,
1093            )
1094        } else {
1095            None
1096        };
1097
1098        let input_objects = if include_input_objects {
1099            self.state.get_transaction_input_objects(effects)?
1100        } else {
1101            vec![]
1102        };
1103
1104        let output_objects = if include_output_objects {
1105            self.state.get_transaction_output_objects(effects)?
1106        } else {
1107            vec![]
1108        };
1109
1110        Ok((events, input_objects, output_objects))
1111    }
1112}
1113
/// Result type returned by the `*_impl` request handlers: on success, the
/// `tonic::Response` paired with a `Weight`; on failure, a `tonic::Status`.
// NOTE(review): the `Weight` is presumably consumed by traffic control — confirm.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1115
1116impl ValidatorService {
    /// Entry point for submit-transaction requests: forwards the request unchanged
    /// to `handle_submit_transaction` and returns its result.
    async fn handle_submit_transaction_impl(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
        self.handle_submit_transaction(request).await
    }
1123
1124    async fn wait_for_effects_impl(
1125        &self,
1126        request: tonic::Request<RawWaitForEffectsRequest>,
1127    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1128        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1129        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1130        let response = timeout(
1131            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1132            Duration::from_secs(20),
1133            epoch_store
1134                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1135                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1136        )
1137        .await
1138        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1139        .try_into()?;
1140        Ok((tonic::Response::new(response), Weight::zero()))
1141    }
1142
1143    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
1144    async fn wait_for_effects_response(
1145        &self,
1146        request: WaitForEffectsRequest,
1147        epoch_store: &Arc<AuthorityPerEpochStore>,
1148    ) -> SuiResult<WaitForEffectsResponse> {
1149        if request.ping_type.is_some() {
1150            return timeout(
1151                Duration::from_secs(10),
1152                self.ping_response(request, epoch_store),
1153            )
1154            .await
1155            .map_err(|_| SuiErrorKind::TimeoutError)?;
1156        }
1157
1158        let Some(tx_digest) = request.transaction_digest else {
1159            return Err(SuiErrorKind::InvalidRequest(
1160                "Transaction digest is required for wait for effects requests".to_string(),
1161            )
1162            .into());
1163        };
1164        let tx_digests = [tx_digest];
1165
1166        let fastpath_effects_future: Pin<Box<dyn Future<Output = _> + Send>> =
1167            if let Some(consensus_position) = request.consensus_position {
1168                Box::pin(self.wait_for_fastpath_effects(
1169                    consensus_position,
1170                    &tx_digests,
1171                    request.include_details,
1172                    epoch_store,
1173                ))
1174            } else {
1175                Box::pin(futures::future::pending())
1176            };
1177
1178        tokio::select! {
1179            // We always wait for effects regardless of consensus position via
1180            // notify_read_executed_effects_may_fail. This is safe because we have separated
1181            // mysticeti fastpath outputs to a separate dirty cache
1182            // UncommittedData::fastpath_transaction_outputs that will only get flushed
1183            // once finalized. So the output of notify_read_executed_effects_may_fail is
1184            // guaranteed to be finalized effects or effects from QD execution.
1185            // We use _may_fail variant because effects may have been pruned for old transactions.
1186            effects_result = self.state
1187                .get_transaction_cache_reader()
1188                .notify_read_executed_effects_may_fail(
1189                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1190                    &tx_digests,
1191                ) => {
1192                let effects = effects_result?.pop().unwrap();
1193                let effects_digest = effects.digest();
1194                let details = if request.include_details {
1195                    Some(self.complete_executed_data(effects).await?)
1196                } else {
1197                    None
1198                };
1199
1200                Ok(WaitForEffectsResponse::Executed {
1201                    effects_digest,
1202                    details,
1203                    fast_path: false,
1204                })
1205            }
1206
1207            fastpath_response = fastpath_effects_future => {
1208                tracing::Span::current().record("fast_path_effects", true);
1209                fastpath_response
1210            }
1211        }
1212    }
1213
1214    #[instrument(level = "error", skip_all, err(level = "debug"))]
1215    async fn ping_response(
1216        &self,
1217        request: WaitForEffectsRequest,
1218        epoch_store: &Arc<AuthorityPerEpochStore>,
1219    ) -> SuiResult<WaitForEffectsResponse> {
1220        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1221            return Err(SuiErrorKind::UnsupportedFeatureError {
1222                error: "Mysticeti fastpath".to_string(),
1223            }
1224            .into());
1225        };
1226
1227        let Some(consensus_position) = request.consensus_position else {
1228            return Err(SuiErrorKind::InvalidRequest(
1229                "Consensus position is required for Ping requests".to_string(),
1230            )
1231            .into());
1232        };
1233
1234        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1235        let Some(ping) = request.ping_type else {
1236            return Err(SuiErrorKind::InvalidRequest(
1237                "Ping type is required for ping requests".to_string(),
1238            )
1239            .into());
1240        };
1241
1242        let _metrics_guard = self
1243            .metrics
1244            .handle_wait_for_effects_ping_latency
1245            .with_label_values(&[ping.as_str()])
1246            .start_timer();
1247
1248        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1249
1250        let mut last_status = None;
1251        let details = if request.include_details {
1252            Some(Box::new(ExecutedData::default()))
1253        } else {
1254            None
1255        };
1256
1257        loop {
1258            let status = consensus_tx_status_cache
1259                .notify_read_transaction_status_change(consensus_position, last_status)
1260                .await;
1261            match status {
1262                NotifyReadConsensusTxStatusResult::Status(status) => match status {
1263                    ConsensusTxStatus::FastpathCertified => {
1264                        // If the request is for consensus, we need to wait for the transaction to be finalised via Consensus.
1265                        if ping == PingType::Consensus {
1266                            last_status = Some(status);
1267                            continue;
1268                        }
1269                        return Ok(WaitForEffectsResponse::Executed {
1270                            effects_digest: TransactionEffectsDigest::ZERO,
1271                            details,
1272                            fast_path: true,
1273                        });
1274                    }
1275                    ConsensusTxStatus::Rejected => {
1276                        return Ok(WaitForEffectsResponse::Rejected { error: None });
1277                    }
1278                    ConsensusTxStatus::Finalized => {
1279                        return Ok(WaitForEffectsResponse::Executed {
1280                            effects_digest: TransactionEffectsDigest::ZERO,
1281                            details,
1282                            fast_path: false,
1283                        });
1284                    }
1285                    ConsensusTxStatus::Dropped => {
1286                        // Transaction was dropped post-consensus, currently only due to invalid owned object inputs..
1287                        // Fetch the detailed error (e.g., ObjectLockConflict) from the rejection reason cache.
1288                        return Ok(WaitForEffectsResponse::Rejected {
1289                            error: epoch_store.get_rejection_vote_reason(consensus_position),
1290                        });
1291                    }
1292                },
1293                NotifyReadConsensusTxStatusResult::Expired(round) => {
1294                    return Ok(WaitForEffectsResponse::Expired {
1295                        epoch: epoch_store.epoch(),
1296                        round: Some(round),
1297                    });
1298                }
1299            }
1300        }
1301    }
1302
1303    #[instrument(level = "error", skip_all, err(level = "debug"))]
1304    async fn wait_for_fastpath_effects(
1305        &self,
1306        consensus_position: ConsensusPosition,
1307        tx_digests: &[TransactionDigest],
1308        include_details: bool,
1309        epoch_store: &Arc<AuthorityPerEpochStore>,
1310    ) -> SuiResult<WaitForEffectsResponse> {
1311        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1312            return Err(SuiErrorKind::UnsupportedFeatureError {
1313                error: "Mysticeti fastpath".to_string(),
1314            }
1315            .into());
1316        };
1317
1318        let local_epoch = epoch_store.epoch();
1319        match consensus_position.epoch.cmp(&local_epoch) {
1320            Ordering::Less => {
1321                // Ask TransactionDriver to retry submitting the transaction and get a new ConsensusPosition,
1322                // if response from this validator is desired.
1323                let response = WaitForEffectsResponse::Expired {
1324                    epoch: local_epoch,
1325                    round: None,
1326                };
1327                return Ok(response);
1328            }
1329            Ordering::Greater => {
1330                // Ask TransactionDriver to retry this RPC until the validator's epoch catches up.
1331                return Err(SuiErrorKind::WrongEpoch {
1332                    expected_epoch: local_epoch,
1333                    actual_epoch: consensus_position.epoch,
1334                }
1335                .into());
1336            }
1337            Ordering::Equal => {
1338                // The validator's epoch is the same as the epoch of the transaction.
1339                // We can proceed with the normal flow.
1340            }
1341        };
1342
1343        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1344
1345        // FastpathCertified is non-terminal (tx may still be rejected), so loop
1346        // until we see a terminal status.
1347        let mut current_status = None;
1348        loop {
1349            let status = consensus_tx_status_cache
1350                .notify_read_transaction_status_change(consensus_position, current_status)
1351                .await;
1352            match status {
1353                NotifyReadConsensusTxStatusResult::Status(new_status) => match new_status {
1354                    ConsensusTxStatus::Rejected => {
1355                        return Ok(WaitForEffectsResponse::Rejected {
1356                            error: epoch_store.get_rejection_vote_reason(consensus_position),
1357                        });
1358                    }
1359                    ConsensusTxStatus::FastpathCertified => {
1360                        current_status = Some(new_status);
1361                        continue;
1362                    }
1363                    ConsensusTxStatus::Finalized => {
1364                        let effects = self
1365                            .state
1366                            .get_transaction_cache_reader()
1367                            .notify_read_executed_effects_may_fail(
1368                                "AuthorityServer::wait_for_fastpath_effects::notify_read_executed_effects",
1369                                tx_digests,
1370                            )
1371                            .await?
1372                            .pop()
1373                            .unwrap();
1374                        let effects_digest = effects.digest();
1375
1376                        let details = if include_details {
1377                            Some(self.complete_executed_data(effects).await?)
1378                        } else {
1379                            None
1380                        };
1381
1382                        return Ok(WaitForEffectsResponse::Executed {
1383                            effects_digest,
1384                            details,
1385                            fast_path: false,
1386                        });
1387                    }
1388                    ConsensusTxStatus::Dropped => {
1389                        return Ok(WaitForEffectsResponse::Rejected {
1390                            error: epoch_store.get_rejection_vote_reason(consensus_position),
1391                        });
1392                    }
1393                },
1394                NotifyReadConsensusTxStatusResult::Expired(round) => {
1395                    return Ok(WaitForEffectsResponse::Expired {
1396                        epoch: epoch_store.epoch(),
1397                        round: Some(round),
1398                    });
1399                }
1400            }
1401        }
1402    }
1403
1404    async fn complete_executed_data(
1405        &self,
1406        effects: TransactionEffects,
1407    ) -> SuiResult<Box<ExecutedData>> {
1408        let (events, input_objects, output_objects) = self
1409            .collect_effects_data(
1410                &effects, /* include_events */ true, /* include_input_objects */ true,
1411                /* include_output_objects */ true,
1412            )
1413            .await?;
1414        Ok(Box::new(ExecutedData {
1415            effects,
1416            events,
1417            input_objects,
1418            output_objects,
1419        }))
1420    }
1421
1422    async fn object_info_impl(
1423        &self,
1424        request: tonic::Request<ObjectInfoRequest>,
1425    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1426        let request = request.into_inner();
1427        let response = self.state.handle_object_info_request(request).await?;
1428        Ok((tonic::Response::new(response), Weight::one()))
1429    }
1430
1431    async fn transaction_info_impl(
1432        &self,
1433        request: tonic::Request<TransactionInfoRequest>,
1434    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1435        let request = request.into_inner();
1436        let response = self.state.handle_transaction_info_request(request).await?;
1437        Ok((tonic::Response::new(response), Weight::one()))
1438    }
1439
1440    async fn checkpoint_impl(
1441        &self,
1442        request: tonic::Request<CheckpointRequest>,
1443    ) -> WrappedServiceResponse<CheckpointResponse> {
1444        let request = request.into_inner();
1445        let response = self.state.handle_checkpoint_request(&request)?;
1446        Ok((tonic::Response::new(response), Weight::one()))
1447    }
1448
1449    async fn checkpoint_v2_impl(
1450        &self,
1451        request: tonic::Request<CheckpointRequestV2>,
1452    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1453        let request = request.into_inner();
1454        let response = self.state.handle_checkpoint_request_v2(&request)?;
1455        Ok((tonic::Response::new(response), Weight::one()))
1456    }
1457
1458    async fn get_system_state_object_impl(
1459        &self,
1460        _request: tonic::Request<SystemStateRequest>,
1461    ) -> WrappedServiceResponse<SuiSystemState> {
1462        let response = self
1463            .state
1464            .get_object_cache_reader()
1465            .get_sui_system_state_object_unsafe()?;
1466        Ok((tonic::Response::new(response), Weight::one()))
1467    }
1468
1469    async fn validator_health_impl(
1470        &self,
1471        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1472    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1473        let state = &self.state;
1474
1475        // Get epoch store once for both metrics
1476        let epoch_store = state.load_epoch_store_one_call_per_task();
1477
1478        // Get in-flight execution transactions from execution scheduler
1479        let num_inflight_execution_transactions =
1480            state.execution_scheduler().num_pending_certificates() as u64;
1481
1482        // Get in-flight consensus transactions from consensus adapter
1483        let num_inflight_consensus_transactions =
1484            self.consensus_adapter.num_inflight_transactions();
1485
1486        // Get last committed leader round from epoch store
1487        let last_committed_leader_round = epoch_store
1488            .consensus_tx_status_cache
1489            .as_ref()
1490            .and_then(|cache| cache.get_last_committed_leader_round())
1491            .unwrap_or(0);
1492
1493        // Get last locally built checkpoint sequence
1494        let last_locally_built_checkpoint = epoch_store
1495            .last_built_checkpoint_summary()
1496            .ok()
1497            .flatten()
1498            .map(|(_, summary)| summary.sequence_number)
1499            .unwrap_or(0);
1500
1501        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1502            num_inflight_consensus_transactions,
1503            num_inflight_execution_transactions,
1504            last_locally_built_checkpoint,
1505            last_committed_leader_round,
1506        };
1507
1508        let raw_response = typed_response
1509            .try_into()
1510            .map_err(|e: sui_types::error::SuiError| {
1511                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1512            })?;
1513
1514        Ok((tonic::Response::new(raw_response), Weight::one()))
1515    }
1516
1517    fn get_client_ip_addr<T>(
1518        &self,
1519        request: &tonic::Request<T>,
1520        source: &ClientIdSource,
1521    ) -> Option<IpAddr> {
1522        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1523
1524        if let Some(header) = forwarded_header {
1525            let num_hops = header
1526                .to_str()
1527                .map(|h| h.split(',').count().saturating_sub(1))
1528                .unwrap_or(0);
1529
1530            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1531        }
1532
1533        match source {
1534            ClientIdSource::SocketAddr => {
1535                let socket_addr: Option<SocketAddr> = request.remote_addr();
1536
1537                // We will hit this case if the IO type used does not
1538                // implement Connected or when using a unix domain socket.
1539                // TODO: once we have confirmed that no legitimate traffic
1540                // is hitting this case, we should reject such requests that
1541                // hit this case.
1542                if let Some(socket_addr) = socket_addr {
1543                    Some(socket_addr.ip())
1544                } else {
1545                    if cfg!(msim) {
1546                        // Ignore the error from simtests.
1547                    } else if cfg!(test) {
1548                        panic!("Failed to get remote address from request");
1549                    } else {
1550                        self.metrics.connection_ip_not_found.inc();
1551                        error!("Failed to get remote address from request");
1552                    }
1553                    None
1554                }
1555            }
1556            ClientIdSource::XForwardedFor(num_hops) => {
1557                let do_header_parse = |op: &MetadataValue<Ascii>| {
1558                    match op.to_str() {
1559                        Ok(header_val) => {
1560                            let header_contents =
1561                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1562                            if *num_hops == 0 {
1563                                error!(
1564                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1565                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1566                                    to this node. Skipping traffic controller request handling.",
1567                                    header_contents,
1568                                );
1569                                return None;
1570                            }
1571                            let contents_len = header_contents.len();
1572                            if contents_len < *num_hops {
1573                                error!(
1574                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1575                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1576                                    `client-id-source` in the node config.",
1577                                    header_contents, contents_len, num_hops, contents_len,
1578                                );
1579                                self.metrics.client_id_source_config_mismatch.inc();
1580                                return None;
1581                            }
1582                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1583                            else {
1584                                error!(
1585                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1586                                    Expected at least {} values. Skipping traffic controller request handling.",
1587                                    header_contents, contents_len, num_hops, contents_len,
1588                                );
1589                                return None;
1590                            };
1591                            parse_ip(client_ip).or_else(|| {
1592                                self.metrics.forwarded_header_parse_error.inc();
1593                                None
1594                            })
1595                        }
1596                        Err(e) => {
1597                            // TODO: once we have confirmed that no legitimate traffic
1598                            // is hitting this case, we should reject such requests that
1599                            // hit this case.
1600                            self.metrics.forwarded_header_invalid.inc();
1601                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1602                            None
1603                        }
1604                    }
1605                };
1606                if let Some(op) = request.metadata().get("x-forwarded-for") {
1607                    do_header_parse(op)
1608                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1609                    do_header_parse(op)
1610                } else {
1611                    self.metrics.forwarded_header_not_included.inc();
1612                    error!(
1613                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1614                    );
1615                    None
1616                }
1617            }
1618        }
1619    }
1620
1621    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1622        if let Some(traffic_controller) = &self.traffic_controller {
1623            if !traffic_controller.check(&client, &None).await {
1624                // Entity in blocklist
1625                Err(tonic::Status::from_error(
1626                    SuiErrorKind::TooManyRequests.into(),
1627                ))
1628            } else {
1629                Ok(())
1630            }
1631        } else {
1632            Ok(())
1633        }
1634    }
1635
1636    fn handle_traffic_resp<T>(
1637        &self,
1638        client: Option<IpAddr>,
1639        wrapped_response: WrappedServiceResponse<T>,
1640    ) -> Result<tonic::Response<T>, tonic::Status> {
1641        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1642            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1643            Err(status) => (
1644                Some(SuiError::from(status.clone())),
1645                Weight::zero(),
1646                Err(status.clone()),
1647            ),
1648        };
1649
1650        if let Some(traffic_controller) = self.traffic_controller.clone() {
1651            traffic_controller.tally(TrafficTally {
1652                direct: client,
1653                through_fullnode: None,
1654                error_info: error.map(|e| {
1655                    let error_type = String::from(e.clone().as_ref());
1656                    let error_weight = normalize(e);
1657                    (error_weight, error_type)
1658                }),
1659                spam_weight,
1660                timestamp: SystemTime::now(),
1661            })
1662        }
1663        unwrapped_response
1664    }
1665}
1666
1667// TODO: refine error matching here
1668fn normalize(err: SuiError) -> Weight {
1669    match err.as_inner() {
1670        SuiErrorKind::UserInputError {
1671            error: UserInputError::IncorrectUserSignature { .. },
1672        } => Weight::one(),
1673        SuiErrorKind::InvalidSignature { .. }
1674        | SuiErrorKind::SignerSignatureAbsent { .. }
1675        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1676        | SuiErrorKind::IncorrectSigner { .. }
1677        | SuiErrorKind::UnknownSigner { .. }
1678        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1679        _ => Weight::zero(),
1680    }
1681}
1682
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments:
/// * `$self` - the service value; it must expose `client_id_source`,
///   `get_client_ip_addr`, `handle_traffic_req` and `handle_traffic_resp`.
/// * `$func_name` - the async handler to invoke; it returns the response
///   wrapped together with a weight.
/// * `$request` - the request to pass to the handler.
///
/// When no client id source is configured, the macro `return`s the handler's
/// result (with the weight dropped) directly from the enclosing function or
/// async block. Otherwise it resolves the client IP, returns early via `?` if
/// `handle_traffic_req` rejects the client, runs the handler, and evaluates to
/// the result of `handle_traffic_resp`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        let client = {
            // Without a configured client id source there is no client to
            // attribute traffic to, so skip all traffic handling.
            let Some(client_id_source) = $self.client_id_source.as_ref() else {
                return $self.$func_name($request).await.map(|(result, _)| result);
            };
            $self.get_client_ip_addr(&$request, client_id_source)
        };

        // check if the client IP is blocked, in which case return early
        $self.handle_traffic_req(client.clone()).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1703
1704#[async_trait]
1705impl Validator for ValidatorService {
1706    async fn submit_transaction(
1707        &self,
1708        request: tonic::Request<RawSubmitTxRequest>,
1709    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1710        let validator_service = self.clone();
1711
1712        // Spawns a task which handles the transaction. The task will unconditionally continue
1713        // processing in the event that the client connection is dropped.
1714        spawn_monitored_task!(async move {
1715            // NB: traffic tally wrapping handled within the task rather than on task exit
1716            // to prevent an attacker from subverting traffic control by severing the connection
1717            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1718        })
1719        .await
1720        .unwrap()
1721    }
1722
1723    async fn wait_for_effects(
1724        &self,
1725        request: tonic::Request<RawWaitForEffectsRequest>,
1726    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1727        handle_with_decoration!(self, wait_for_effects_impl, request)
1728    }
1729
1730    async fn object_info(
1731        &self,
1732        request: tonic::Request<ObjectInfoRequest>,
1733    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1734        handle_with_decoration!(self, object_info_impl, request)
1735    }
1736
1737    async fn transaction_info(
1738        &self,
1739        request: tonic::Request<TransactionInfoRequest>,
1740    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1741        handle_with_decoration!(self, transaction_info_impl, request)
1742    }
1743
1744    async fn checkpoint(
1745        &self,
1746        request: tonic::Request<CheckpointRequest>,
1747    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1748        handle_with_decoration!(self, checkpoint_impl, request)
1749    }
1750
1751    async fn checkpoint_v2(
1752        &self,
1753        request: tonic::Request<CheckpointRequestV2>,
1754    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1755        handle_with_decoration!(self, checkpoint_v2_impl, request)
1756    }
1757
1758    async fn get_system_state_object(
1759        &self,
1760        request: tonic::Request<SystemStateRequest>,
1761    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1762        handle_with_decoration!(self, get_system_state_object_impl, request)
1763    }
1764
1765    async fn validator_health(
1766        &self,
1767        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1768    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1769    {
1770        handle_with_decoration!(self, validator_health_impl, request)
1771    }
1772}