Skip to main content

sui_rpc/client/
mod.rs

1use std::time::Duration;
2use tap::Pipe;
3use tonic::body::Body;
4use tonic::codec::CompressionEncoding;
5use tonic::transport::channel::ClientTlsConfig;
6use tower::Layer;
7use tower::Service;
8use tower::ServiceBuilder;
9use tower::util::BoxLayer;
10use tower::util::BoxService;
11
12mod response_ext;
13pub use response_ext::ResponseExt;
14
15mod interceptors;
16pub use interceptors::HeadersInterceptor;
17
18mod staking_rewards;
19pub use staking_rewards::DelegatedStake;
20
21mod coin_selection;
22mod lists;
23
24mod transaction_execution;
25pub use transaction_execution::ExecuteAndWaitError;
26
27use crate::proto::sui::rpc::v2::ledger_service_client::LedgerServiceClient;
28use crate::proto::sui::rpc::v2::move_package_service_client::MovePackageServiceClient;
29use crate::proto::sui::rpc::v2::signature_verification_service_client::SignatureVerificationServiceClient;
30use crate::proto::sui::rpc::v2::state_service_client::StateServiceClient;
31use crate::proto::sui::rpc::v2::subscription_service_client::SubscriptionServiceClient;
32use crate::proto::sui::rpc::v2::transaction_execution_service_client::TransactionExecutionServiceClient;
33#[cfg(feature = "unstable")]
34use crate::proto::sui::rpc::v2alpha::ledger_service_client::LedgerServiceClient as LedgerServiceClientAlpha;
35#[cfg(feature = "unstable")]
36use crate::proto::sui::rpc::v2alpha::proof_service_client::ProofServiceClient;
37
/// Module-local `Result` alias whose error type defaults to [`tonic::Status`].
38type Result<T, E = tonic::Status> = std::result::Result<T, E>;
/// Boxed, thread-safe error trait object used where the concrete error type
/// is not fixed (user-supplied layers, URI conversion failures).
39type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Type-erased HTTP service whose error type is `tonic::Status`; this is the
/// transport type handed to the generated service clients.
40type BoxedChannel = BoxService<http::Request<Body>, http::Response<Body>, tonic::Status>;
41
/// Type-erased layer stored in `Client::request_layer`.
///
/// It wraps a boxed HTTP service whose error type is [`BoxError`] and yields a
/// service with the same request, response and error types.
42type RequestLayer = BoxLayer<
43    BoxService<http::Request<Body>, http::Response<Body>, BoxError>,
44    http::Request<Body>,
45    http::Response<Body>,
46    BoxError,
47>;
48
// Transport defaults applied by `Client::new`. `Client::from_endpoint` does
// not use them; it takes the endpoint exactly as the caller configured it.

/// Value passed to `Endpoint::connect_timeout`.
49const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Value passed to `Endpoint::tcp_keepalive`.
50const DEFAULT_TCP_KEEPALIVE_IDLE: Duration = Duration::from_secs(15);
/// Value passed to `Endpoint::tcp_keepalive_interval`.
51const DEFAULT_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
/// Value passed to `Endpoint::tcp_keepalive_retries`.
52const DEFAULT_TCP_KEEPALIVE_RETRIES: u32 = 3;
/// Value passed to `Endpoint::http2_keep_alive_interval`.
53const DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
/// Value passed to `Endpoint::keep_alive_timeout`.
54const DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
55
/// Handle from which the typed gRPC service clients in this module are
/// created.
///
/// All service clients produced by one `Client` are built from clones of the
/// same underlying [`tonic::transport::Channel`] and carry the same headers,
/// decoding limit and optional request layer.
56#[derive(Clone)]
57pub struct Client {
    /// URI this client was created for; returned by the `uri` accessor.
58    uri: http::Uri,
    /// Underlying tonic channel, created with `connect_lazy`.
59    channel: tonic::transport::Channel,
    /// Headers added to every outgoing HTTP request.
60    headers: HeadersInterceptor,
    /// Optional maximum decoding message size applied to each service client;
    /// when `None` the generated client's own default is left in place.
61    max_decoding_message_size: Option<usize>,
62
63    /// Layer to apply to all RPC requests
64    request_layer: Option<RequestLayer>,
65}
66
67impl Client {
68    /// URL of the public-good mainnet fullnodes provided by the Sui Foundation.
69    pub const MAINNET_FULLNODE: &str = "https://fullnode.mainnet.sui.io";
70
71    /// URL of the public-good testnet fullnodes provided by the Sui Foundation.
72    pub const TESTNET_FULLNODE: &str = "https://fullnode.testnet.sui.io";
73
74    /// URL of the public-good devnet fullnodes provided by the Sui Foundation.
75    pub const DEVNET_FULLNODE: &str = "https://fullnode.devnet.sui.io";
76
77    /// URL of the public-good mainnet archive provided by the Sui Foundation.
78    pub const MAINNET_ARCHIVE: &str = "https://archive.mainnet.sui.io";
79
80    /// URL of the public-good testnet archive provided by the Sui Foundation.
81    pub const TESTNET_ARCHIVE: &str = "https://archive.testnet.sui.io";
82
83    pub fn from_endpoint(endpoint: &tonic::transport::Endpoint) -> Self {
84        let uri = endpoint.uri().clone();
85        let channel = endpoint.connect_lazy();
86        Self {
87            uri,
88            channel,
89            headers: Default::default(),
90            max_decoding_message_size: None,
91            request_layer: None,
92        }
93    }
94
95    #[allow(clippy::result_large_err)]
96    pub fn new<T>(uri: T) -> Result<Self>
97    where
98        T: TryInto<http::Uri>,
99        T::Error: Into<BoxError>,
100    {
101        let uri = uri
102            .try_into()
103            .map_err(Into::into)
104            .map_err(tonic::Status::from_error)?;
105        let mut endpoint = tonic::transport::Endpoint::from(uri.clone());
106        if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
107            endpoint = endpoint
108                .tls_config(ClientTlsConfig::new().with_enabled_roots())
109                .map_err(Into::into)
110                .map_err(tonic::Status::from_error)?;
111        }
112
113        let channel = endpoint
114            .connect_timeout(DEFAULT_CONNECT_TIMEOUT)
115            .tcp_keepalive(Some(DEFAULT_TCP_KEEPALIVE_IDLE))
116            .tcp_keepalive_interval(Some(DEFAULT_TCP_KEEPALIVE_INTERVAL))
117            .tcp_keepalive_retries(Some(DEFAULT_TCP_KEEPALIVE_RETRIES))
118            .http2_keep_alive_interval(DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL)
119            .keep_alive_timeout(DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT)
120            .connect_lazy();
121
122        Ok(Self {
123            uri,
124            channel,
125            headers: Default::default(),
126            max_decoding_message_size: None,
127            request_layer: None,
128        })
129    }
130
131    pub fn with_headers(mut self, headers: HeadersInterceptor) -> Self {
132        self.headers = headers;
133        self
134    }
135
136    /// Provide an optional [`Layer`] that will be used to wrap all RPC
137    /// requests.
138    ///
139    /// This could be helpful in providing global metrics and logging
140    /// for all outbound requests.
141    ///
142    /// The layer's service may return any response body that implements
143    /// [`http_body::Body<Data = bytes::Bytes>`] and any error type that
144    /// implements `Into<Box<dyn Error + Send + Sync>>`. Both are mapped
145    /// to the internal types automatically.
146    ///
147    /// # Example
148    ///
149    /// Add a layer that logs each request URI:
150    ///
151    /// ```
152    /// # let _rt = tokio::runtime::Builder::new_current_thread()
153    /// #     .build()
154    /// #     .unwrap();
155    /// # let _guard = _rt.enter();
156    /// use sui_rpc::Client;
157    /// use tower::ServiceBuilder;
158    ///
159    /// let client = Client::new(Client::MAINNET_FULLNODE)
160    ///     .unwrap()
161    ///     .request_layer(ServiceBuilder::new().map_request(|req: http::Request<_>| {
162    ///         println!("request to {}", req.uri());
163    ///         req
164    ///     }));
165    /// ```
166    pub fn request_layer<L, ResBody, E>(mut self, layer: L) -> Self
167    where
168        L: Layer<BoxService<http::Request<Body>, http::Response<Body>, BoxError>>
169            + Send
170            + Sync
171            + 'static,
172        L::Service: Service<http::Request<Body>, Response = http::Response<ResBody>, Error = E>
173            + Send
174            + 'static,
175        <L::Service as Service<http::Request<Body>>>::Future: Send + 'static,
176        ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
177        ResBody::Error: Into<BoxError>,
178        E: Into<BoxError> + Send + 'static,
179    {
180        let layer = BoxLayer::new(
181            ServiceBuilder::new()
182                .map_response(|resp: http::Response<ResBody>| resp.map(Body::new))
183                .map_err(Into::<BoxError>::into)
184                .layer(layer),
185        );
186        self.request_layer = Some(layer);
187        self
188    }
189
190    pub fn with_max_decoding_message_size(mut self, limit: usize) -> Self {
191        self.max_decoding_message_size = Some(limit);
192        self
193    }
194
195    pub fn uri(&self) -> &http::Uri {
196        &self.uri
197    }
198
199    fn channel(&self) -> BoxedChannel {
200        let headers = self.headers.clone();
201
202        // Build the base service with headers applied at the HTTP level and the
203        // transport error mapped to BoxError for compatibility with user layers.
204        let base = BoxService::new(
205            ServiceBuilder::new()
206                .map_err(|e: tonic::transport::Error| -> BoxError { Box::new(e) })
207                .map_request(move |mut req: http::Request<Body>| {
208                    if !headers.headers().is_empty() {
209                        req.headers_mut()
210                            .extend(headers.headers().clone().into_headers());
211                    }
212                    req
213                })
214                .service(self.channel.clone()),
215        );
216
217        // Apply the user's outbound request layer if present.
218        let layered = if let Some(layer) = &self.request_layer {
219            layer.layer(base)
220        } else {
221            base
222        };
223
224        // Map the final error to tonic::Status (a concrete type) so that
225        // downstream users of the tonic-generated clients don't run into
226        // lifetime-inference issues with async_trait and Box<dyn Error>.
227        BoxService::new(
228            ServiceBuilder::new()
229                .map_err(tonic::Status::from_error)
230                .service(layered),
231        )
232    }
233
234    pub fn ledger_client(&mut self) -> LedgerServiceClient<BoxedChannel> {
235        LedgerServiceClient::new(self.channel())
236            .accept_compressed(CompressionEncoding::Zstd)
237            .pipe(|client| {
238                if let Some(limit) = self.max_decoding_message_size {
239                    client.max_decoding_message_size(limit)
240                } else {
241                    client
242                }
243            })
244    }
245
246    pub fn state_client(&mut self) -> StateServiceClient<BoxedChannel> {
247        StateServiceClient::new(self.channel())
248            .accept_compressed(CompressionEncoding::Zstd)
249            .pipe(|client| {
250                if let Some(limit) = self.max_decoding_message_size {
251                    client.max_decoding_message_size(limit)
252                } else {
253                    client
254                }
255            })
256    }
257
258    pub fn execution_client(&mut self) -> TransactionExecutionServiceClient<BoxedChannel> {
259        TransactionExecutionServiceClient::new(self.channel())
260            .accept_compressed(CompressionEncoding::Zstd)
261            .pipe(|client| {
262                if let Some(limit) = self.max_decoding_message_size {
263                    client.max_decoding_message_size(limit)
264                } else {
265                    client
266                }
267            })
268    }
269
270    pub fn package_client(&mut self) -> MovePackageServiceClient<BoxedChannel> {
271        MovePackageServiceClient::new(self.channel())
272            .accept_compressed(CompressionEncoding::Zstd)
273            .pipe(|client| {
274                if let Some(limit) = self.max_decoding_message_size {
275                    client.max_decoding_message_size(limit)
276                } else {
277                    client
278                }
279            })
280    }
281
282    pub fn signature_verification_client(
283        &mut self,
284    ) -> SignatureVerificationServiceClient<BoxedChannel> {
285        SignatureVerificationServiceClient::new(self.channel())
286            .accept_compressed(CompressionEncoding::Zstd)
287            .pipe(|client| {
288                if let Some(limit) = self.max_decoding_message_size {
289                    client.max_decoding_message_size(limit)
290                } else {
291                    client
292                }
293            })
294    }
295
296    pub fn subscription_client(&mut self) -> SubscriptionServiceClient<BoxedChannel> {
297        SubscriptionServiceClient::new(self.channel())
298            .accept_compressed(CompressionEncoding::Zstd)
299            .pipe(|client| {
300                if let Some(limit) = self.max_decoding_message_size {
301                    client.max_decoding_message_size(limit)
302                } else {
303                    client
304                }
305            })
306    }
307
308    /// Returns a client for the unstable alpha `ProofService`, which serves
309    /// Object Checkpoint State (OCS) inclusion proofs.
310    #[cfg(feature = "unstable")]
311    #[cfg_attr(doc_cfg, doc(cfg(feature = "unstable")))]
312    pub fn proof_client(&mut self) -> ProofServiceClient<BoxedChannel> {
313        ProofServiceClient::new(self.channel())
314            .accept_compressed(CompressionEncoding::Zstd)
315            .pipe(|client| {
316                if let Some(limit) = self.max_decoding_message_size {
317                    client.max_decoding_message_size(limit)
318                } else {
319                    client
320                }
321            })
322    }
323
324    /// Returns a client for the unstable v2alpha `LedgerService`, which
325    /// exposes the indexed checkpoint / transaction / event listing RPCs
326    /// (`ListCheckpoints`, `ListTransactions`, `ListEvents`) on top of the
327    /// stable v2 surface accessible via [`Self::ledger_client`].
328    #[cfg(feature = "unstable")]
329    #[cfg_attr(doc_cfg, doc(cfg(feature = "unstable")))]
330    pub fn ledger_client_alpha(&mut self) -> LedgerServiceClientAlpha<BoxedChannel> {
331        LedgerServiceClientAlpha::new(self.channel())
332            .accept_compressed(CompressionEncoding::Zstd)
333            .pipe(|client| {
334                if let Some(limit) = self.max_decoding_message_size {
335                    client.max_decoding_message_size(limit)
336                } else {
337                    client
338                }
339            })
340    }
341}