use anyhow::anyhow;
use async_trait::async_trait;
use mysten_network::config::Config;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::time::Duration;
use sui_network::{api::ValidatorClient, tonic};
use sui_types::base_types::AuthorityName;
use sui_types::committee::CommitteeWithNetworkMetadata;
use sui_types::messages_checkpoint::{
CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
};
use sui_types::multiaddr::Multiaddr;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::{error::SuiError, transaction::*};
use crate::authority_client::tonic::IntoRequest;
use sui_network::tonic::metadata::KeyAndValueRef;
use sui_network::tonic::transport::Channel;
use sui_types::messages_grpc::{
HandleCertificateRequestV3, HandleCertificateResponseV2, HandleCertificateResponseV3,
HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
TransactionInfoRequest, TransactionInfoResponse,
};
#[async_trait]
pub trait AuthorityAPI {
async fn handle_transaction(
&self,
transaction: Transaction,
client_addr: Option<SocketAddr>,
) -> Result<HandleTransactionResponse, SuiError>;
async fn handle_certificate_v2(
&self,
certificate: CertifiedTransaction,
client_addr: Option<SocketAddr>,
) -> Result<HandleCertificateResponseV2, SuiError>;
async fn handle_certificate_v3(
&self,
request: HandleCertificateRequestV3,
client_addr: Option<SocketAddr>,
) -> Result<HandleCertificateResponseV3, SuiError>;
async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, SuiError>;
async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError>;
async fn handle_checkpoint(
&self,
request: CheckpointRequest,
) -> Result<CheckpointResponse, SuiError>;
async fn handle_checkpoint_v2(
&self,
request: CheckpointRequestV2,
) -> Result<CheckpointResponseV2, SuiError>;
async fn handle_system_state_object(
&self,
request: SystemStateRequest,
) -> Result<SuiSystemState, SuiError>;
}
#[derive(Clone)]
pub struct NetworkAuthorityClient {
client: ValidatorClient<Channel>,
}
impl NetworkAuthorityClient {
pub async fn connect(address: &Multiaddr) -> anyhow::Result<Self> {
let channel = mysten_network::client::connect(address)
.await
.map_err(|err| anyhow!(err.to_string()))?;
Ok(Self::new(channel))
}
pub fn connect_lazy(address: &Multiaddr) -> anyhow::Result<Self> {
let channel = mysten_network::client::connect_lazy(address)
.map_err(|err| anyhow!(err.to_string()))?;
Ok(Self::new(channel))
}
pub fn new(channel: Channel) -> Self {
Self {
client: ValidatorClient::new(channel),
}
}
fn client(&self) -> ValidatorClient<Channel> {
self.client.clone()
}
}
#[async_trait]
impl AuthorityAPI for NetworkAuthorityClient {
async fn handle_transaction(
&self,
transaction: Transaction,
client_addr: Option<SocketAddr>,
) -> Result<HandleTransactionResponse, SuiError> {
let mut request = transaction.into_request();
insert_metadata(&mut request, client_addr);
self.client()
.transaction(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}
async fn handle_certificate_v2(
&self,
certificate: CertifiedTransaction,
client_addr: Option<SocketAddr>,
) -> Result<HandleCertificateResponseV2, SuiError> {
let mut request = certificate.into_request();
insert_metadata(&mut request, client_addr);
let response = self
.client()
.handle_certificate_v2(request)
.await
.map(tonic::Response::into_inner);
response.map_err(Into::into)
}
async fn handle_certificate_v3(
&self,
request: HandleCertificateRequestV3,
client_addr: Option<SocketAddr>,
) -> Result<HandleCertificateResponseV3, SuiError> {
let mut request = request.into_request();
insert_metadata(&mut request, client_addr);
let response = self
.client()
.handle_certificate_v3(request)
.await
.map(tonic::Response::into_inner);
response.map_err(Into::into)
}
async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, SuiError> {
self.client()
.object_info(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}
async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError> {
self.client()
.transaction_info(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}
async fn handle_checkpoint(
&self,
request: CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
self.client()
.checkpoint(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}
async fn handle_checkpoint_v2(
&self,
request: CheckpointRequestV2,
) -> Result<CheckpointResponseV2, SuiError> {
self.client()
.checkpoint_v2(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}
async fn handle_system_state_object(
&self,
request: SystemStateRequest,
) -> Result<SuiSystemState, SuiError> {
self.client()
.get_system_state_object(request)
.await
.map(tonic::Response::into_inner)
.map_err(Into::into)
}
}
pub fn make_network_authority_clients_with_network_config(
committee: &CommitteeWithNetworkMetadata,
network_config: &Config,
) -> anyhow::Result<BTreeMap<AuthorityName, NetworkAuthorityClient>> {
let mut authority_clients = BTreeMap::new();
for (name, _stakes) in &committee.committee.voting_rights {
let address = &committee
.network_metadata
.get(name)
.ok_or_else(|| {
SuiError::from("Missing network metadata in CommitteeWithNetworkMetadata")
})?
.network_address;
let channel = network_config
.connect_lazy(address)
.map_err(|err| anyhow!(err.to_string()))?;
let client = NetworkAuthorityClient::new(channel);
authority_clients.insert(*name, client);
}
Ok(authority_clients)
}
pub fn make_authority_clients_with_timeout_config(
committee: &CommitteeWithNetworkMetadata,
connect_timeout: Duration,
request_timeout: Duration,
) -> anyhow::Result<BTreeMap<AuthorityName, NetworkAuthorityClient>> {
let mut network_config = mysten_network::config::Config::new();
network_config.connect_timeout = Some(connect_timeout);
network_config.request_timeout = Some(request_timeout);
make_network_authority_clients_with_network_config(committee, &network_config)
}
fn insert_metadata<T>(request: &mut tonic::Request<T>, client_addr: Option<SocketAddr>) {
if let Some(client_addr) = client_addr {
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert("x-forwarded-for", client_addr.to_string().parse().unwrap());
metadata
.iter()
.for_each(|key_and_value| match key_and_value {
KeyAndValueRef::Ascii(key, value) => {
request.metadata_mut().insert(key, value.clone());
}
KeyAndValueRef::Binary(key, value) => {
request.metadata_mut().insert_bin(key, value.clone());
}
});
}
}