sui_rpc/client/
transaction_execution.rs1use super::Client;
2use crate::field::FieldMaskUtil;
3use crate::proto::TryFromProtoError;
4use crate::proto::sui::rpc::v2::ExecuteTransactionRequest;
5use crate::proto::sui::rpc::v2::ExecuteTransactionResponse;
6use crate::proto::sui::rpc::v2::ExecutionError;
7use crate::proto::sui::rpc::v2::GetEpochRequest;
8use crate::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
9use futures::TryStreamExt;
10use prost_types::FieldMask;
11use std::fmt;
12use std::time::Duration;
13use tonic::Response;
14
15#[derive(Debug)]
17#[non_exhaustive]
18pub enum ExecuteAndWaitError {
19 RpcError(tonic::Status),
21 MissingTransaction,
23 ProtoConversionError(TryFromProtoError),
25 CheckpointTimeout(Response<ExecuteTransactionResponse>),
27 CheckpointStreamError {
29 response: Response<ExecuteTransactionResponse>,
30 error: tonic::Status,
31 },
32}
33
34impl std::fmt::Display for ExecuteAndWaitError {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Self::RpcError(status) => write!(f, "RPC error: {status}"),
38 Self::MissingTransaction => {
39 write!(f, "Request is missing the required transaction field")
40 }
41 Self::ProtoConversionError(e) => write!(f, "Failed to convert transaction: {e}"),
42 Self::CheckpointTimeout(_) => {
43 write!(f, "Transaction executed but checkpoint wait timed out")
44 }
45 Self::CheckpointStreamError { error, .. } => {
46 write!(
47 f,
48 "Transaction executed but checkpoint stream had an error: {error}"
49 )
50 }
51 }
52 }
53}
54
55impl std::error::Error for ExecuteAndWaitError {
56 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
57 match self {
58 Self::RpcError(status) => Some(status),
59 Self::ProtoConversionError(e) => Some(e),
60 Self::CheckpointStreamError { error, .. } => Some(error),
61 Self::MissingTransaction => None,
62 Self::CheckpointTimeout(_) => None,
63 }
64 }
65}
66
67impl Client {
68 pub async fn execute_transaction_and_wait_for_checkpoint(
83 &mut self,
84 request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
85 timeout: Duration,
86 ) -> Result<Response<ExecuteTransactionResponse>, ExecuteAndWaitError> {
87 let mut checkpoint_stream = match self
92 .subscription_client()
93 .subscribe_checkpoints(
94 SubscribeCheckpointsRequest::default()
95 .with_read_mask(FieldMask::from_str("transactions.digest,sequence_number")),
96 )
97 .await
98 {
99 Ok(stream) => stream.into_inner(),
100 Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
101 };
102
103 let request = request.into_request();
105 let transaction = match request.get_ref().transaction_opt() {
106 Some(tx) => tx,
107 None => return Err(ExecuteAndWaitError::MissingTransaction),
108 };
109
110 let executed_txn_digest = match sui_sdk_types::Transaction::try_from(transaction) {
111 Ok(tx) => tx.digest().to_string(),
112 Err(e) => return Err(ExecuteAndWaitError::ProtoConversionError(e)),
113 };
114
115 let response = match self.execution_client().execute_transaction(request).await {
116 Ok(resp) => resp,
117 Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
118 };
119
120 let timeout_future = tokio::time::sleep(timeout);
123 let checkpoint_future = async {
124 while let Some(response) = checkpoint_stream.try_next().await? {
125 let checkpoint = response.checkpoint();
126
127 for tx in checkpoint.transactions() {
128 let digest = tx.digest();
129
130 if digest == executed_txn_digest {
131 return Ok(());
132 }
133 }
134 }
135 Err(tonic::Status::aborted(
136 "checkpoint stream ended unexpectedly",
137 ))
138 };
139
140 tokio::select! {
141 result = checkpoint_future => {
142 match result {
143 Ok(()) => Ok(response),
144 Err(e) => Err(ExecuteAndWaitError::CheckpointStreamError { response, error: e })
145 }
146 },
147 _ = timeout_future => {
148 Err(ExecuteAndWaitError::CheckpointTimeout ( response))
149 }
150 }
151 }
152
153 pub async fn get_reference_gas_price(&mut self) -> Result<u64, tonic::Status> {
161 let request = GetEpochRequest::latest()
162 .with_read_mask(FieldMask::from_paths(["reference_gas_price"]));
163 let response = self.ledger_client().get_epoch(request).await?.into_inner();
164 Ok(response.epoch().reference_gas_price())
165 }
166}
167
168impl fmt::Display for ExecutionError {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 let description = self.description.as_deref().unwrap_or("No description");
171 write!(
172 f,
173 "ExecutionError: Kind: {}, Description: {}",
174 self.kind().as_str_name(),
175 description
176 )
177 }
178}