sui_rpc/client/
transaction_execution.rs1use super::Client;
2use crate::field::FieldMaskUtil;
3use crate::proto::TryFromProtoError;
4use crate::proto::sui::rpc::v2::ExecuteTransactionRequest;
5use crate::proto::sui::rpc::v2::ExecuteTransactionResponse;
6use crate::proto::sui::rpc::v2::ExecutionError;
7use crate::proto::sui::rpc::v2::GetEpochRequest;
8use crate::proto::sui::rpc::v2::GetTransactionRequest;
9use crate::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
10use futures::TryStreamExt;
11use prost_types::FieldMask;
12use std::fmt;
13use std::time::Duration;
14use tonic::Response;
15
16#[derive(Debug)]
18#[non_exhaustive]
19pub enum ExecuteAndWaitError {
20 RpcError(tonic::Status),
22 MissingTransaction,
24 ProtoConversionError(TryFromProtoError),
26 CheckpointTimeout(Response<ExecuteTransactionResponse>),
28 CheckpointStreamError {
30 response: Response<ExecuteTransactionResponse>,
31 error: tonic::Status,
32 },
33}
34
35impl std::fmt::Display for ExecuteAndWaitError {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 Self::RpcError(status) => write!(f, "RPC error: {status}"),
39 Self::MissingTransaction => {
40 write!(f, "Request is missing the required transaction field")
41 }
42 Self::ProtoConversionError(e) => write!(f, "Failed to convert transaction: {e}"),
43 Self::CheckpointTimeout(_) => {
44 write!(f, "Transaction executed but checkpoint wait timed out")
45 }
46 Self::CheckpointStreamError { error, .. } => {
47 write!(
48 f,
49 "Transaction executed but checkpoint stream had an error: {error}"
50 )
51 }
52 }
53 }
54}
55
56impl std::error::Error for ExecuteAndWaitError {
57 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
58 match self {
59 Self::RpcError(status) => Some(status),
60 Self::ProtoConversionError(e) => Some(e),
61 Self::CheckpointStreamError { error, .. } => Some(error),
62 Self::MissingTransaction => None,
63 Self::CheckpointTimeout(_) => None,
64 }
65 }
66}
67
68impl Client {
69 pub async fn execute_transaction_and_wait_for_checkpoint(
84 &mut self,
85 request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
86 timeout: Duration,
87 ) -> Result<Response<ExecuteTransactionResponse>, ExecuteAndWaitError> {
88 let mut checkpoint_stream = match self
93 .subscription_client()
94 .subscribe_checkpoints(SubscribeCheckpointsRequest::default().with_read_mask(
95 FieldMask::from_str("transactions.digest,sequence_number,summary.timestamp"),
96 ))
97 .await
98 {
99 Ok(stream) => stream.into_inner(),
100 Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
101 };
102
103 let request = request.into_request();
105 let transaction = match request.get_ref().transaction_opt() {
106 Some(tx) => tx,
107 None => return Err(ExecuteAndWaitError::MissingTransaction),
108 };
109
110 let executed_txn_digest = match sui_sdk_types::Transaction::try_from(transaction) {
111 Ok(tx) => tx.digest().to_string(),
112 Err(e) => return Err(ExecuteAndWaitError::ProtoConversionError(e)),
113 };
114
115 let mut response = match self.execution_client().execute_transaction(request).await {
116 Ok(resp) => resp,
117 Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
118 };
119
120 if let Ok(resp) = self
123 .ledger_client()
124 .get_transaction(
125 GetTransactionRequest::default()
126 .with_digest(&executed_txn_digest)
127 .with_read_mask(FieldMask::from_str("digest,checkpoint,timestamp")),
128 )
129 .await
130 && resp.get_ref().transaction().checkpoint_opt().is_some()
131 {
132 let checkpoint = resp.get_ref().transaction().checkpoint();
133 let timestamp = resp.get_ref().transaction().timestamp;
134 response
135 .get_mut()
136 .transaction_mut()
137 .set_checkpoint(checkpoint);
138 response.get_mut().transaction_mut().timestamp = timestamp;
139 return Ok(response);
140 }
141
142 let timeout_future = tokio::time::sleep(timeout);
145 let checkpoint_future = async {
146 while let Some(response) = checkpoint_stream.try_next().await? {
147 let checkpoint = response.checkpoint();
148
149 for tx in checkpoint.transactions() {
150 let digest = tx.digest();
151
152 if digest == executed_txn_digest {
153 return Ok((checkpoint.sequence_number(), checkpoint.summary().timestamp));
154 }
155 }
156 }
157 Err(tonic::Status::aborted(
158 "checkpoint stream ended unexpectedly",
159 ))
160 };
161
162 tokio::select! {
163 result = checkpoint_future => {
164 match result {
165 Ok((checkpoint, timestamp)) => {
166 response
167 .get_mut()
168 .transaction_mut()
169 .set_checkpoint(checkpoint);
170 response.get_mut().transaction_mut().timestamp = timestamp;
171 Ok(response)
172 }
173 Err(e) => Err(ExecuteAndWaitError::CheckpointStreamError { response, error: e })
174 }
175 },
176 _ = timeout_future => {
177 Err(ExecuteAndWaitError::CheckpointTimeout ( response))
178 }
179 }
180 }
181
182 pub async fn get_reference_gas_price(&mut self) -> Result<u64, tonic::Status> {
190 let request = GetEpochRequest::latest()
191 .with_read_mask(FieldMask::from_paths(["reference_gas_price"]));
192 let response = self.ledger_client().get_epoch(request).await?.into_inner();
193 Ok(response.epoch().reference_gas_price())
194 }
195}
196
197impl fmt::Display for ExecutionError {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 let description = self.description.as_deref().unwrap_or("No description");
200 write!(
201 f,
202 "ExecutionError: Kind: {}, Description: {}",
203 self.kind().as_str_name(),
204 description
205 )
206 }
207}