sui_rpc/client/
mod.rs

1use std::time::Duration;
2use tap::Pipe;
3use tonic::body::Body;
4use tonic::codec::CompressionEncoding;
5use tonic::transport::channel::ClientTlsConfig;
6use tower::Layer;
7use tower::Service;
8use tower::ServiceBuilder;
9use tower::util::BoxLayer;
10use tower::util::BoxService;
11
12mod response_ext;
13pub use response_ext::ResponseExt;
14
15mod interceptors;
16pub use interceptors::HeadersInterceptor;
17
18mod staking_rewards;
19pub use staking_rewards::DelegatedStake;
20
21mod coin_selection;
22mod lists;
23
24mod transaction_execution;
25pub use transaction_execution::ExecuteAndWaitError;
26
27use crate::proto::sui::rpc::v2::ledger_service_client::LedgerServiceClient;
28use crate::proto::sui::rpc::v2::move_package_service_client::MovePackageServiceClient;
29use crate::proto::sui::rpc::v2::signature_verification_service_client::SignatureVerificationServiceClient;
30use crate::proto::sui::rpc::v2::state_service_client::StateServiceClient;
31use crate::proto::sui::rpc::v2::subscription_service_client::SubscriptionServiceClient;
32use crate::proto::sui::rpc::v2::transaction_execution_service_client::TransactionExecutionServiceClient;
33
34type Result<T, E = tonic::Status> = std::result::Result<T, E>;
35type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
36type BoxedChannel = BoxService<http::Request<Body>, http::Response<Body>, tonic::Status>;
37
38type RequestLayer = BoxLayer<
39    BoxService<http::Request<Body>, http::Response<Body>, BoxError>,
40    http::Request<Body>,
41    http::Response<Body>,
42    BoxError,
43>;
44
45const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
46const DEFAULT_TCP_KEEPALIVE_IDLE: Duration = Duration::from_secs(15);
47const DEFAULT_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
48const DEFAULT_TCP_KEEPALIVE_RETRIES: u32 = 3;
49const DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5);
50const DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
51
52#[derive(Clone)]
53pub struct Client {
54    uri: http::Uri,
55    channel: tonic::transport::Channel,
56    headers: HeadersInterceptor,
57    max_decoding_message_size: Option<usize>,
58
59    /// Layer to apply to all RPC requests
60    request_layer: Option<RequestLayer>,
61}
62
63impl Client {
64    /// URL for the public-good, Sui Foundation provided fullnodes for mainnet.
65    pub const MAINNET_FULLNODE: &str = "https://fullnode.mainnet.sui.io";
66
67    /// URL for the public-good, Sui Foundation provided fullnodes for testnet.
68    pub const TESTNET_FULLNODE: &str = "https://fullnode.testnet.sui.io";
69
70    /// URL for the public-good, Sui Foundation provided fullnodes for devnet.
71    pub const DEVNET_FULLNODE: &str = "https://fullnode.devnet.sui.io";
72
73    /// URL for the public-good, Sui Foundation provided archive for mainnet.
74    pub const MAINNET_ARCHIVE: &str = "https://archive.mainnet.sui.io";
75
76    /// URL for the public-good, Sui Foundation provided archive for testnet.
77    pub const TESTNET_ARCHIVE: &str = "https://archive.testnet.sui.io";
78
79    pub fn from_endpoint(endpoint: &tonic::transport::Endpoint) -> Self {
80        let uri = endpoint.uri().clone();
81        let channel = endpoint.connect_lazy();
82        Self {
83            uri,
84            channel,
85            headers: Default::default(),
86            max_decoding_message_size: None,
87            request_layer: None,
88        }
89    }
90
91    #[allow(clippy::result_large_err)]
92    pub fn new<T>(uri: T) -> Result<Self>
93    where
94        T: TryInto<http::Uri>,
95        T::Error: Into<BoxError>,
96    {
97        let uri = uri
98            .try_into()
99            .map_err(Into::into)
100            .map_err(tonic::Status::from_error)?;
101        let mut endpoint = tonic::transport::Endpoint::from(uri.clone());
102        if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
103            endpoint = endpoint
104                .tls_config(ClientTlsConfig::new().with_enabled_roots())
105                .map_err(Into::into)
106                .map_err(tonic::Status::from_error)?;
107        }
108
109        let channel = endpoint
110            .connect_timeout(DEFAULT_CONNECT_TIMEOUT)
111            .tcp_keepalive(Some(DEFAULT_TCP_KEEPALIVE_IDLE))
112            .tcp_keepalive_interval(Some(DEFAULT_TCP_KEEPALIVE_INTERVAL))
113            .tcp_keepalive_retries(Some(DEFAULT_TCP_KEEPALIVE_RETRIES))
114            .http2_keep_alive_interval(DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL)
115            .keep_alive_timeout(DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT)
116            .connect_lazy();
117
118        Ok(Self {
119            uri,
120            channel,
121            headers: Default::default(),
122            max_decoding_message_size: None,
123            request_layer: None,
124        })
125    }
126
127    pub fn with_headers(mut self, headers: HeadersInterceptor) -> Self {
128        self.headers = headers;
129        self
130    }
131
132    /// Provide an optional [`Layer`] that will be used to wrap all RPC
133    /// requests.
134    ///
135    /// This could be helpful in providing global metrics and logging
136    /// for all outbound requests.
137    ///
138    /// The layer's service may return any response body that implements
139    /// [`http_body::Body<Data = bytes::Bytes>`] and any error type that
140    /// implements `Into<Box<dyn Error + Send + Sync>>`. Both are mapped
141    /// to the internal types automatically.
142    ///
143    /// # Example
144    ///
145    /// Add a layer that logs each request URI:
146    ///
147    /// ```
148    /// # let _rt = tokio::runtime::Builder::new_current_thread()
149    /// #     .build()
150    /// #     .unwrap();
151    /// # let _guard = _rt.enter();
152    /// use sui_rpc::Client;
153    /// use tower::ServiceBuilder;
154    ///
155    /// let client = Client::new(Client::MAINNET_FULLNODE)
156    ///     .unwrap()
157    ///     .request_layer(ServiceBuilder::new().map_request(|req: http::Request<_>| {
158    ///         println!("request to {}", req.uri());
159    ///         req
160    ///     }));
161    /// ```
162    pub fn request_layer<L, ResBody, E>(mut self, layer: L) -> Self
163    where
164        L: Layer<BoxService<http::Request<Body>, http::Response<Body>, BoxError>>
165            + Send
166            + Sync
167            + 'static,
168        L::Service: Service<http::Request<Body>, Response = http::Response<ResBody>, Error = E>
169            + Send
170            + 'static,
171        <L::Service as Service<http::Request<Body>>>::Future: Send + 'static,
172        ResBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
173        ResBody::Error: Into<BoxError>,
174        E: Into<BoxError> + Send + 'static,
175    {
176        let layer = BoxLayer::new(
177            ServiceBuilder::new()
178                .map_response(|resp: http::Response<ResBody>| resp.map(Body::new))
179                .map_err(Into::<BoxError>::into)
180                .layer(layer),
181        );
182        self.request_layer = Some(layer);
183        self
184    }
185
186    pub fn with_max_decoding_message_size(mut self, limit: usize) -> Self {
187        self.max_decoding_message_size = Some(limit);
188        self
189    }
190
191    pub fn uri(&self) -> &http::Uri {
192        &self.uri
193    }
194
195    fn channel(&self) -> BoxedChannel {
196        let headers = self.headers.clone();
197
198        // Build the base service with headers applied at the HTTP level and the
199        // transport error mapped to BoxError for compatibility with user layers.
200        let base = BoxService::new(
201            ServiceBuilder::new()
202                .map_err(|e: tonic::transport::Error| -> BoxError { Box::new(e) })
203                .map_request(move |mut req: http::Request<Body>| {
204                    if !headers.headers().is_empty() {
205                        req.headers_mut()
206                            .extend(headers.headers().clone().into_headers());
207                    }
208                    req
209                })
210                .service(self.channel.clone()),
211        );
212
213        // Apply the user's outbound request layer if present.
214        let layered = if let Some(layer) = &self.request_layer {
215            layer.layer(base)
216        } else {
217            base
218        };
219
220        // Map the final error to tonic::Status (a concrete type) so that
221        // downstream users of the tonic-generated clients don't run into
222        // lifetime-inference issues with async_trait and Box<dyn Error>.
223        BoxService::new(
224            ServiceBuilder::new()
225                .map_err(tonic::Status::from_error)
226                .service(layered),
227        )
228    }
229
230    pub fn ledger_client(&mut self) -> LedgerServiceClient<BoxedChannel> {
231        LedgerServiceClient::new(self.channel())
232            .accept_compressed(CompressionEncoding::Zstd)
233            .pipe(|client| {
234                if let Some(limit) = self.max_decoding_message_size {
235                    client.max_decoding_message_size(limit)
236                } else {
237                    client
238                }
239            })
240    }
241
242    pub fn state_client(&mut self) -> StateServiceClient<BoxedChannel> {
243        StateServiceClient::new(self.channel())
244            .accept_compressed(CompressionEncoding::Zstd)
245            .pipe(|client| {
246                if let Some(limit) = self.max_decoding_message_size {
247                    client.max_decoding_message_size(limit)
248                } else {
249                    client
250                }
251            })
252    }
253
254    pub fn execution_client(&mut self) -> TransactionExecutionServiceClient<BoxedChannel> {
255        TransactionExecutionServiceClient::new(self.channel())
256            .accept_compressed(CompressionEncoding::Zstd)
257            .pipe(|client| {
258                if let Some(limit) = self.max_decoding_message_size {
259                    client.max_decoding_message_size(limit)
260                } else {
261                    client
262                }
263            })
264    }
265
266    pub fn package_client(&mut self) -> MovePackageServiceClient<BoxedChannel> {
267        MovePackageServiceClient::new(self.channel())
268            .accept_compressed(CompressionEncoding::Zstd)
269            .pipe(|client| {
270                if let Some(limit) = self.max_decoding_message_size {
271                    client.max_decoding_message_size(limit)
272                } else {
273                    client
274                }
275            })
276    }
277
278    pub fn signature_verification_client(
279        &mut self,
280    ) -> SignatureVerificationServiceClient<BoxedChannel> {
281        SignatureVerificationServiceClient::new(self.channel())
282            .accept_compressed(CompressionEncoding::Zstd)
283            .pipe(|client| {
284                if let Some(limit) = self.max_decoding_message_size {
285                    client.max_decoding_message_size(limit)
286                } else {
287                    client
288                }
289            })
290    }
291
292    pub fn subscription_client(&mut self) -> SubscriptionServiceClient<BoxedChannel> {
293        SubscriptionServiceClient::new(self.channel())
294            .accept_compressed(CompressionEncoding::Zstd)
295            .pipe(|client| {
296                if let Some(limit) = self.max_decoding_message_size {
297                    client.max_decoding_message_size(limit)
298                } else {
299                    client
300                }
301            })
302    }
303}