sui_core/
authority_client.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::anyhow;
6use async_trait::async_trait;
7use mysten_network::config::Config;
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::time::Duration;
11use sui_network::{api::ValidatorClient, tonic};
12use sui_types::base_types::AuthorityName;
13use sui_types::committee::CommitteeWithNetworkMetadata;
14use sui_types::crypto::NetworkPublicKey;
15use sui_types::messages_checkpoint::{
16    CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
17};
18use sui_types::multiaddr::Multiaddr;
19use sui_types::sui_system_state::SuiSystemState;
20use sui_types::{
21    error::{SuiError, SuiResult},
22    transaction::*,
23};
24use tap::TapFallible;
25
26use crate::authority_client::tonic::IntoRequest;
27use sui_network::tonic::metadata::KeyAndValueRef;
28use sui_network::tonic::transport::Channel;
29use sui_types::messages_grpc::{
30    HandleCertificateRequestV3, HandleCertificateResponseV2, HandleCertificateResponseV3,
31    HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
32    HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, RawValidatorHealthRequest,
33    RawWaitForEffectsRequest, SubmitTxRequest, SubmitTxResponse, SystemStateRequest,
34    TransactionInfoRequest, TransactionInfoResponse, ValidatorHealthRequest,
35    ValidatorHealthResponse, WaitForEffectsRequest, WaitForEffectsResponse,
36};
37
38#[async_trait]
39pub trait AuthorityAPI {
40    /// Submits a transaction to validators for sequencing and execution.
41    async fn submit_transaction(
42        &self,
43        request: SubmitTxRequest,
44        client_addr: Option<SocketAddr>,
45    ) -> Result<SubmitTxResponse, SuiError>;
46
47    /// Waits for effects of a transaction that has been submitted to the network
48    /// through the `submit_transaction` API.
49    async fn wait_for_effects(
50        &self,
51        request: WaitForEffectsRequest,
52        client_addr: Option<SocketAddr>,
53    ) -> Result<WaitForEffectsResponse, SuiError>;
54
55    // TODO(fastpath): Add a soft bundle path for mfp which will return the list of consensus positions
56
57    /// Initiate a new transaction to a Sui or Primary account.
58    async fn handle_transaction(
59        &self,
60        transaction: Transaction,
61        client_addr: Option<SocketAddr>,
62    ) -> Result<HandleTransactionResponse, SuiError>;
63
64    /// Execute a certificate.
65    async fn handle_certificate_v2(
66        &self,
67        certificate: CertifiedTransaction,
68        client_addr: Option<SocketAddr>,
69    ) -> Result<HandleCertificateResponseV2, SuiError>;
70
71    /// Execute a certificate.
72    async fn handle_certificate_v3(
73        &self,
74        request: HandleCertificateRequestV3,
75        client_addr: Option<SocketAddr>,
76    ) -> Result<HandleCertificateResponseV3, SuiError>;
77
78    /// Execute a Soft Bundle with multiple certificates.
79    async fn handle_soft_bundle_certificates_v3(
80        &self,
81        request: HandleSoftBundleCertificatesRequestV3,
82        client_addr: Option<SocketAddr>,
83    ) -> Result<HandleSoftBundleCertificatesResponseV3, SuiError>;
84
85    /// Handle Object information requests for this account.
86    async fn handle_object_info_request(
87        &self,
88        request: ObjectInfoRequest,
89    ) -> Result<ObjectInfoResponse, SuiError>;
90
91    /// Handle Object information requests for this account.
92    async fn handle_transaction_info_request(
93        &self,
94        request: TransactionInfoRequest,
95    ) -> Result<TransactionInfoResponse, SuiError>;
96
97    async fn handle_checkpoint(
98        &self,
99        request: CheckpointRequest,
100    ) -> Result<CheckpointResponse, SuiError>;
101
102    async fn handle_checkpoint_v2(
103        &self,
104        request: CheckpointRequestV2,
105    ) -> Result<CheckpointResponseV2, SuiError>;
106
107    // This API is exclusively used by the benchmark code.
108    // Hence it's OK to return a fixed system state type.
109    async fn handle_system_state_object(
110        &self,
111        request: SystemStateRequest,
112    ) -> Result<SuiSystemState, SuiError>;
113
114    /// Get validator health metrics (for latency measurement)
115    async fn validator_health(
116        &self,
117        request: ValidatorHealthRequest,
118    ) -> Result<ValidatorHealthResponse, SuiError>;
119}
120
121#[derive(Clone)]
122pub struct NetworkAuthorityClient {
123    client: SuiResult<ValidatorClient<Channel>>,
124}
125
126impl NetworkAuthorityClient {
127    pub async fn connect(
128        address: &Multiaddr,
129        tls_target: NetworkPublicKey,
130    ) -> anyhow::Result<Self> {
131        let tls_config = sui_tls::create_rustls_client_config(
132            tls_target,
133            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
134            None,
135        );
136        let channel = mysten_network::client::connect(address, tls_config)
137            .await
138            .map_err(|err| anyhow!(err.to_string()))?;
139        Ok(Self::new(channel))
140    }
141
142    pub fn connect_lazy(address: &Multiaddr, tls_target: NetworkPublicKey) -> Self {
143        let tls_config = sui_tls::create_rustls_client_config(
144            tls_target,
145            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
146            None,
147        );
148        let client: SuiResult<_> = mysten_network::client::connect_lazy(address, tls_config)
149            .map(ValidatorClient::new)
150            .map_err(|err| err.to_string().into());
151        Self { client }
152    }
153
154    pub fn new(channel: Channel) -> Self {
155        Self {
156            client: Ok(ValidatorClient::new(channel)),
157        }
158    }
159
160    fn new_lazy(client: SuiResult<Channel>) -> Self {
161        Self {
162            client: client.map(ValidatorClient::new),
163        }
164    }
165
166    pub(crate) fn client(&self) -> SuiResult<ValidatorClient<Channel>> {
167        self.client.clone()
168    }
169
170    pub fn get_client_for_testing(&self) -> SuiResult<ValidatorClient<Channel>> {
171        self.client()
172    }
173}
174
175#[async_trait]
176impl AuthorityAPI for NetworkAuthorityClient {
177    /// Submits a transaction to the Sui network for certification and execution.
178    async fn submit_transaction(
179        &self,
180        request: SubmitTxRequest,
181        client_addr: Option<SocketAddr>,
182    ) -> Result<SubmitTxResponse, SuiError> {
183        let mut request = request.into_raw()?.into_request();
184        insert_metadata(&mut request, client_addr);
185
186        self.client()?
187            .submit_transaction(request)
188            .await
189            .map(tonic::Response::into_inner)
190            .map_err(Into::<SuiError>::into)?
191            .try_into()
192    }
193
194    async fn wait_for_effects(
195        &self,
196        request: WaitForEffectsRequest,
197        client_addr: Option<SocketAddr>,
198    ) -> Result<WaitForEffectsResponse, SuiError> {
199        let raw_request: RawWaitForEffectsRequest = request.try_into()?;
200        let mut request = raw_request.into_request();
201        insert_metadata(&mut request, client_addr);
202
203        self.client()?
204            .wait_for_effects(request)
205            .await
206            .map(tonic::Response::into_inner)
207            .map_err(Into::<SuiError>::into)?
208            .try_into()
209    }
210
211    /// Initiate a new transfer to a Sui or Primary account.
212    async fn handle_transaction(
213        &self,
214        transaction: Transaction,
215        client_addr: Option<SocketAddr>,
216    ) -> Result<HandleTransactionResponse, SuiError> {
217        let mut request = transaction.into_request();
218        insert_metadata(&mut request, client_addr);
219
220        self.client()?
221            .transaction(request)
222            .await
223            .map(tonic::Response::into_inner)
224            .map_err(Into::into)
225    }
226
227    /// Execute a certificate.
228    async fn handle_certificate_v2(
229        &self,
230        certificate: CertifiedTransaction,
231        client_addr: Option<SocketAddr>,
232    ) -> Result<HandleCertificateResponseV2, SuiError> {
233        let mut request = certificate.into_request();
234        insert_metadata(&mut request, client_addr);
235
236        let response = self
237            .client()?
238            .handle_certificate_v2(request)
239            .await
240            .map(tonic::Response::into_inner);
241
242        response.map_err(Into::into)
243    }
244
245    async fn handle_certificate_v3(
246        &self,
247        request: HandleCertificateRequestV3,
248        client_addr: Option<SocketAddr>,
249    ) -> Result<HandleCertificateResponseV3, SuiError> {
250        let mut request = request.into_request();
251        insert_metadata(&mut request, client_addr);
252
253        let response = self
254            .client()?
255            .handle_certificate_v3(request)
256            .await
257            .map(tonic::Response::into_inner);
258
259        response.map_err(Into::into)
260    }
261
262    async fn handle_soft_bundle_certificates_v3(
263        &self,
264        request: HandleSoftBundleCertificatesRequestV3,
265        client_addr: Option<SocketAddr>,
266    ) -> Result<HandleSoftBundleCertificatesResponseV3, SuiError> {
267        let mut request = request.into_request();
268        insert_metadata(&mut request, client_addr);
269
270        let response = self
271            .client()?
272            .handle_soft_bundle_certificates_v3(request)
273            .await
274            .map(tonic::Response::into_inner);
275
276        response.map_err(Into::into)
277    }
278
279    async fn handle_object_info_request(
280        &self,
281        request: ObjectInfoRequest,
282    ) -> Result<ObjectInfoResponse, SuiError> {
283        self.client()?
284            .object_info(request)
285            .await
286            .map(tonic::Response::into_inner)
287            .map_err(Into::into)
288    }
289
290    /// Handle Object information requests for this account.
291    async fn handle_transaction_info_request(
292        &self,
293        request: TransactionInfoRequest,
294    ) -> Result<TransactionInfoResponse, SuiError> {
295        self.client()?
296            .transaction_info(request)
297            .await
298            .map(tonic::Response::into_inner)
299            .map_err(Into::into)
300    }
301
302    /// Handle Object information requests for this account.
303    async fn handle_checkpoint(
304        &self,
305        request: CheckpointRequest,
306    ) -> Result<CheckpointResponse, SuiError> {
307        self.client()?
308            .checkpoint(request)
309            .await
310            .map(tonic::Response::into_inner)
311            .map_err(Into::into)
312    }
313
314    /// Handle Object information requests for this account.
315    async fn handle_checkpoint_v2(
316        &self,
317        request: CheckpointRequestV2,
318    ) -> Result<CheckpointResponseV2, SuiError> {
319        self.client()?
320            .checkpoint_v2(request)
321            .await
322            .map(tonic::Response::into_inner)
323            .map_err(Into::into)
324    }
325
326    async fn handle_system_state_object(
327        &self,
328        request: SystemStateRequest,
329    ) -> Result<SuiSystemState, SuiError> {
330        self.client()?
331            .get_system_state_object(request)
332            .await
333            .map(tonic::Response::into_inner)
334            .map_err(Into::into)
335    }
336
337    async fn validator_health(
338        &self,
339        request: ValidatorHealthRequest,
340    ) -> Result<ValidatorHealthResponse, SuiError> {
341        let raw_request: RawValidatorHealthRequest = request.try_into()?;
342
343        self.client()?
344            .validator_health(raw_request)
345            .await
346            .map(tonic::Response::into_inner)
347            .map_err(Into::<SuiError>::into)?
348            .try_into()
349    }
350}
351
352pub fn make_network_authority_clients_with_network_config(
353    committee: &CommitteeWithNetworkMetadata,
354    network_config: &Config,
355) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
356    let mut authority_clients = BTreeMap::new();
357    for (name, (_state, network_metadata)) in committee.validators() {
358        let address = network_metadata
359            .network_address
360            .clone()
361            .rewrite_udp_to_tcp()
362            .rewrite_http_to_https();
363        let tls_config = network_metadata
364            .network_public_key
365            .as_ref()
366            .map(|key| {
367                sui_tls::create_rustls_client_config(
368                    key.clone(),
369                    sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
370                    None,
371                )
372            })
373            .ok_or(SuiError::from("network public key is not available"));
374        let maybe_channel = tls_config
375            .and_then(|tls_config| {
376                network_config
377                    .connect_lazy(&address, tls_config)
378                    .map_err(|e| e.to_string().into())
379            })
380            .tap_err(|e| {
381                tracing::error!(
382                    address = %address,
383                    name = %name,
384                    "unable to create authority client: {e}"
385                )
386            });
387        let client = NetworkAuthorityClient::new_lazy(maybe_channel);
388        authority_clients.insert(*name, client);
389    }
390    authority_clients
391}
392
393pub fn make_authority_clients_with_timeout_config(
394    committee: &CommitteeWithNetworkMetadata,
395    connect_timeout: Duration,
396    request_timeout: Duration,
397) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
398    let mut network_config = mysten_network::config::Config::new();
399    network_config.connect_timeout = Some(connect_timeout);
400    network_config.request_timeout = Some(request_timeout);
401    network_config.http2_keepalive_interval = Some(connect_timeout);
402    network_config.http2_keepalive_timeout = Some(connect_timeout);
403    make_network_authority_clients_with_network_config(committee, &network_config)
404}
405
406fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
407    if let Some(client_addr) = client_addr {
408        let mut metadata = tonic::metadata::MetadataMap::new();
409        metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
410        metadata
411            .iter()
412            .for_each(|key_and_value| match key_and_value {
413                KeyAndValueRef::Ascii(key, value) => {
414                    request.metadata_mut().insert(key, value.clone());
415                }
416                KeyAndValueRef::Binary(key, value) => {
417                    request.metadata_mut().insert_bin(key, value.clone());
418                }
419            });
420    }
421}