1use futures::TryStreamExt;
2use std::time::Duration;
3use tap::Pipe;
4use tonic::codec::CompressionEncoding;
5use tonic::transport::channel::ClientTlsConfig;
6
7mod response_ext;
8pub use response_ext::ResponseExt;
9
10mod auth;
11pub use auth::AuthInterceptor;
12
13mod staking_rewards;
14pub use staking_rewards::DelegatedStake;
15
16pub mod v2;
17
18use crate::field::FieldMaskUtil;
19use crate::proto::sui::rpc::v2beta2::ledger_service_client::LedgerServiceClient;
20use crate::proto::sui::rpc::v2beta2::live_data_service_client::LiveDataServiceClient;
21use crate::proto::sui::rpc::v2beta2::move_package_service_client::MovePackageServiceClient;
22use crate::proto::sui::rpc::v2beta2::signature_verification_service_client::SignatureVerificationServiceClient;
23use crate::proto::sui::rpc::v2beta2::subscription_service_client::SubscriptionServiceClient;
24use crate::proto::sui::rpc::v2beta2::transaction_execution_service_client::TransactionExecutionServiceClient;
25use crate::proto::sui::rpc::v2beta2::ExecuteTransactionRequest;
26use crate::proto::sui::rpc::v2beta2::ExecutedTransaction;
27use crate::proto::sui::rpc::v2beta2::SubscribeCheckpointsRequest;
28use prost_types::FieldMask;
29
30type Result<T, E = tonic::Status> = std::result::Result<T, E>;
31type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
32type Channel<'a> = tonic::service::interceptor::InterceptedService<
33 &'a mut tonic::transport::Channel,
34 &'a mut AuthInterceptor,
35>;
36
37#[derive(Clone)]
38pub struct Client {
39 uri: http::Uri,
40 channel: tonic::transport::Channel,
41 auth: AuthInterceptor,
42 max_decoding_message_size: Option<usize>,
43}
44
45impl Client {
46 pub const MAINNET_FULLNODE: &str = "https://fullnode.mainnet.sui.io";
48
49 pub const TESTNET_FULLNODE: &str = "https://fullnode.testnet.sui.io";
51
52 pub const DEVNET_FULLNODE: &str = "https://fullnode.devnet.sui.io";
54
55 pub const MAINNET_ARCHIVE: &str = "https://archive.mainnet.sui.io";
57
58 pub const TESTNET_ARCHIVE: &str = "https://archive.testnet.sui.io";
60
61 #[allow(clippy::result_large_err)]
62 pub fn new<T>(uri: T) -> Result<Self>
63 where
64 T: TryInto<http::Uri>,
65 T::Error: Into<BoxError>,
66 {
67 let uri = uri
68 .try_into()
69 .map_err(Into::into)
70 .map_err(tonic::Status::from_error)?;
71 let mut endpoint = tonic::transport::Endpoint::from(uri.clone());
72 if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
73 endpoint = endpoint
74 .tls_config(ClientTlsConfig::new().with_enabled_roots())
75 .map_err(Into::into)
76 .map_err(tonic::Status::from_error)?;
77 }
78 let channel = endpoint
79 .connect_timeout(Duration::from_secs(5))
80 .http2_keep_alive_interval(Duration::from_secs(5))
81 .connect_lazy();
82
83 Ok(Self {
84 uri,
85 channel,
86 auth: Default::default(),
87 max_decoding_message_size: None,
88 })
89 }
90
91 pub fn with_auth(mut self, auth: AuthInterceptor) -> Self {
92 self.auth = auth;
93 self
94 }
95
96 pub fn with_max_decoding_message_size(mut self, limit: usize) -> Self {
97 self.max_decoding_message_size = Some(limit);
98 self
99 }
100
101 pub fn uri(&self) -> &http::Uri {
102 &self.uri
103 }
104
105 pub fn ledger_client(&mut self) -> LedgerServiceClient<Channel<'_>> {
106 LedgerServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
107 .accept_compressed(CompressionEncoding::Zstd)
108 .pipe(|client| {
109 if let Some(limit) = self.max_decoding_message_size {
110 client.max_decoding_message_size(limit)
111 } else {
112 client
113 }
114 })
115 }
116
117 pub fn live_data_client(&mut self) -> LiveDataServiceClient<Channel<'_>> {
118 LiveDataServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
119 .accept_compressed(CompressionEncoding::Zstd)
120 .pipe(|client| {
121 if let Some(limit) = self.max_decoding_message_size {
122 client.max_decoding_message_size(limit)
123 } else {
124 client
125 }
126 })
127 }
128
129 pub fn execution_client(&mut self) -> TransactionExecutionServiceClient<Channel<'_>> {
130 TransactionExecutionServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
131 .accept_compressed(CompressionEncoding::Zstd)
132 .pipe(|client| {
133 if let Some(limit) = self.max_decoding_message_size {
134 client.max_decoding_message_size(limit)
135 } else {
136 client
137 }
138 })
139 }
140
141 pub fn package_client(&mut self) -> MovePackageServiceClient<Channel<'_>> {
142 MovePackageServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
143 .accept_compressed(CompressionEncoding::Zstd)
144 .pipe(|client| {
145 if let Some(limit) = self.max_decoding_message_size {
146 client.max_decoding_message_size(limit)
147 } else {
148 client
149 }
150 })
151 }
152
153 pub fn signature_verification_client(
154 &mut self,
155 ) -> SignatureVerificationServiceClient<Channel<'_>> {
156 SignatureVerificationServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
157 .accept_compressed(CompressionEncoding::Zstd)
158 .pipe(|client| {
159 if let Some(limit) = self.max_decoding_message_size {
160 client.max_decoding_message_size(limit)
161 } else {
162 client
163 }
164 })
165 }
166
167 pub fn subscription_client(&mut self) -> SubscriptionServiceClient<Channel<'_>> {
168 SubscriptionServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
169 .accept_compressed(CompressionEncoding::Zstd)
170 .pipe(|client| {
171 if let Some(limit) = self.max_decoding_message_size {
172 client.max_decoding_message_size(limit)
173 } else {
174 client
175 }
176 })
177 }
178
179 pub async fn execute_transaction_and_wait_for_checkpoint(
193 &mut self,
194 request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
195 timeout: Duration,
196 ) -> Result<ExecutedTransaction> {
197 let mut checkpoint_stream = self
202 .subscription_client()
203 .subscribe_checkpoints(SubscribeCheckpointsRequest {
204 read_mask: Some(FieldMask::from_str("transactions.digest,sequence_number")),
205 })
206 .await?
207 .into_inner();
208
209 let request = request.into_request();
211 let transaction = request
212 .get_ref()
213 .transaction
214 .as_ref()
215 .ok_or_else(|| tonic::Status::invalid_argument("transaction is required"))?;
216
217 let executed_txn_digest = sui_sdk_types::Transaction::try_from(transaction)
218 .map_err(|e| tonic::Status::internal(format!("failed to convert transaction: {e}")))?
219 .digest()
220 .to_string();
221
222 let executed_transaction = self
223 .execution_client()
224 .execute_transaction(request)
225 .await?
226 .into_inner()
227 .transaction()
228 .to_owned();
229
230 let timeout_future = tokio::time::sleep(timeout);
233 let checkpoint_future = async {
234 while let Some(response) = checkpoint_stream.try_next().await? {
235 let checkpoint = response.checkpoint();
236
237 for tx in checkpoint.transactions() {
238 let digest = tx.digest();
239
240 if digest == executed_txn_digest {
241 return Ok(());
242 }
243 }
244 }
245 Err(tonic::Status::aborted(
246 "checkpoint stream ended unexpectedly",
247 ))
248 };
249
250 tokio::select! {
251 result = checkpoint_future => {
252 result?;
253 Ok(executed_transaction)
254 },
255 _ = timeout_future => Err(tonic::Status::deadline_exceeded(format!("timeout waiting for checkpoint after {timeout:?}"))),
256 }
257 }
258}