sui_rpc/client/
mod.rs

1use futures::TryStreamExt;
2use std::time::Duration;
3use tap::Pipe;
4use tonic::codec::CompressionEncoding;
5use tonic::transport::channel::ClientTlsConfig;
6
7mod response_ext;
8pub use response_ext::ResponseExt;
9
10mod auth;
11pub use auth::AuthInterceptor;
12
13mod staking_rewards;
14pub use staking_rewards::DelegatedStake;
15
16pub mod v2;
17
18use crate::field::FieldMaskUtil;
19use crate::proto::sui::rpc::v2beta2::ledger_service_client::LedgerServiceClient;
20use crate::proto::sui::rpc::v2beta2::live_data_service_client::LiveDataServiceClient;
21use crate::proto::sui::rpc::v2beta2::move_package_service_client::MovePackageServiceClient;
22use crate::proto::sui::rpc::v2beta2::signature_verification_service_client::SignatureVerificationServiceClient;
23use crate::proto::sui::rpc::v2beta2::subscription_service_client::SubscriptionServiceClient;
24use crate::proto::sui::rpc::v2beta2::transaction_execution_service_client::TransactionExecutionServiceClient;
25use crate::proto::sui::rpc::v2beta2::ExecuteTransactionRequest;
26use crate::proto::sui::rpc::v2beta2::ExecutedTransaction;
27use crate::proto::sui::rpc::v2beta2::SubscribeCheckpointsRequest;
28use prost_types::FieldMask;
29
30type Result<T, E = tonic::Status> = std::result::Result<T, E>;
31type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
32type Channel<'a> = tonic::service::interceptor::InterceptedService<
33    &'a mut tonic::transport::Channel,
34    &'a mut AuthInterceptor,
35>;
36
37#[derive(Clone)]
38pub struct Client {
39    uri: http::Uri,
40    channel: tonic::transport::Channel,
41    auth: AuthInterceptor,
42    max_decoding_message_size: Option<usize>,
43}
44
45impl Client {
46    /// URL for the public-good, Sui Foundation provided fullnodes for mainnet.
47    pub const MAINNET_FULLNODE: &str = "https://fullnode.mainnet.sui.io";
48
49    /// URL for the public-good, Sui Foundation provided fullnodes for testnet.
50    pub const TESTNET_FULLNODE: &str = "https://fullnode.testnet.sui.io";
51
52    /// URL for the public-good, Sui Foundation provided fullnodes for devnet.
53    pub const DEVNET_FULLNODE: &str = "https://fullnode.devnet.sui.io";
54
55    /// URL for the public-good, Sui Foundation provided archive for mainnet.
56    pub const MAINNET_ARCHIVE: &str = "https://archive.mainnet.sui.io";
57
58    /// URL for the public-good, Sui Foundation provided archive for testnet.
59    pub const TESTNET_ARCHIVE: &str = "https://archive.testnet.sui.io";
60
61    #[allow(clippy::result_large_err)]
62    pub fn new<T>(uri: T) -> Result<Self>
63    where
64        T: TryInto<http::Uri>,
65        T::Error: Into<BoxError>,
66    {
67        let uri = uri
68            .try_into()
69            .map_err(Into::into)
70            .map_err(tonic::Status::from_error)?;
71        let mut endpoint = tonic::transport::Endpoint::from(uri.clone());
72        if uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
73            endpoint = endpoint
74                .tls_config(ClientTlsConfig::new().with_enabled_roots())
75                .map_err(Into::into)
76                .map_err(tonic::Status::from_error)?;
77        }
78        let channel = endpoint
79            .connect_timeout(Duration::from_secs(5))
80            .http2_keep_alive_interval(Duration::from_secs(5))
81            .connect_lazy();
82
83        Ok(Self {
84            uri,
85            channel,
86            auth: Default::default(),
87            max_decoding_message_size: None,
88        })
89    }
90
91    pub fn with_auth(mut self, auth: AuthInterceptor) -> Self {
92        self.auth = auth;
93        self
94    }
95
96    pub fn with_max_decoding_message_size(mut self, limit: usize) -> Self {
97        self.max_decoding_message_size = Some(limit);
98        self
99    }
100
101    pub fn uri(&self) -> &http::Uri {
102        &self.uri
103    }
104
105    pub fn ledger_client(&mut self) -> LedgerServiceClient<Channel<'_>> {
106        LedgerServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
107            .accept_compressed(CompressionEncoding::Zstd)
108            .pipe(|client| {
109                if let Some(limit) = self.max_decoding_message_size {
110                    client.max_decoding_message_size(limit)
111                } else {
112                    client
113                }
114            })
115    }
116
117    pub fn live_data_client(&mut self) -> LiveDataServiceClient<Channel<'_>> {
118        LiveDataServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
119            .accept_compressed(CompressionEncoding::Zstd)
120            .pipe(|client| {
121                if let Some(limit) = self.max_decoding_message_size {
122                    client.max_decoding_message_size(limit)
123                } else {
124                    client
125                }
126            })
127    }
128
129    pub fn execution_client(&mut self) -> TransactionExecutionServiceClient<Channel<'_>> {
130        TransactionExecutionServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
131            .accept_compressed(CompressionEncoding::Zstd)
132            .pipe(|client| {
133                if let Some(limit) = self.max_decoding_message_size {
134                    client.max_decoding_message_size(limit)
135                } else {
136                    client
137                }
138            })
139    }
140
141    pub fn package_client(&mut self) -> MovePackageServiceClient<Channel<'_>> {
142        MovePackageServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
143            .accept_compressed(CompressionEncoding::Zstd)
144            .pipe(|client| {
145                if let Some(limit) = self.max_decoding_message_size {
146                    client.max_decoding_message_size(limit)
147                } else {
148                    client
149                }
150            })
151    }
152
153    pub fn signature_verification_client(
154        &mut self,
155    ) -> SignatureVerificationServiceClient<Channel<'_>> {
156        SignatureVerificationServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
157            .accept_compressed(CompressionEncoding::Zstd)
158            .pipe(|client| {
159                if let Some(limit) = self.max_decoding_message_size {
160                    client.max_decoding_message_size(limit)
161                } else {
162                    client
163                }
164            })
165    }
166
167    pub fn subscription_client(&mut self) -> SubscriptionServiceClient<Channel<'_>> {
168        SubscriptionServiceClient::with_interceptor(&mut self.channel, &mut self.auth)
169            .accept_compressed(CompressionEncoding::Zstd)
170            .pipe(|client| {
171                if let Some(limit) = self.max_decoding_message_size {
172                    client.max_decoding_message_size(limit)
173                } else {
174                    client
175                }
176            })
177    }
178
179    /// Executes a transaction and waits for it to be included in a checkpoint.
180    ///
181    /// This method provides "read your writes" consistency by executing the transaction
182    /// and waiting for it to appear in a checkpoint, which gauruntees indexes have been updated on
183    /// this node.
184    ///
185    /// # Arguments
186    /// * `request` - The transaction execution request (ExecuteTransactionRequest)
187    /// * `timeout` - Maximum time to wait for indexing confirmation
188    ///
189    /// # Returns
190    /// The executed transaction from the execution response, but only after confirming
191    /// it has been included in a checkpoint and indexes have been updated.
192    pub async fn execute_transaction_and_wait_for_checkpoint(
193        &mut self,
194        request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
195        timeout: Duration,
196    ) -> Result<ExecutedTransaction> {
197        // Subscribe to checkpoint stream before execution to avoid missing the transaction.
198        // Uses minimal read mask for efficiency since we only nee digest confirmation.
199        // Once server-side filtering is available, we should filter by transaction digest to
200        // further reduce bandwidth.
201        let mut checkpoint_stream = self
202            .subscription_client()
203            .subscribe_checkpoints(SubscribeCheckpointsRequest {
204                read_mask: Some(FieldMask::from_str("transactions.digest,sequence_number")),
205            })
206            .await?
207            .into_inner();
208
209        // Calculate digest from the input transaction to avoid relying on response read mask
210        let request = request.into_request();
211        let transaction = request
212            .get_ref()
213            .transaction
214            .as_ref()
215            .ok_or_else(|| tonic::Status::invalid_argument("transaction is required"))?;
216
217        let executed_txn_digest = sui_sdk_types::Transaction::try_from(transaction)
218            .map_err(|e| tonic::Status::internal(format!("failed to convert transaction: {e}")))?
219            .digest()
220            .to_string();
221
222        let executed_transaction = self
223            .execution_client()
224            .execute_transaction(request)
225            .await?
226            .into_inner()
227            .transaction()
228            .to_owned();
229
230        // Wait for the transaction to appear in a checkpoint. At this point indexes have been
231        // updated.
232        let timeout_future = tokio::time::sleep(timeout);
233        let checkpoint_future = async {
234            while let Some(response) = checkpoint_stream.try_next().await? {
235                let checkpoint = response.checkpoint();
236
237                for tx in checkpoint.transactions() {
238                    let digest = tx.digest();
239
240                    if digest == executed_txn_digest {
241                        return Ok(());
242                    }
243                }
244            }
245            Err(tonic::Status::aborted(
246                "checkpoint stream ended unexpectedly",
247            ))
248        };
249
250        tokio::select! {
251            result = checkpoint_future => {
252                result?;
253                Ok(executed_transaction)
254            },
255            _ = timeout_future => Err(tonic::Status::deadline_exceeded(format!("timeout waiting for checkpoint after {timeout:?}"))),
256        }
257    }
258}