sui_core/
authority_server.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use fastcrypto::traits::KeyPair;
8use futures::{TryFutureExt, future};
9use itertools::Itertools as _;
10use mysten_common::{assert_reachable, debug_fatal};
11use mysten_metrics::spawn_monitored_task;
12use prometheus::{
13    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, Registry,
14    register_gauge_with_registry, register_histogram_vec_with_registry,
15    register_histogram_with_registry, register_int_counter_vec_with_registry,
16    register_int_counter_with_registry,
17};
18use std::{
19    io,
20    net::{IpAddr, SocketAddr},
21    sync::Arc,
22    time::{Duration, SystemTime},
23};
24use sui_network::{
25    api::{Validator, ValidatorServer},
26    tonic,
27    validator::server::SUI_TLS_SERVER_NAME,
28};
29use sui_types::effects::TransactionEffectsAPI;
30use sui_types::message_envelope::Message;
31use sui_types::messages_consensus::ConsensusPosition;
32use sui_types::messages_consensus::ConsensusTransaction;
33use sui_types::messages_grpc::{
34    ObjectInfoRequest, ObjectInfoResponse, RawSubmitTxResponse, SystemStateRequest,
35    TransactionInfoRequest, TransactionInfoResponse,
36};
37use sui_types::multiaddr::Multiaddr;
38use sui_types::object::Object;
39use sui_types::sui_system_state::SuiSystemState;
40use sui_types::traffic_control::{ClientIdSource, Weight};
41use sui_types::{
42    base_types::ObjectID,
43    digests::TransactionEffectsDigest,
44    error::{SuiErrorKind, UserInputError},
45};
46use sui_types::{
47    effects::TransactionEffects,
48    messages_grpc::{
49        ExecutedData, RawSubmitTxRequest, RawWaitForEffectsRequest, RawWaitForEffectsResponse,
50        SubmitTxResult, WaitForEffectsRequest, WaitForEffectsResponse,
51    },
52};
53use sui_types::{effects::TransactionEvents, messages_grpc::SubmitTxType};
54use sui_types::{error::*, transaction::*};
55use sui_types::{
56    fp_ensure,
57    messages_checkpoint::{
58        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
59    },
60};
61use tokio::sync::oneshot;
62use tokio::time::timeout;
63use tonic::metadata::{Ascii, MetadataValue};
64use tracing::{debug, error, info, instrument};
65
66use crate::consensus_adapter::ConnectionMonitorStatusForTests;
67use crate::{
68    authority::{AuthorityState, consensus_tx_status_cache::ConsensusTxStatus},
69    consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
70    traffic_controller::{TrafficController, parse_ip, policies::TrafficTally},
71};
72use crate::{
73    authority::{
74        authority_per_epoch_store::AuthorityPerEpochStore,
75        consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
76    },
77    checkpoints::CheckpointStore,
78    mysticeti_adapter::LazyMysticetiClient,
79};
80use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
81
// Unit-test modules for this file. Each one is compiled only under `cfg(test)`
// and is loaded from the file named by its `#[path]` attribute.
#[cfg(test)]
#[path = "unit_tests/server_tests.rs"]
mod server_tests;

#[cfg(test)]
#[path = "unit_tests/wait_for_effects_tests.rs"]
mod wait_for_effects_tests;

#[cfg(test)]
#[path = "unit_tests/submit_transaction_tests.rs"]
mod submit_transaction_tests;
93
/// Handle to a spawned validator network server.
///
/// Wraps the underlying `sui_network` server so that callers can wait for it
/// to shut down, request a shutdown, or read the address it is bound to.
pub struct AuthorityServerHandle {
    /// The running server this handle controls.
    server_handle: sui_network::validator::server::Server,
}
97
98impl AuthorityServerHandle {
99    pub async fn join(self) -> Result<(), io::Error> {
100        self.server_handle.handle().wait_for_shutdown().await;
101        Ok(())
102    }
103
104    pub async fn kill(self) -> Result<(), io::Error> {
105        self.server_handle.handle().shutdown().await;
106        Ok(())
107    }
108
109    pub fn address(&self) -> &Multiaddr {
110        self.server_handle.local_addr()
111    }
112}
113
/// Bundles everything needed to spawn a validator network server:
/// the address to bind, the authority state, the consensus adapter and the
/// service metrics.
pub struct AuthorityServer {
    /// Address used as the bind address by `spawn_for_test`.
    address: Multiaddr,
    /// Shared authority state handed to the `ValidatorService` on spawn.
    pub state: Arc<AuthorityState>,
    /// Adapter handed to the `ValidatorService` on spawn.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics handed to the `ValidatorService` on spawn.
    pub metrics: Arc<ValidatorServiceMetrics>,
}
120
121impl AuthorityServer {
122    pub fn new_for_test_with_consensus_adapter(
123        state: Arc<AuthorityState>,
124        consensus_adapter: Arc<ConsensusAdapter>,
125    ) -> Self {
126        let address = new_local_tcp_address_for_testing();
127        let metrics = Arc::new(ValidatorServiceMetrics::new_for_tests());
128
129        Self {
130            address,
131            state,
132            consensus_adapter,
133            metrics,
134        }
135    }
136
137    pub fn new_for_test(state: Arc<AuthorityState>) -> Self {
138        let consensus_adapter = Arc::new(ConsensusAdapter::new(
139            Arc::new(LazyMysticetiClient::new()),
140            CheckpointStore::new_for_tests(),
141            state.name,
142            Arc::new(ConnectionMonitorStatusForTests {}),
143            100_000,
144            100_000,
145            None,
146            None,
147            ConsensusAdapterMetrics::new_test(),
148            state.epoch_store_for_testing().protocol_config().clone(),
149        ));
150        Self::new_for_test_with_consensus_adapter(state, consensus_adapter)
151    }
152
153    pub async fn spawn_for_test(self) -> Result<AuthorityServerHandle, io::Error> {
154        let address = self.address.clone();
155        self.spawn_with_bind_address_for_test(address).await
156    }
157
158    pub async fn spawn_with_bind_address_for_test(
159        self,
160        address: Multiaddr,
161    ) -> Result<AuthorityServerHandle, io::Error> {
162        let tls_config = sui_tls::create_rustls_server_config(
163            self.state.config.network_key_pair().copy().private(),
164            SUI_TLS_SERVER_NAME.to_string(),
165        );
166        let config = mysten_network::config::Config::new();
167        let server = sui_network::validator::server::ServerBuilder::from_config(
168            &config,
169            mysten_network::metrics::DefaultMetricsCallbackProvider::default(),
170        )
171        .add_service(ValidatorServer::new(ValidatorService::new_for_tests(
172            self.state,
173            self.consensus_adapter,
174            self.metrics,
175        )))
176        .bind(&address, Some(tls_config))
177        .await
178        .unwrap();
179        let local_addr = server.local_addr().to_owned();
180        info!("Listening to traffic on {local_addr}");
181        let handle = AuthorityServerHandle {
182            server_handle: server,
183        };
184        Ok(handle)
185    }
186}
187
/// Prometheus metrics recorded by the validator service.
///
/// All metrics are created and registered in [`ValidatorServiceMetrics::new`].
pub struct ValidatorServiceMetrics {
    /// Number of transaction signature errors.
    pub signature_errors: IntCounter,
    /// Latency of verifying a transaction.
    pub tx_verification_latency: Histogram,
    /// Latency of verifying a certificate.
    pub cert_verification_latency: Histogram,
    /// Time between submitting a transaction to consensus and getting back local
    /// acknowledgement; execution and finalization time are not included.
    pub consensus_latency: Histogram,
    /// Latency of handling a transaction.
    pub handle_transaction_latency: Histogram,
    /// Latency of the submit_certificate RPC handler.
    pub submit_certificate_consensus_latency: Histogram,
    /// Latency of handling a consensus transaction certificate.
    pub handle_certificate_consensus_latency: Histogram,
    /// Latency of handling a non-consensus transaction certificate.
    pub handle_certificate_non_consensus_latency: Histogram,
    /// Latency of handling a consensus soft bundle.
    pub handle_soft_bundle_certificates_consensus_latency: Histogram,
    /// Number of certificates included in a soft bundle.
    pub handle_soft_bundle_certificates_count: Histogram,
    /// Size of a soft bundle in bytes.
    pub handle_soft_bundle_certificates_size_bytes: Histogram,
    /// Latency of handling a user transaction sent through consensus.
    pub handle_transaction_consensus_latency: Histogram,
    /// Latency of submitting a user transaction sent through consensus,
    /// labelled by `req_type`.
    pub handle_submit_transaction_consensus_latency: HistogramVec,
    /// Latency of handling a ping request for wait_for_effects, labelled by `req_type`.
    pub handle_wait_for_effects_ping_latency: HistogramVec,

    /// Latency of the submit transaction handler, labelled by `req_type`.
    handle_submit_transaction_latency: HistogramVec,
    /// Size of the transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_bytes: HistogramVec,
    /// Number of transactions in a submit transaction request, labelled by `req_type`.
    handle_submit_transaction_batch_size: HistogramVec,

    /// Number of transaction certificates rejected during epoch transition.
    num_rejected_cert_in_epoch_boundary: IntCounter,
    /// Number of transactions rejected due to system overload, labelled by `error_type`.
    num_rejected_tx_during_overload: IntCounterVec,
    /// Number of transactions rejected during submission, labelled by `reason`.
    submission_rejected_transactions: IntCounterVec,
    /// Number of times the connection IP could not be extracted from a request.
    connection_ip_not_found: IntCounter,
    /// Number of times the x-forwarded-for header could not be parsed.
    forwarded_header_parse_error: IntCounter,
    /// Number of times the x-forwarded-for header was invalid.
    forwarded_header_invalid: IntCounter,
    /// Number of times the x-forwarded-for header was unexpectedly missing from a request.
    forwarded_header_not_included: IntCounter,
    /// Number of times the client id source config disagreed with the x-forwarded-for header.
    client_id_source_config_mismatch: IntCounter,
    /// Number of hops in the x-forwarded-for header.
    x_forwarded_for_num_hops: Gauge,
}
218
219impl ValidatorServiceMetrics {
220    pub fn new(registry: &Registry) -> Self {
221        Self {
222            signature_errors: register_int_counter_with_registry!(
223                "total_signature_errors",
224                "Number of transaction signature errors",
225                registry,
226            )
227            .unwrap(),
228            tx_verification_latency: register_histogram_with_registry!(
229                "validator_service_tx_verification_latency",
230                "Latency of verifying a transaction",
231                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
232                registry,
233            )
234            .unwrap(),
235            cert_verification_latency: register_histogram_with_registry!(
236                "validator_service_cert_verification_latency",
237                "Latency of verifying a certificate",
238                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
239                registry,
240            )
241            .unwrap(),
242            consensus_latency: register_histogram_with_registry!(
243                "validator_service_consensus_latency",
244                "Time spent between submitting a txn to consensus and getting back local acknowledgement. Execution and finalization time are not included.",
245                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
246                registry,
247            )
248            .unwrap(),
249            handle_transaction_latency: register_histogram_with_registry!(
250                "validator_service_handle_transaction_latency",
251                "Latency of handling a transaction",
252                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
253                registry,
254            )
255            .unwrap(),
256            handle_certificate_consensus_latency: register_histogram_with_registry!(
257                "validator_service_handle_certificate_consensus_latency",
258                "Latency of handling a consensus transaction certificate",
259                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
260                registry,
261            )
262            .unwrap(),
263            submit_certificate_consensus_latency: register_histogram_with_registry!(
264                "validator_service_submit_certificate_consensus_latency",
265                "Latency of submit_certificate RPC handler",
266                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
267                registry,
268            )
269            .unwrap(),
270            handle_certificate_non_consensus_latency: register_histogram_with_registry!(
271                "validator_service_handle_certificate_non_consensus_latency",
272                "Latency of handling a non-consensus transaction certificate",
273                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
274                registry,
275            )
276            .unwrap(),
277            handle_soft_bundle_certificates_consensus_latency: register_histogram_with_registry!(
278                "validator_service_handle_soft_bundle_certificates_consensus_latency",
279                "Latency of handling a consensus soft bundle",
280                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
281                registry,
282            )
283            .unwrap(),
284            handle_soft_bundle_certificates_count: register_histogram_with_registry!(
285                "handle_soft_bundle_certificates_count",
286                "The number of certificates included in a soft bundle",
287                mysten_metrics::COUNT_BUCKETS.to_vec(),
288                registry,
289            )
290            .unwrap(),
291            handle_soft_bundle_certificates_size_bytes: register_histogram_with_registry!(
292                "handle_soft_bundle_certificates_size_bytes",
293                "The size of soft bundle in bytes",
294                mysten_metrics::BYTES_BUCKETS.to_vec(),
295                registry,
296            )
297            .unwrap(),
298            handle_transaction_consensus_latency: register_histogram_with_registry!(
299                "validator_service_handle_transaction_consensus_latency",
300                "Latency of handling a user transaction sent through consensus",
301                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
302                registry,
303            )
304            .unwrap(),
305            handle_submit_transaction_consensus_latency: register_histogram_vec_with_registry!(
306                "validator_service_submit_transaction_consensus_latency",
307                "Latency of submitting a user transaction sent through consensus",
308                &["req_type"],
309                mysten_metrics::COARSE_LATENCY_SEC_BUCKETS.to_vec(),
310                registry,
311            )
312            .unwrap(),
313            handle_submit_transaction_latency: register_histogram_vec_with_registry!(
314                "validator_service_submit_transaction_latency",
315                "Latency of submit transaction handler",
316                &["req_type"],
317                mysten_metrics::LATENCY_SEC_BUCKETS.to_vec(),
318                registry,
319            )
320            .unwrap(),
321            handle_wait_for_effects_ping_latency: register_histogram_vec_with_registry!(
322                "validator_service_handle_wait_for_effects_ping_latency",
323                "Latency of handling a ping request for wait_for_effects",
324                &["req_type"],
325                mysten_metrics::SUBSECOND_LATENCY_SEC_BUCKETS.to_vec(),
326                registry,
327            )
328            .unwrap(),
329            handle_submit_transaction_bytes: register_histogram_vec_with_registry!(
330                "validator_service_submit_transaction_bytes",
331                "The size of transactions in the submit transaction request",
332                &["req_type"],
333                mysten_metrics::BYTES_BUCKETS.to_vec(),
334                registry,
335            )
336            .unwrap(),
337            handle_submit_transaction_batch_size: register_histogram_vec_with_registry!(
338                "validator_service_submit_transaction_batch_size",
339                "The number of transactions in the submit transaction request",
340                &["req_type"],
341                mysten_metrics::COUNT_BUCKETS.to_vec(),
342                registry,
343            )
344            .unwrap(),
345            num_rejected_cert_in_epoch_boundary: register_int_counter_with_registry!(
346                "validator_service_num_rejected_cert_in_epoch_boundary",
347                "Number of rejected transaction certificate during epoch transitioning",
348                registry,
349            )
350            .unwrap(),
351            num_rejected_tx_during_overload: register_int_counter_vec_with_registry!(
352                "validator_service_num_rejected_tx_during_overload",
353                "Number of rejected transaction due to system overload",
354                &["error_type"],
355                registry,
356            )
357            .unwrap(),
358            submission_rejected_transactions: register_int_counter_vec_with_registry!(
359                "validator_service_submission_rejected_transactions",
360                "Number of transactions rejected during submission",
361                &["reason"],
362                registry,
363            )
364            .unwrap(),
365            connection_ip_not_found: register_int_counter_with_registry!(
366                "validator_service_connection_ip_not_found",
367                "Number of times connection IP was not extractable from request",
368                registry,
369            )
370            .unwrap(),
371            forwarded_header_parse_error: register_int_counter_with_registry!(
372                "validator_service_forwarded_header_parse_error",
373                "Number of times x-forwarded-for header could not be parsed",
374                registry,
375            )
376            .unwrap(),
377            forwarded_header_invalid: register_int_counter_with_registry!(
378                "validator_service_forwarded_header_invalid",
379                "Number of times x-forwarded-for header was invalid",
380                registry,
381            )
382            .unwrap(),
383            forwarded_header_not_included: register_int_counter_with_registry!(
384                "validator_service_forwarded_header_not_included",
385                "Number of times x-forwarded-for header was (unexpectedly) not included in request",
386                registry,
387            )
388            .unwrap(),
389            client_id_source_config_mismatch: register_int_counter_with_registry!(
390                "validator_service_client_id_source_config_mismatch",
391                "Number of times detected that client id source config doesn't agree with x-forwarded-for header",
392                registry,
393            )
394            .unwrap(),
395            x_forwarded_for_num_hops: register_gauge_with_registry!(
396                "validator_service_x_forwarded_for_num_hops",
397                "Number of hops in x-forwarded-for header",
398                registry,
399            )
400            .unwrap(),
401        }
402    }
403
404    pub fn new_for_tests() -> Self {
405        let registry = Registry::new();
406        Self::new(&registry)
407    }
408}
409
/// Validator service: holds the shared state and collaborators used by the
/// request handlers.
#[derive(Clone)]
pub struct ValidatorService {
    /// Shared authority state.
    state: Arc<AuthorityState>,
    /// Adapter used by the handlers when checking overload and submitting work.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Metrics recorded by the handlers.
    metrics: Arc<ValidatorServiceMetrics>,
    /// Copied from `state.traffic_controller` by `new`; `None` in `new_for_tests`.
    traffic_controller: Option<Arc<TrafficController>>,
    /// How to determine the client IP of a request. When `None`,
    /// `handle_submit_transaction` uses `ClientIdSource::SocketAddr`.
    client_id_source: Option<ClientIdSource>,
}
418
419impl ValidatorService {
420    pub fn new(
421        state: Arc<AuthorityState>,
422        consensus_adapter: Arc<ConsensusAdapter>,
423        validator_metrics: Arc<ValidatorServiceMetrics>,
424        client_id_source: Option<ClientIdSource>,
425    ) -> Self {
426        let traffic_controller = state.traffic_controller.clone();
427        Self {
428            state,
429            consensus_adapter,
430            metrics: validator_metrics,
431            traffic_controller,
432            client_id_source,
433        }
434    }
435
436    pub fn new_for_tests(
437        state: Arc<AuthorityState>,
438        consensus_adapter: Arc<ConsensusAdapter>,
439        metrics: Arc<ValidatorServiceMetrics>,
440    ) -> Self {
441        Self {
442            state,
443            consensus_adapter,
444            metrics,
445            traffic_controller: None,
446            client_id_source: None,
447        }
448    }
449
    /// Returns a reference to the shared authority state held by this service.
    pub fn validator_state(&self) -> &Arc<AuthorityState> {
        &self.state
    }
453
454    /// Test method that performs transaction validation without going through gRPC.
455    pub fn handle_transaction_for_testing(&self, transaction: Transaction) -> SuiResult<()> {
456        let epoch_store = self.state.load_epoch_store_one_call_per_task();
457
458        // Validity check (basic structural validation)
459        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
460
461        // Signature verification
462        let transaction = epoch_store
463            .verify_transaction_require_no_aliases(transaction)?
464            .into_tx();
465
466        // Validate the transaction
467        self.state
468            .handle_vote_transaction(&epoch_store, transaction)?;
469
470        Ok(())
471    }
472
473    /// Test method that performs transaction validation with overload checking.
474    /// Used for testing validator overload behavior.
475    pub fn handle_transaction_for_testing_with_overload_check(
476        &self,
477        transaction: Transaction,
478    ) -> SuiResult<()> {
479        let epoch_store = self.state.load_epoch_store_one_call_per_task();
480
481        // Validity check (basic structural validation)
482        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
483
484        // Check system overload
485        self.state.check_system_overload(
486            self.consensus_adapter.as_ref(),
487            transaction.data(),
488            self.state.check_system_overload_at_signing(),
489        )?;
490
491        // Signature verification
492        let transaction = epoch_store
493            .verify_transaction_require_no_aliases(transaction)?
494            .into_tx();
495
496        // Validate the transaction
497        self.state
498            .handle_vote_transaction(&epoch_store, transaction)?;
499
500        Ok(())
501    }
502
503    /// Collect the IDs of input objects that are immutable.
504    /// This is used to create the ImmutableInputObjects claim for consensus messages.
505    async fn collect_immutable_object_ids(
506        &self,
507        tx: &VerifiedTransaction,
508        state: &AuthorityState,
509    ) -> SuiResult<Vec<ObjectID>> {
510        let input_objects = tx.data().transaction_data().input_objects()?;
511
512        // Collect object IDs from ImmOrOwnedMoveObject inputs
513        let object_ids: Vec<ObjectID> = input_objects
514            .iter()
515            .filter_map(|obj| match obj {
516                InputObjectKind::ImmOrOwnedMoveObject((id, _, _)) => Some(*id),
517                _ => None,
518            })
519            .collect();
520        if object_ids.is_empty() {
521            return Ok(vec![]);
522        }
523
524        // Load objects from cache and filter to immutable ones
525        let objects = state.get_object_cache_reader().get_objects(&object_ids);
526
527        // All objects should be found, since owned input objects have been validated to exist.
528        objects
529            .into_iter()
530            .zip(object_ids.iter())
531            .filter_map(|(obj, id)| {
532                let Some(o) = obj else {
533                    return Some(Err::<ObjectID, SuiError>(
534                        SuiErrorKind::UserInputError {
535                            error: UserInputError::ObjectNotFound {
536                                object_id: *id,
537                                version: None,
538                            },
539                        }
540                        .into(),
541                    ));
542                };
543                if o.is_immutable() {
544                    Some(Ok(*id))
545                } else {
546                    None
547                }
548            })
549            .collect::<SuiResult<Vec<ObjectID>>>()
550    }
551
552    #[instrument(
553        name = "ValidatorService::handle_submit_transaction",
554        level = "error",
555        skip_all,
556        err(level = "debug")
557    )]
558    async fn handle_submit_transaction(
559        &self,
560        request: tonic::Request<RawSubmitTxRequest>,
561    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
562        let Self {
563            state,
564            consensus_adapter,
565            metrics,
566            traffic_controller: _,
567            client_id_source,
568        } = self.clone();
569
570        let submitter_client_addr = if let Some(client_id_source) = &client_id_source {
571            self.get_client_ip_addr(&request, client_id_source)
572        } else {
573            self.get_client_ip_addr(&request, &ClientIdSource::SocketAddr)
574        };
575
576        let inner = request.into_inner();
577        let start_epoch = state.load_epoch_store_one_call_per_task().epoch();
578
579        let next_epoch = start_epoch + 1;
580        let mut max_retries = 1;
581
582        loop {
583            let res = self
584                .handle_submit_transaction_inner(
585                    &state,
586                    &consensus_adapter,
587                    &metrics,
588                    &inner,
589                    submitter_client_addr,
590                )
591                .await;
592            match res {
593                Ok((response, weight)) => return Ok((tonic::Response::new(response), weight)),
594                Err(err) => {
595                    if max_retries > 0
596                        && let SuiErrorKind::ValidatorHaltedAtEpochEnd = err.as_inner()
597                    {
598                        max_retries -= 1;
599
600                        debug!(
601                            "ValidatorHaltedAtEpochEnd. Will retry after validator reconfigures"
602                        );
603
604                        if let Ok(Ok(new_epoch)) =
605                            timeout(Duration::from_secs(15), state.wait_for_epoch(next_epoch)).await
606                        {
607                            assert_reachable!("retry submission at epoch end");
608                            if new_epoch == next_epoch {
609                                continue;
610                            }
611
612                            debug_fatal!(
613                                "expected epoch {} after reconfiguration. got {}",
614                                next_epoch,
615                                new_epoch
616                            );
617                        }
618                    }
619                    return Err(err.into());
620                }
621            }
622        }
623    }
624
625    async fn handle_submit_transaction_inner(
626        &self,
627        state: &AuthorityState,
628        consensus_adapter: &ConsensusAdapter,
629        metrics: &ValidatorServiceMetrics,
630        request: &RawSubmitTxRequest,
631        submitter_client_addr: Option<IpAddr>,
632    ) -> SuiResult<(RawSubmitTxResponse, Weight)> {
633        let epoch_store = state.load_epoch_store_one_call_per_task();
634        if !epoch_store.protocol_config().mysticeti_fastpath() {
635            return Err(SuiErrorKind::UnsupportedFeatureError {
636                error: "Mysticeti fastpath".to_string(),
637            }
638            .into());
639        }
640
641        let submit_type = SubmitTxType::try_from(request.submit_type).map_err(|e| {
642            SuiErrorKind::GrpcMessageDeserializeError {
643                type_info: "RawSubmitTxRequest.submit_type".to_string(),
644                error: e.to_string(),
645            }
646        })?;
647
648        let is_ping_request = submit_type == SubmitTxType::Ping;
649        if is_ping_request {
650            fp_ensure!(
651                request.transactions.is_empty(),
652                SuiErrorKind::InvalidRequest(format!(
653                    "Ping request cannot contain {} transactions",
654                    request.transactions.len()
655                ))
656                .into()
657            );
658        } else {
659            // Ensure default and soft bundle requests contain at least one transaction.
660            fp_ensure!(
661                !request.transactions.is_empty(),
662                SuiErrorKind::InvalidRequest(
663                    "At least one transaction needs to be submitted".to_string(),
664                )
665                .into()
666            );
667        }
668
669        // NOTE: for soft bundle requests, the system tries to sequence the transactions in the same order
670        // if they use the same gas price. But this is only done with best effort.
671        // Transactions in a soft bundle can be individually rejected or deferred, without affecting
672        // other transactions in the same bundle.
673        let is_soft_bundle_request = submit_type == SubmitTxType::SoftBundle;
674
675        let max_num_transactions = if is_soft_bundle_request {
676            // Soft bundle cannot contain too many transactions.
677            // Otherwise it is hard to include all of them in a single block.
678            epoch_store.protocol_config().max_soft_bundle_size()
679        } else {
680            // Still enforce a limit even when transactions do not need to be in the same block.
681            epoch_store
682                .protocol_config()
683                .max_num_transactions_in_block()
684        };
685        fp_ensure!(
686            request.transactions.len() <= max_num_transactions as usize,
687            SuiErrorKind::InvalidRequest(format!(
688                "Too many transactions in request: {} vs {}",
689                request.transactions.len(),
690                max_num_transactions
691            ))
692            .into()
693        );
694
695        // Transaction digests.
696        let mut tx_digests = Vec::with_capacity(request.transactions.len());
697        // Transactions to submit to consensus.
698        let mut consensus_transactions = Vec::with_capacity(request.transactions.len());
699        // Indexes of transactions above in the request transactions.
700        let mut transaction_indexes = Vec::with_capacity(request.transactions.len());
701        // Results corresponding to each transaction in the request.
702        let mut results: Vec<Option<SubmitTxResult>> = vec![None; request.transactions.len()];
703        // Total size of all transactions in the request.
704        let mut total_size_bytes = 0;
705        // Traffic control spam weight to use for the transaction.
706        let mut spam_weight = Weight::zero();
707
708        let req_type = if is_ping_request {
709            "ping"
710        } else if request.transactions.len() == 1 {
711            "single_transaction"
712        } else if is_soft_bundle_request {
713            "soft_bundle"
714        } else {
715            "batch"
716        };
717
718        let _handle_tx_metrics_guard = metrics
719            .handle_submit_transaction_latency
720            .with_label_values(&[req_type])
721            .start_timer();
722
723        for (idx, tx_bytes) in request.transactions.iter().enumerate() {
724            let transaction = match bcs::from_bytes::<Transaction>(tx_bytes) {
725                Ok(txn) => txn,
726                Err(e) => {
727                    // Ok to fail the request when any transaction is invalid.
728                    return Err(SuiErrorKind::TransactionDeserializationError {
729                        error: format!("Failed to deserialize transaction at index {}: {}", idx, e),
730                    }
731                    .into());
732                }
733            };
734
735            // Ok to fail the request when any transaction is invalid.
736            let tx_size = transaction.validity_check(&epoch_store.tx_validity_check_context())?;
737
738            if transaction
739                .data()
740                .transaction_data()
741                .is_gasless_transaction()
742            {
743                // Gasless transactions count for traffic control, since they have no economic cost.
744                spam_weight = Weight::one();
745            }
746
747            let overload_check_res = state.check_system_overload(
748                consensus_adapter,
749                transaction.data(),
750                state.check_system_overload_at_signing(),
751            );
752            if let Err(error) = overload_check_res {
753                metrics
754                    .num_rejected_tx_during_overload
755                    .with_label_values(&[error.as_ref()])
756                    .inc();
757                results[idx] = Some(SubmitTxResult::Rejected { error });
758                continue;
759            }
760
761            // Ok to fail the request when any signature is invalid.
762            let verified_transaction = {
763                let _metrics_guard = metrics.tx_verification_latency.start_timer();
764                if epoch_store.protocol_config().address_aliases() {
765                    match epoch_store.verify_transaction_with_current_aliases(transaction) {
766                        Ok(tx) => tx,
767                        Err(e) => {
768                            metrics.signature_errors.inc();
769                            return Err(e);
770                        }
771                    }
772                } else {
773                    match epoch_store.verify_transaction_require_no_aliases(transaction) {
774                        Ok(tx) => tx,
775                        Err(e) => {
776                            metrics.signature_errors.inc();
777                            return Err(e);
778                        }
779                    }
780                }
781            };
782
783            let tx_digest = verified_transaction.tx().digest();
784            tx_digests.push(*tx_digest);
785
786            debug!(
787                ?tx_digest,
788                "handle_submit_transaction: verified transaction"
789            );
790
791            // Check if the transaction has executed, before checking input objects
792            // which could have been consumed.
793            if let Some(effects) = state
794                .get_transaction_cache_reader()
795                .get_executed_effects(tx_digest)
796            {
797                let effects_digest = effects.digest();
798                if let Ok(executed_data) = self.complete_executed_data(effects).await {
799                    let executed_result = SubmitTxResult::Executed {
800                        effects_digest,
801                        details: Some(executed_data),
802                    };
803                    results[idx] = Some(executed_result);
804                    debug!(?tx_digest, "handle_submit_transaction: already executed");
805                    continue;
806                }
807            }
808
809            if self
810                .state
811                .get_transaction_cache_reader()
812                .transaction_executed_in_last_epoch(tx_digest, epoch_store.epoch())
813            {
814                results[idx] = Some(SubmitTxResult::Rejected {
815                    error: UserInputError::TransactionAlreadyExecuted { digest: *tx_digest }.into(),
816                });
817                debug!(
818                    ?tx_digest,
819                    "handle_submit_transaction: transaction already executed in previous epoch"
820                );
821                continue;
822            }
823
824            debug!(
825                ?tx_digest,
826                "handle_submit_transaction: waiting for fastpath dependency objects"
827            );
828            if !state
829                .wait_for_fastpath_dependency_objects(
830                    verified_transaction.tx(),
831                    epoch_store.epoch(),
832                )
833                .await?
834            {
835                debug!(
836                    ?tx_digest,
837                    "fastpath input objects are still unavailable after waiting"
838                );
839            }
840
841            match state.handle_vote_transaction(&epoch_store, verified_transaction.tx().clone()) {
842                Ok(_) => { /* continue processing */ }
843                Err(e) => {
844                    // Check if transaction has been executed while being validated.
845                    // This is an edge case so checking executed effects twice is acceptable.
846                    if let Some(effects) = state
847                        .get_transaction_cache_reader()
848                        .get_executed_effects(tx_digest)
849                    {
850                        let effects_digest = effects.digest();
851                        if let Ok(executed_data) = self.complete_executed_data(effects).await {
852                            let executed_result = SubmitTxResult::Executed {
853                                effects_digest,
854                                details: Some(executed_data),
855                            };
856                            results[idx] = Some(executed_result);
857                            continue;
858                        }
859                    }
860
861                    // When the transaction has not been executed, record the error for the transaction.
862                    debug!(?tx_digest, "Transaction rejected during submission: {e}");
863                    metrics
864                        .submission_rejected_transactions
865                        .with_label_values(&[e.to_variant_name()])
866                        .inc();
867                    results[idx] = Some(SubmitTxResult::Rejected { error: e });
868                    continue;
869                }
870            }
871
872            // Create claims with aliases and / or immutable objects.
873            let mut claims = vec![];
874
875            let immutable_object_ids = self
876                .collect_immutable_object_ids(verified_transaction.tx(), state)
877                .await?;
878            if !immutable_object_ids.is_empty() {
879                claims.push(TransactionClaim::ImmutableInputObjects(
880                    immutable_object_ids,
881                ));
882            }
883
884            let (tx, aliases) = verified_transaction.into_inner();
885            if epoch_store.protocol_config().address_aliases() {
886                if epoch_store
887                    .protocol_config()
888                    .fix_checkpoint_signature_mapping()
889                {
890                    claims.push(TransactionClaim::AddressAliasesV2(aliases));
891                } else {
892                    let v1_aliases: Vec<_> = tx
893                        .data()
894                        .intent_message()
895                        .value
896                        .required_signers()
897                        .into_iter()
898                        .zip_eq(aliases.into_iter().map(|(_, seq)| seq))
899                        .collect();
900                    #[allow(deprecated)]
901                    claims.push(TransactionClaim::AddressAliases(
902                        nonempty::NonEmpty::from_vec(v1_aliases)
903                            .expect("must have at least one required_signer"),
904                    ));
905                }
906            }
907
908            let tx_with_claims = TransactionWithClaims::new(tx.into(), claims);
909
910            consensus_transactions.push(ConsensusTransaction::new_user_transaction_v2_message(
911                &state.name,
912                tx_with_claims,
913            ));
914
915            transaction_indexes.push(idx);
916            total_size_bytes += tx_size;
917        }
918
919        if consensus_transactions.is_empty() && !is_ping_request {
920            return Ok((Self::try_from_submit_tx_response(results)?, spam_weight));
921        }
922
        // Cap the soft bundle at half of the consensus max-transactions-in-block byte size.
        // This accounts for serialization overhead and ensures that the soft bundle is not too large
        // when it is submitted to consensus.
926        let max_transaction_bytes = if is_soft_bundle_request {
927            epoch_store
928                .protocol_config()
929                .consensus_max_transactions_in_block_bytes()
930                / 2
931        } else {
932            epoch_store
933                .protocol_config()
934                .consensus_max_transactions_in_block_bytes()
935        };
936        fp_ensure!(
937            total_size_bytes <= max_transaction_bytes as usize,
938            SuiErrorKind::UserInputError {
939                error: UserInputError::TotalTransactionSizeTooLargeInBatch {
940                    size: total_size_bytes,
941                    limit: max_transaction_bytes,
942                },
943            }
944            .into()
945        );
946
947        metrics
948            .handle_submit_transaction_bytes
949            .with_label_values(&[req_type])
950            .observe(total_size_bytes as f64);
951        metrics
952            .handle_submit_transaction_batch_size
953            .with_label_values(&[req_type])
954            .observe(consensus_transactions.len() as f64);
955
956        let _latency_metric_guard = metrics
957            .handle_submit_transaction_consensus_latency
958            .with_label_values(&[req_type])
959            .start_timer();
960
961        let consensus_positions = if is_soft_bundle_request || is_ping_request {
            // We only allow `consensus_transactions` to be empty for ping requests. This is how it should be, and is, treated by the downstream components.
            // For any other case, an empty `consensus_transactions` vector is an invalid state and we should never have reached this point.
964            assert!(
965                is_ping_request || !consensus_transactions.is_empty(),
966                "A valid soft bundle must have at least one transaction"
967            );
968            debug!(
969                "handle_submit_transaction: submitting consensus transactions ({}): {}",
970                req_type,
971                consensus_transactions
972                    .iter()
973                    .map(|t| t.local_display())
974                    .join(", ")
975            );
976            self.handle_submit_to_consensus_for_position(
977                consensus_transactions,
978                &epoch_store,
979                submitter_client_addr,
980            )
981            .await?
982        } else {
983            let futures = consensus_transactions.into_iter().map(|t| {
984                debug!(
985                    "handle_submit_transaction: submitting consensus transaction ({}): {}",
986                    req_type,
987                    t.local_display(),
988                );
989                self.handle_submit_to_consensus_for_position(
990                    vec![t],
991                    &epoch_store,
992                    submitter_client_addr,
993                )
994            });
995            future::try_join_all(futures)
996                .await?
997                .into_iter()
998                .flatten()
999                .collect()
1000        };
1001
1002        if is_ping_request {
1003            // For ping requests, return the special consensus position.
1004            assert_eq!(consensus_positions.len(), 1);
1005            results.push(Some(SubmitTxResult::Submitted {
1006                consensus_position: consensus_positions[0],
1007            }));
1008        } else {
1009            // Otherwise, return the consensus position for each transaction.
1010            for ((idx, tx_digest), consensus_position) in transaction_indexes
1011                .into_iter()
1012                .zip(tx_digests)
1013                .zip(consensus_positions)
1014            {
1015                debug!(
1016                    ?tx_digest,
1017                    "handle_submit_transaction: submitted consensus transaction at {}",
1018                    consensus_position,
1019                );
1020                results[idx] = Some(SubmitTxResult::Submitted { consensus_position });
1021            }
1022        }
1023
1024        Ok((Self::try_from_submit_tx_response(results)?, spam_weight))
1025    }
1026
1027    fn try_from_submit_tx_response(
1028        results: Vec<Option<SubmitTxResult>>,
1029    ) -> Result<RawSubmitTxResponse, SuiError> {
1030        let mut raw_results = Vec::new();
1031        for (i, result) in results.into_iter().enumerate() {
1032            let result = result.ok_or_else(|| SuiErrorKind::GenericAuthorityError {
1033                error: format!("Missing transaction result at {}", i),
1034            })?;
1035            let raw_result = result.try_into()?;
1036            raw_results.push(raw_result);
1037        }
1038        Ok(RawSubmitTxResponse {
1039            results: raw_results,
1040        })
1041    }
1042
1043    #[instrument(
1044        name = "ValidatorService::handle_submit_to_consensus_for_position",
1045        level = "debug",
1046        skip_all,
1047        err(level = "debug")
1048    )]
1049    async fn handle_submit_to_consensus_for_position(
1050        &self,
1051        // Empty when this is a ping request.
1052        consensus_transactions: Vec<ConsensusTransaction>,
1053        epoch_store: &Arc<AuthorityPerEpochStore>,
1054        submitter_client_addr: Option<IpAddr>,
1055    ) -> Result<Vec<ConsensusPosition>, tonic::Status> {
1056        let (tx_consensus_positions, rx_consensus_positions) = oneshot::channel();
1057
1058        {
1059            // code block within reconfiguration lock
1060            let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
1061            if !reconfiguration_lock.should_accept_user_certs() {
1062                self.metrics.num_rejected_cert_in_epoch_boundary.inc();
1063                return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1064            }
1065
1066            // Submit to consensus and wait for position, we do not check if tx
1067            // has been processed by consensus already as this method is called
1068            // to get back a consensus position.
1069            let _metrics_guard = self.metrics.consensus_latency.start_timer();
1070
1071            self.consensus_adapter.submit_batch(
1072                &consensus_transactions,
1073                Some(&reconfiguration_lock),
1074                epoch_store,
1075                Some(tx_consensus_positions),
1076                submitter_client_addr,
1077            )?;
1078        }
1079
1080        Ok(rx_consensus_positions.await.map_err(|e| {
1081            SuiErrorKind::FailedToSubmitToConsensus(format!(
1082                "Failed to get consensus position: {e}"
1083            ))
1084        })?)
1085    }
1086
1087    async fn collect_effects_data(
1088        &self,
1089        effects: &TransactionEffects,
1090        include_events: bool,
1091        include_input_objects: bool,
1092        include_output_objects: bool,
1093    ) -> SuiResult<(Option<TransactionEvents>, Vec<Object>, Vec<Object>)> {
1094        let events = if include_events && effects.events_digest().is_some() {
1095            Some(
1096                self.state
1097                    .get_transaction_events(effects.transaction_digest())?,
1098            )
1099        } else {
1100            None
1101        };
1102
1103        let input_objects = if include_input_objects {
1104            self.state.get_transaction_input_objects(effects)?
1105        } else {
1106            vec![]
1107        };
1108
1109        let output_objects = if include_output_objects {
1110            self.state.get_transaction_output_objects(effects)?
1111        } else {
1112            vec![]
1113        };
1114
1115        Ok((events, input_objects, output_objects))
1116    }
1117}
1118
/// Result type returned by the `*_impl` service handlers: on success the tonic
/// response paired with a `Weight`, on failure a `tonic::Status`.
// NOTE(review): the `Weight` is presumably the traffic-control weight charged
// for the request — confirm against the code that consumes this type.
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;
1120
1121impl ValidatorService {
    /// Handler for submit-transaction requests; forwards the request unchanged to
    /// `handle_submit_transaction` and returns its result.
    async fn handle_submit_transaction_impl(
        &self,
        request: tonic::Request<RawSubmitTxRequest>,
    ) -> WrappedServiceResponse<RawSubmitTxResponse> {
        self.handle_submit_transaction(request).await
    }
1128
1129    async fn wait_for_effects_impl(
1130        &self,
1131        request: tonic::Request<RawWaitForEffectsRequest>,
1132    ) -> WrappedServiceResponse<RawWaitForEffectsResponse> {
1133        let request: WaitForEffectsRequest = request.into_inner().try_into()?;
1134        let epoch_store = self.state.load_epoch_store_one_call_per_task();
1135        let response = timeout(
1136            // TODO(fastpath): Tune this once we have a good estimate of the typical delay.
1137            Duration::from_secs(20),
1138            epoch_store
1139                .within_alive_epoch(self.wait_for_effects_response(request, &epoch_store))
1140                .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch())),
1141        )
1142        .await
1143        .map_err(|_| tonic::Status::internal("Timeout waiting for effects"))???
1144        .try_into()?;
1145        Ok((tonic::Response::new(response), Weight::zero()))
1146    }
1147
1148    #[instrument(name= "ValidatorService::wait_for_effects_response", level = "error", skip_all, fields(consensus_position = ?request.consensus_position))]
1149    async fn wait_for_effects_response(
1150        &self,
1151        request: WaitForEffectsRequest,
1152        epoch_store: &Arc<AuthorityPerEpochStore>,
1153    ) -> SuiResult<WaitForEffectsResponse> {
1154        if request.ping_type.is_some() {
1155            return timeout(
1156                Duration::from_secs(10),
1157                self.ping_response(request, epoch_store),
1158            )
1159            .await
1160            .map_err(|_| SuiErrorKind::TimeoutError)?;
1161        }
1162
1163        let Some(tx_digest) = request.transaction_digest else {
1164            return Err(SuiErrorKind::InvalidRequest(
1165                "Transaction digest is required for wait for effects requests".to_string(),
1166            )
1167            .into());
1168        };
1169        let tx_digests = [tx_digest];
1170
1171        // When consensus_position is provided, also watch the consensus status cache
1172        // so rejected/dropped transactions get a timely response instead of waiting
1173        // forever for effects that will never be produced.
1174        let consensus_status_future = async {
1175            let consensus_position = match request.consensus_position {
1176                Some(pos) => pos,
1177                None => return futures::future::pending().await,
1178            };
1179            let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().ok_or(
1180                SuiErrorKind::UnsupportedFeatureError {
1181                    error: "Consensus tx status cache".to_string(),
1182                },
1183            )?;
1184            consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1185            match consensus_tx_status_cache
1186                .notify_read_transaction_status(consensus_position)
1187                .await
1188            {
1189                NotifyReadConsensusTxStatusResult::Status(
1190                    ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped,
1191                ) => Ok(WaitForEffectsResponse::Rejected {
1192                    error: epoch_store.get_rejection_vote_reason(consensus_position),
1193                }),
1194                NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized) => {
1195                    // Effects will be produced — yield to let the effects future win.
1196                    futures::future::pending().await
1197                }
1198                NotifyReadConsensusTxStatusResult::Expired(round) => {
1199                    Ok(WaitForEffectsResponse::Expired {
1200                        epoch: epoch_store.epoch(),
1201                        round: Some(round),
1202                    })
1203                }
1204            }
1205        };
1206
1207        tokio::select! {
1208            effects_result = self.state
1209                .get_transaction_cache_reader()
1210                .notify_read_executed_effects_may_fail(
1211                    "AuthorityServer::wait_for_effects::notify_read_executed_effects_finalized",
1212                    &tx_digests,
1213                ) => {
1214                let effects = effects_result?.pop().unwrap();
1215                let effects_digest = effects.digest();
1216                let details = if request.include_details {
1217                    Some(self.complete_executed_data(effects).await?)
1218                } else {
1219                    None
1220                };
1221                Ok(WaitForEffectsResponse::Executed {
1222                    effects_digest,
1223                    details,
1224                })
1225            }
1226            status_response = consensus_status_future => {
1227                status_response
1228            }
1229        }
1230    }
1231
1232    #[instrument(level = "error", skip_all, err(level = "debug"))]
1233    async fn ping_response(
1234        &self,
1235        request: WaitForEffectsRequest,
1236        epoch_store: &Arc<AuthorityPerEpochStore>,
1237    ) -> SuiResult<WaitForEffectsResponse> {
1238        let Some(consensus_tx_status_cache) = epoch_store.consensus_tx_status_cache.as_ref() else {
1239            return Err(SuiErrorKind::UnsupportedFeatureError {
1240                error: "Mysticeti fastpath".to_string(),
1241            }
1242            .into());
1243        };
1244
1245        let Some(consensus_position) = request.consensus_position else {
1246            return Err(SuiErrorKind::InvalidRequest(
1247                "Consensus position is required for Ping requests".to_string(),
1248            )
1249            .into());
1250        };
1251
1252        // We assume that the caller has already checked for the existence of the `ping` field, but handling it gracefully here.
1253        let Some(ping) = request.ping_type else {
1254            return Err(SuiErrorKind::InvalidRequest(
1255                "Ping type is required for ping requests".to_string(),
1256            )
1257            .into());
1258        };
1259
1260        let _metrics_guard = self
1261            .metrics
1262            .handle_wait_for_effects_ping_latency
1263            .with_label_values(&[ping.as_str()])
1264            .start_timer();
1265
1266        consensus_tx_status_cache.check_position_too_ahead(&consensus_position)?;
1267
1268        let details = if request.include_details {
1269            Some(Box::new(ExecutedData::default()))
1270        } else {
1271            None
1272        };
1273
1274        let status = consensus_tx_status_cache
1275            .notify_read_transaction_status(consensus_position)
1276            .await;
1277        match status {
1278            NotifyReadConsensusTxStatusResult::Status(status) => match status {
1279                ConsensusTxStatus::Rejected | ConsensusTxStatus::Dropped => {
1280                    Ok(WaitForEffectsResponse::Rejected {
1281                        error: epoch_store.get_rejection_vote_reason(consensus_position),
1282                    })
1283                }
1284                ConsensusTxStatus::Finalized => Ok(WaitForEffectsResponse::Executed {
1285                    effects_digest: TransactionEffectsDigest::ZERO,
1286                    details,
1287                }),
1288            },
1289            NotifyReadConsensusTxStatusResult::Expired(round) => {
1290                Ok(WaitForEffectsResponse::Expired {
1291                    epoch: epoch_store.epoch(),
1292                    round: Some(round),
1293                })
1294            }
1295        }
1296    }
1297
1298    async fn complete_executed_data(
1299        &self,
1300        effects: TransactionEffects,
1301    ) -> SuiResult<Box<ExecutedData>> {
1302        let (events, input_objects, output_objects) = self
1303            .collect_effects_data(
1304                &effects, /* include_events */ true, /* include_input_objects */ true,
1305                /* include_output_objects */ true,
1306            )
1307            .await?;
1308        Ok(Box::new(ExecutedData {
1309            effects,
1310            events,
1311            input_objects,
1312            output_objects,
1313        }))
1314    }
1315
1316    async fn object_info_impl(
1317        &self,
1318        request: tonic::Request<ObjectInfoRequest>,
1319    ) -> WrappedServiceResponse<ObjectInfoResponse> {
1320        let request = request.into_inner();
1321        let response = self.state.handle_object_info_request(request).await?;
1322        Ok((tonic::Response::new(response), Weight::one()))
1323    }
1324
1325    async fn transaction_info_impl(
1326        &self,
1327        request: tonic::Request<TransactionInfoRequest>,
1328    ) -> WrappedServiceResponse<TransactionInfoResponse> {
1329        let request = request.into_inner();
1330        let response = self.state.handle_transaction_info_request(request).await?;
1331        Ok((tonic::Response::new(response), Weight::one()))
1332    }
1333
1334    async fn checkpoint_impl(
1335        &self,
1336        request: tonic::Request<CheckpointRequest>,
1337    ) -> WrappedServiceResponse<CheckpointResponse> {
1338        let request = request.into_inner();
1339        let response = self.state.handle_checkpoint_request(&request)?;
1340        Ok((tonic::Response::new(response), Weight::one()))
1341    }
1342
1343    async fn checkpoint_v2_impl(
1344        &self,
1345        request: tonic::Request<CheckpointRequestV2>,
1346    ) -> WrappedServiceResponse<CheckpointResponseV2> {
1347        let request = request.into_inner();
1348        let response = self.state.handle_checkpoint_request_v2(&request)?;
1349        Ok((tonic::Response::new(response), Weight::one()))
1350    }
1351
1352    async fn get_system_state_object_impl(
1353        &self,
1354        _request: tonic::Request<SystemStateRequest>,
1355    ) -> WrappedServiceResponse<SuiSystemState> {
1356        let response = self
1357            .state
1358            .get_object_cache_reader()
1359            .get_sui_system_state_object_unsafe()?;
1360        Ok((tonic::Response::new(response), Weight::one()))
1361    }
1362
1363    async fn validator_health_impl(
1364        &self,
1365        _request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1366    ) -> WrappedServiceResponse<sui_types::messages_grpc::RawValidatorHealthResponse> {
1367        let state = &self.state;
1368
1369        // Get epoch store once for both metrics
1370        let epoch_store = state.load_epoch_store_one_call_per_task();
1371
1372        // Get in-flight execution transactions from execution scheduler
1373        let num_inflight_execution_transactions =
1374            state.execution_scheduler().num_pending_certificates() as u64;
1375
1376        // Get in-flight consensus transactions from consensus adapter
1377        let num_inflight_consensus_transactions =
1378            self.consensus_adapter.num_inflight_transactions();
1379
1380        // Get last committed leader round from epoch store
1381        let last_committed_leader_round = epoch_store
1382            .consensus_tx_status_cache
1383            .as_ref()
1384            .and_then(|cache| cache.get_last_committed_leader_round())
1385            .unwrap_or(0);
1386
1387        // Get last locally built checkpoint sequence
1388        let last_locally_built_checkpoint = epoch_store
1389            .last_built_checkpoint_summary()
1390            .ok()
1391            .flatten()
1392            .map(|(_, summary)| summary.sequence_number)
1393            .unwrap_or(0);
1394
1395        let typed_response = sui_types::messages_grpc::ValidatorHealthResponse {
1396            num_inflight_consensus_transactions,
1397            num_inflight_execution_transactions,
1398            last_locally_built_checkpoint,
1399            last_committed_leader_round,
1400        };
1401
1402        let raw_response = typed_response
1403            .try_into()
1404            .map_err(|e: sui_types::error::SuiError| {
1405                tonic::Status::internal(format!("Failed to serialize health response: {}", e))
1406            })?;
1407
1408        Ok((tonic::Response::new(raw_response), Weight::one()))
1409    }
1410
1411    fn get_client_ip_addr<T>(
1412        &self,
1413        request: &tonic::Request<T>,
1414        source: &ClientIdSource,
1415    ) -> Option<IpAddr> {
1416        let forwarded_header = request.metadata().get_all("x-forwarded-for").iter().next();
1417
1418        if let Some(header) = forwarded_header {
1419            let num_hops = header
1420                .to_str()
1421                .map(|h| h.split(',').count().saturating_sub(1))
1422                .unwrap_or(0);
1423
1424            self.metrics.x_forwarded_for_num_hops.set(num_hops as f64);
1425        }
1426
1427        match source {
1428            ClientIdSource::SocketAddr => {
1429                let socket_addr: Option<SocketAddr> = request.remote_addr();
1430
1431                // We will hit this case if the IO type used does not
1432                // implement Connected or when using a unix domain socket.
1433                // TODO: once we have confirmed that no legitimate traffic
1434                // is hitting this case, we should reject such requests that
1435                // hit this case.
1436                if let Some(socket_addr) = socket_addr {
1437                    Some(socket_addr.ip())
1438                } else {
1439                    if cfg!(msim) {
1440                        // Ignore the error from simtests.
1441                    } else if cfg!(test) {
1442                        panic!("Failed to get remote address from request");
1443                    } else {
1444                        self.metrics.connection_ip_not_found.inc();
1445                        error!("Failed to get remote address from request");
1446                    }
1447                    None
1448                }
1449            }
1450            ClientIdSource::XForwardedFor(num_hops) => {
1451                let do_header_parse = |op: &MetadataValue<Ascii>| {
1452                    match op.to_str() {
1453                        Ok(header_val) => {
1454                            let header_contents =
1455                                header_val.split(',').map(str::trim).collect::<Vec<_>>();
1456                            if *num_hops == 0 {
1457                                error!(
1458                                    "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \
1459                                    number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \
1460                                    to this node. Skipping traffic controller request handling.",
1461                                    header_contents,
1462                                );
1463                                return None;
1464                            }
1465                            let contents_len = header_contents.len();
1466                            if contents_len < *num_hops {
1467                                error!(
1468                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1469                                    Expected at least {} values. Please correctly set the `x-forwarded-for` value under \
1470                                    `client-id-source` in the node config.",
1471                                    header_contents, contents_len, num_hops, contents_len,
1472                                );
1473                                self.metrics.client_id_source_config_mismatch.inc();
1474                                return None;
1475                            }
1476                            let Some(client_ip) = header_contents.get(contents_len - num_hops)
1477                            else {
1478                                error!(
1479                                    "x-forwarded-for header value of {:?} contains {} values, but {} hops were specified. \
1480                                    Expected at least {} values. Skipping traffic controller request handling.",
1481                                    header_contents, contents_len, num_hops, contents_len,
1482                                );
1483                                return None;
1484                            };
1485                            parse_ip(client_ip).or_else(|| {
1486                                self.metrics.forwarded_header_parse_error.inc();
1487                                None
1488                            })
1489                        }
1490                        Err(e) => {
1491                            // TODO: once we have confirmed that no legitimate traffic
1492                            // is hitting this case, we should reject such requests that
1493                            // hit this case.
1494                            self.metrics.forwarded_header_invalid.inc();
1495                            error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
1496                            None
1497                        }
1498                    }
1499                };
1500                if let Some(op) = request.metadata().get("x-forwarded-for") {
1501                    do_header_parse(op)
1502                } else if let Some(op) = request.metadata().get("X-Forwarded-For") {
1503                    do_header_parse(op)
1504                } else {
1505                    self.metrics.forwarded_header_not_included.inc();
1506                    error!(
1507                        "x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"
1508                    );
1509                    None
1510                }
1511            }
1512        }
1513    }
1514
1515    async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
1516        if let Some(traffic_controller) = &self.traffic_controller {
1517            if !traffic_controller.check(&client, &None).await {
1518                // Entity in blocklist
1519                Err(tonic::Status::from_error(
1520                    SuiErrorKind::TooManyRequests.into(),
1521                ))
1522            } else {
1523                Ok(())
1524            }
1525        } else {
1526            Ok(())
1527        }
1528    }
1529
1530    fn handle_traffic_resp<T>(
1531        &self,
1532        client: Option<IpAddr>,
1533        wrapped_response: WrappedServiceResponse<T>,
1534    ) -> Result<tonic::Response<T>, tonic::Status> {
1535        let (error, spam_weight, unwrapped_response) = match wrapped_response {
1536            Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
1537            Err(status) => (
1538                Some(SuiError::from(status.clone())),
1539                Weight::zero(),
1540                Err(status.clone()),
1541            ),
1542        };
1543
1544        if let Some(traffic_controller) = self.traffic_controller.clone() {
1545            traffic_controller.tally(TrafficTally {
1546                direct: client,
1547                through_fullnode: None,
1548                error_info: error.map(|e| {
1549                    let error_type = String::from(e.clone().as_ref());
1550                    let error_weight = normalize(e);
1551                    (error_weight, error_type)
1552                }),
1553                spam_weight,
1554                timestamp: SystemTime::now(),
1555            })
1556        }
1557        unwrapped_response
1558    }
1559}
1560
1561// TODO: refine error matching here
1562fn normalize(err: SuiError) -> Weight {
1563    match err.as_inner() {
1564        SuiErrorKind::UserInputError {
1565            error: UserInputError::IncorrectUserSignature { .. },
1566        } => Weight::one(),
1567        SuiErrorKind::InvalidSignature { .. }
1568        | SuiErrorKind::SignerSignatureAbsent { .. }
1569        | SuiErrorKind::SignerSignatureNumberMismatch { .. }
1570        | SuiErrorKind::IncorrectSigner { .. }
1571        | SuiErrorKind::UnknownSigner { .. }
1572        | SuiErrorKind::WrongEpoch { .. } => Weight::one(),
1573        _ => Weight::zero(),
1574    }
1575}
1576
/// Implements generic pre- and post-processing. Since this is on the critical
/// path, any heavy lifting should be done in a separate non-blocking task
/// unless it is necessary to override the return value.
///
/// Arguments:
/// * `$self` - the service value whose methods are invoked.
/// * `$func_name` - the handler method to call; it is awaited and must yield the
///   response paired with a weight on success.
/// * `$request` - the incoming request, passed to the handler by value.
///
/// Note that the expansion contains `return` and `?`, so it must be used as the
/// tail of an `async` function or block whose output is the endpoint's `Result`.
#[macro_export]
macro_rules! handle_with_decoration {
    ($self:ident, $func_name:ident, $request:ident) => {{
        // With no client id source configured, skip traffic handling entirely and
        // return the handler's response with its weight discarded.
        let Some(client_id_source) = $self.client_id_source.as_ref() else {
            return $self.$func_name($request).await.map(|(result, _)| result);
        };

        let client = $self.get_client_ip_addr(&$request, client_id_source);

        // check if the client IP is blocked, in which case return early
        $self.handle_traffic_req(client).await?;

        // handle traffic tallying
        let wrapped_response = $self.$func_name($request).await;
        $self.handle_traffic_resp(client, wrapped_response)
    }};
}
1597
1598#[async_trait]
1599impl Validator for ValidatorService {
1600    async fn submit_transaction(
1601        &self,
1602        request: tonic::Request<RawSubmitTxRequest>,
1603    ) -> Result<tonic::Response<RawSubmitTxResponse>, tonic::Status> {
1604        let validator_service = self.clone();
1605
1606        // Spawns a task which handles the transaction. The task will unconditionally continue
1607        // processing in the event that the client connection is dropped.
1608        spawn_monitored_task!(async move {
1609            // NB: traffic tally wrapping handled within the task rather than on task exit
1610            // to prevent an attacker from subverting traffic control by severing the connection
1611            handle_with_decoration!(validator_service, handle_submit_transaction_impl, request)
1612        })
1613        .await
1614        .unwrap()
1615    }
1616
1617    async fn wait_for_effects(
1618        &self,
1619        request: tonic::Request<RawWaitForEffectsRequest>,
1620    ) -> Result<tonic::Response<RawWaitForEffectsResponse>, tonic::Status> {
1621        handle_with_decoration!(self, wait_for_effects_impl, request)
1622    }
1623
1624    async fn object_info(
1625        &self,
1626        request: tonic::Request<ObjectInfoRequest>,
1627    ) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
1628        handle_with_decoration!(self, object_info_impl, request)
1629    }
1630
1631    async fn transaction_info(
1632        &self,
1633        request: tonic::Request<TransactionInfoRequest>,
1634    ) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
1635        handle_with_decoration!(self, transaction_info_impl, request)
1636    }
1637
1638    async fn checkpoint(
1639        &self,
1640        request: tonic::Request<CheckpointRequest>,
1641    ) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
1642        handle_with_decoration!(self, checkpoint_impl, request)
1643    }
1644
1645    async fn checkpoint_v2(
1646        &self,
1647        request: tonic::Request<CheckpointRequestV2>,
1648    ) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
1649        handle_with_decoration!(self, checkpoint_v2_impl, request)
1650    }
1651
1652    async fn get_system_state_object(
1653        &self,
1654        request: tonic::Request<SystemStateRequest>,
1655    ) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
1656        handle_with_decoration!(self, get_system_state_object_impl, request)
1657    }
1658
1659    async fn validator_health(
1660        &self,
1661        request: tonic::Request<sui_types::messages_grpc::RawValidatorHealthRequest>,
1662    ) -> Result<tonic::Response<sui_types::messages_grpc::RawValidatorHealthResponse>, tonic::Status>
1663    {
1664        handle_with_decoration!(self, validator_health_impl, request)
1665    }
1666}