sui_rpc/client/
transaction_execution.rs

1use super::Client;
2use crate::field::FieldMaskUtil;
3use crate::proto::TryFromProtoError;
4use crate::proto::sui::rpc::v2::ExecuteTransactionRequest;
5use crate::proto::sui::rpc::v2::ExecuteTransactionResponse;
6use crate::proto::sui::rpc::v2::ExecutionError;
7use crate::proto::sui::rpc::v2::GetEpochRequest;
8use crate::proto::sui::rpc::v2::GetTransactionRequest;
9use crate::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
10use futures::TryStreamExt;
11use prost_types::FieldMask;
12use std::fmt;
13use std::time::Duration;
14use tonic::Response;
15
16/// Error types that can occur when executing a transaction and waiting for checkpoint
17#[derive(Debug)]
18#[non_exhaustive]
19pub enum ExecuteAndWaitError {
20    /// RPC Error (actual tonic::Status from the client/server)
21    RpcError(tonic::Status),
22    /// Request is missing the required transaction field
23    MissingTransaction,
24    /// Failed to parse/convert the transaction for digest calculation
25    ProtoConversionError(TryFromProtoError),
26    /// Transaction executed but checkpoint wait timed out
27    CheckpointTimeout(Response<ExecuteTransactionResponse>),
28    /// Transaction executed but checkpoint stream had an error
29    CheckpointStreamError {
30        response: Response<ExecuteTransactionResponse>,
31        error: tonic::Status,
32    },
33}
34
35impl std::fmt::Display for ExecuteAndWaitError {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Self::RpcError(status) => write!(f, "RPC error: {status}"),
39            Self::MissingTransaction => {
40                write!(f, "Request is missing the required transaction field")
41            }
42            Self::ProtoConversionError(e) => write!(f, "Failed to convert transaction: {e}"),
43            Self::CheckpointTimeout(_) => {
44                write!(f, "Transaction executed but checkpoint wait timed out")
45            }
46            Self::CheckpointStreamError { error, .. } => {
47                write!(
48                    f,
49                    "Transaction executed but checkpoint stream had an error: {error}"
50                )
51            }
52        }
53    }
54}
55
56impl std::error::Error for ExecuteAndWaitError {
57    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
58        match self {
59            Self::RpcError(status) => Some(status),
60            Self::ProtoConversionError(e) => Some(e),
61            Self::CheckpointStreamError { error, .. } => Some(error),
62            Self::MissingTransaction => None,
63            Self::CheckpointTimeout(_) => None,
64        }
65    }
66}
67
68impl Client {
69    /// Executes a transaction and waits for it to be included in a checkpoint.
70    ///
71    /// This method provides "read your writes" consistency by executing the transaction
72    /// and waiting for it to appear in a checkpoint, which gauruntees indexes have been updated on
73    /// this node.
74    ///
75    /// # Arguments
76    /// * `request` - The transaction execution request (ExecuteTransactionRequest)
77    /// * `timeout` - Maximum time to wait for indexing confirmation
78    ///
79    /// # Returns
80    /// A `Result` containing the response if the transaction was executed and checkpoint confirmed,
81    /// or an error that may still include the response if execution succeeded but checkpoint
82    /// confirmation failed.
83    pub async fn execute_transaction_and_wait_for_checkpoint(
84        &mut self,
85        request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
86        timeout: Duration,
87    ) -> Result<Response<ExecuteTransactionResponse>, ExecuteAndWaitError> {
88        // Subscribe to checkpoint stream before execution to avoid missing the transaction.
89        // Uses minimal read mask for efficiency since we only nee digest confirmation.
90        // Once server-side filtering is available, we should filter by transaction digest to
91        // further reduce bandwidth.
92        let mut checkpoint_stream = match self
93            .subscription_client()
94            .subscribe_checkpoints(SubscribeCheckpointsRequest::default().with_read_mask(
95                FieldMask::from_str("transactions.digest,sequence_number,summary.timestamp"),
96            ))
97            .await
98        {
99            Ok(stream) => stream.into_inner(),
100            Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
101        };
102
103        // Calculate digest from the input transaction to avoid relying on response read mask
104        let request = request.into_request();
105        let transaction = match request.get_ref().transaction_opt() {
106            Some(tx) => tx,
107            None => return Err(ExecuteAndWaitError::MissingTransaction),
108        };
109
110        let executed_txn_digest = match sui_sdk_types::Transaction::try_from(transaction) {
111            Ok(tx) => tx.digest().to_string(),
112            Err(e) => return Err(ExecuteAndWaitError::ProtoConversionError(e)),
113        };
114
115        let mut response = match self.execution_client().execute_transaction(request).await {
116            Ok(resp) => resp,
117            Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
118        };
119
120        // First query the fullnode directly to see if it already has the txn. This is to handle
121        // the case where an already executed transaction is sent multiple times
122        if let Ok(resp) = self
123            .ledger_client()
124            .get_transaction(
125                GetTransactionRequest::default()
126                    .with_digest(&executed_txn_digest)
127                    .with_read_mask(FieldMask::from_str("digest,checkpoint,timestamp")),
128            )
129            .await
130            && resp.get_ref().transaction().checkpoint_opt().is_some()
131        {
132            let checkpoint = resp.get_ref().transaction().checkpoint();
133            let timestamp = resp.get_ref().transaction().timestamp;
134            response
135                .get_mut()
136                .transaction_mut()
137                .set_checkpoint(checkpoint);
138            response.get_mut().transaction_mut().timestamp = timestamp;
139            return Ok(response);
140        }
141
142        // Wait for the transaction to appear in a checkpoint, at which point indexes will have been
143        // updated.
144        let timeout_future = tokio::time::sleep(timeout);
145        let checkpoint_future = async {
146            while let Some(response) = checkpoint_stream.try_next().await? {
147                let checkpoint = response.checkpoint();
148
149                for tx in checkpoint.transactions() {
150                    let digest = tx.digest();
151
152                    if digest == executed_txn_digest {
153                        return Ok((checkpoint.sequence_number(), checkpoint.summary().timestamp));
154                    }
155                }
156            }
157            Err(tonic::Status::aborted(
158                "checkpoint stream ended unexpectedly",
159            ))
160        };
161
162        tokio::select! {
163            result = checkpoint_future => {
164                match result {
165                    Ok((checkpoint, timestamp)) => {
166                        response
167                            .get_mut()
168                            .transaction_mut()
169                            .set_checkpoint(checkpoint);
170                        response.get_mut().transaction_mut().timestamp = timestamp;
171                        Ok(response)
172                    }
173                    Err(e) => Err(ExecuteAndWaitError::CheckpointStreamError { response, error: e })
174                }
175            },
176            _ = timeout_future => {
177                Err(ExecuteAndWaitError::CheckpointTimeout ( response))
178            }
179        }
180    }
181
182    /// Retrieves the current reference gas price from the latest epoch information.
183    ///
184    /// # Returns
185    /// The reference gas price as a `u64`
186    ///
187    /// # Errors
188    /// Returns an error if there is an RPC error when fetching the epoch information
189    pub async fn get_reference_gas_price(&mut self) -> Result<u64, tonic::Status> {
190        let request = GetEpochRequest::latest()
191            .with_read_mask(FieldMask::from_paths(["reference_gas_price"]));
192        let response = self.ledger_client().get_epoch(request).await?.into_inner();
193        Ok(response.epoch().reference_gas_price())
194    }
195}
196
197impl fmt::Display for ExecutionError {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        let description = self.description.as_deref().unwrap_or("No description");
200        write!(
201            f,
202            "ExecutionError: Kind: {}, Description: {}",
203            self.kind().as_str_name(),
204            description
205        )
206    }
207}