1use anyhow::anyhow;
6use async_trait::async_trait;
7use mysten_network::config::Config;
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::time::Duration;
11use sui_network::{api::ValidatorClient, tonic};
12use sui_types::base_types::AuthorityName;
13use sui_types::committee::CommitteeWithNetworkMetadata;
14use sui_types::crypto::NetworkPublicKey;
15use sui_types::error::{SuiError, SuiResult};
16use sui_types::messages_checkpoint::{
17 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
18};
19use sui_types::multiaddr::Multiaddr;
20use sui_types::sui_system_state::SuiSystemState;
21use tap::TapFallible;
22
23use crate::authority_client::tonic::IntoRequest;
24use sui_network::tonic::metadata::KeyAndValueRef;
25use sui_network::tonic::transport::Channel;
26use sui_types::messages_grpc::{
27 ObjectInfoRequest, ObjectInfoResponse, RawValidatorHealthRequest, RawWaitForEffectsRequest,
28 SubmitTxRequest, SubmitTxResponse, SystemStateRequest, TransactionInfoRequest,
29 TransactionInfoResponse, ValidatorHealthRequest, ValidatorHealthResponse,
30 WaitForEffectsRequest, WaitForEffectsResponse,
31};
32
33#[async_trait]
34pub trait AuthorityAPI {
35 async fn submit_transaction(
37 &self,
38 request: SubmitTxRequest,
39 client_addr: Option<SocketAddr>,
40 ) -> Result<SubmitTxResponse, SuiError>;
41
42 async fn wait_for_effects(
45 &self,
46 request: WaitForEffectsRequest,
47 client_addr: Option<SocketAddr>,
48 ) -> Result<WaitForEffectsResponse, SuiError>;
49
50 async fn handle_object_info_request(
54 &self,
55 request: ObjectInfoRequest,
56 ) -> Result<ObjectInfoResponse, SuiError>;
57
58 async fn handle_transaction_info_request(
60 &self,
61 request: TransactionInfoRequest,
62 ) -> Result<TransactionInfoResponse, SuiError>;
63
64 async fn handle_checkpoint(
65 &self,
66 request: CheckpointRequest,
67 ) -> Result<CheckpointResponse, SuiError>;
68
69 async fn handle_checkpoint_v2(
70 &self,
71 request: CheckpointRequestV2,
72 ) -> Result<CheckpointResponseV2, SuiError>;
73
74 async fn handle_system_state_object(
77 &self,
78 request: SystemStateRequest,
79 ) -> Result<SuiSystemState, SuiError>;
80
81 async fn validator_health(
83 &self,
84 request: ValidatorHealthRequest,
85 ) -> Result<ValidatorHealthResponse, SuiError>;
86}
87
88#[derive(Clone)]
89pub struct NetworkAuthorityClient {
90 client: SuiResult<ValidatorClient<Channel>>,
91}
92
93impl NetworkAuthorityClient {
94 pub async fn connect(
95 address: &Multiaddr,
96 tls_target: NetworkPublicKey,
97 ) -> anyhow::Result<Self> {
98 let tls_config = sui_tls::create_rustls_client_config(
99 tls_target,
100 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
101 None,
102 );
103 let channel = mysten_network::client::connect(address, tls_config)
104 .await
105 .map_err(|err| anyhow!(err.to_string()))?;
106 Ok(Self::new(channel))
107 }
108
109 pub fn connect_lazy(address: &Multiaddr, tls_target: NetworkPublicKey) -> Self {
110 let tls_config = sui_tls::create_rustls_client_config(
111 tls_target,
112 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
113 None,
114 );
115 let client: SuiResult<_> = mysten_network::client::connect_lazy(address, tls_config)
116 .map(ValidatorClient::new)
117 .map_err(|err| err.to_string().into());
118 Self { client }
119 }
120
121 pub fn new(channel: Channel) -> Self {
122 Self {
123 client: Ok(ValidatorClient::new(channel)),
124 }
125 }
126
127 fn new_lazy(client: SuiResult<Channel>) -> Self {
128 Self {
129 client: client.map(ValidatorClient::new),
130 }
131 }
132
133 pub(crate) fn client(&self) -> SuiResult<ValidatorClient<Channel>> {
134 self.client.clone()
135 }
136
137 pub fn get_client_for_testing(&self) -> SuiResult<ValidatorClient<Channel>> {
138 self.client()
139 }
140}
141
142#[async_trait]
143impl AuthorityAPI for NetworkAuthorityClient {
144 async fn submit_transaction(
146 &self,
147 request: SubmitTxRequest,
148 client_addr: Option<SocketAddr>,
149 ) -> Result<SubmitTxResponse, SuiError> {
150 let mut request = request.into_raw()?.into_request();
151 insert_metadata(&mut request, client_addr);
152
153 self.client()?
154 .submit_transaction(request)
155 .await
156 .map(tonic::Response::into_inner)
157 .map_err(Into::<SuiError>::into)?
158 .try_into()
159 }
160
161 async fn wait_for_effects(
162 &self,
163 request: WaitForEffectsRequest,
164 client_addr: Option<SocketAddr>,
165 ) -> Result<WaitForEffectsResponse, SuiError> {
166 let raw_request: RawWaitForEffectsRequest = request.try_into()?;
167 let mut request = raw_request.into_request();
168 insert_metadata(&mut request, client_addr);
169
170 self.client()?
171 .wait_for_effects(request)
172 .await
173 .map(tonic::Response::into_inner)
174 .map_err(Into::<SuiError>::into)?
175 .try_into()
176 }
177
178 async fn handle_object_info_request(
179 &self,
180 request: ObjectInfoRequest,
181 ) -> Result<ObjectInfoResponse, SuiError> {
182 self.client()?
183 .object_info(request)
184 .await
185 .map(tonic::Response::into_inner)
186 .map_err(Into::into)
187 }
188
189 async fn handle_transaction_info_request(
191 &self,
192 request: TransactionInfoRequest,
193 ) -> Result<TransactionInfoResponse, SuiError> {
194 self.client()?
195 .transaction_info(request)
196 .await
197 .map(tonic::Response::into_inner)
198 .map_err(Into::into)
199 }
200
201 async fn handle_checkpoint(
203 &self,
204 request: CheckpointRequest,
205 ) -> Result<CheckpointResponse, SuiError> {
206 self.client()?
207 .checkpoint(request)
208 .await
209 .map(tonic::Response::into_inner)
210 .map_err(Into::into)
211 }
212
213 async fn handle_checkpoint_v2(
215 &self,
216 request: CheckpointRequestV2,
217 ) -> Result<CheckpointResponseV2, SuiError> {
218 self.client()?
219 .checkpoint_v2(request)
220 .await
221 .map(tonic::Response::into_inner)
222 .map_err(Into::into)
223 }
224
225 async fn handle_system_state_object(
226 &self,
227 request: SystemStateRequest,
228 ) -> Result<SuiSystemState, SuiError> {
229 self.client()?
230 .get_system_state_object(request)
231 .await
232 .map(tonic::Response::into_inner)
233 .map_err(Into::into)
234 }
235
236 async fn validator_health(
237 &self,
238 request: ValidatorHealthRequest,
239 ) -> Result<ValidatorHealthResponse, SuiError> {
240 let raw_request: RawValidatorHealthRequest = request.try_into()?;
241
242 self.client()?
243 .validator_health(raw_request)
244 .await
245 .map(tonic::Response::into_inner)
246 .map_err(Into::<SuiError>::into)?
247 .try_into()
248 }
249}
250
251pub fn make_network_authority_clients_with_network_config(
252 committee: &CommitteeWithNetworkMetadata,
253 network_config: &Config,
254) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
255 let mut authority_clients = BTreeMap::new();
256 for (name, (_state, network_metadata)) in committee.validators() {
257 let address = network_metadata
258 .network_address
259 .clone()
260 .rewrite_udp_to_tcp()
261 .rewrite_http_to_https();
262 let tls_config = network_metadata
263 .network_public_key
264 .as_ref()
265 .map(|key| {
266 sui_tls::create_rustls_client_config(
267 key.clone(),
268 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
269 None,
270 )
271 })
272 .ok_or(SuiError::from("network public key is not available"));
273 let maybe_channel = tls_config
274 .and_then(|tls_config| {
275 network_config
276 .connect_lazy(&address, tls_config)
277 .map_err(|e| e.to_string().into())
278 })
279 .tap_err(|e| {
280 tracing::error!(
281 address = %address,
282 name = %name,
283 "unable to create authority client: {e}"
284 )
285 });
286 let client = NetworkAuthorityClient::new_lazy(maybe_channel);
287 authority_clients.insert(*name, client);
288 }
289 authority_clients
290}
291
292pub fn make_authority_clients_with_timeout_config(
293 committee: &CommitteeWithNetworkMetadata,
294 connect_timeout: Duration,
295 request_timeout: Duration,
296) -> BTreeMap<AuthorityName, NetworkAuthorityClient> {
297 let mut network_config = mysten_network::config::Config::new();
298 network_config.connect_timeout = Some(connect_timeout);
299 network_config.request_timeout = Some(request_timeout);
300 network_config.http2_keepalive_interval = Some(connect_timeout);
301 network_config.http2_keepalive_timeout = Some(connect_timeout);
302 make_network_authority_clients_with_network_config(committee, &network_config)
303}
304
305fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
306 if let Some(client_addr) = client_addr {
307 let mut metadata = tonic::metadata::MetadataMap::new();
308 metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
309 metadata
310 .iter()
311 .for_each(|key_and_value| match key_and_value {
312 KeyAndValueRef::Ascii(key, value) => {
313 request.metadata_mut().insert(key, value.clone());
314 }
315 KeyAndValueRef::Binary(key, value) => {
316 request.metadata_mut().insert_bin(key, value.clone());
317 }
318 });
319 }
320}