sui_rpc/client/
transaction_execution.rs

1use super::Client;
2use crate::field::FieldMaskUtil;
3use crate::proto::TryFromProtoError;
4use crate::proto::sui::rpc::v2::ExecuteTransactionRequest;
5use crate::proto::sui::rpc::v2::ExecuteTransactionResponse;
6use crate::proto::sui::rpc::v2::ExecutionError;
7use crate::proto::sui::rpc::v2::GetEpochRequest;
8use crate::proto::sui::rpc::v2::SubscribeCheckpointsRequest;
9use futures::TryStreamExt;
10use prost_types::FieldMask;
11use std::fmt;
12use std::time::Duration;
13use tonic::Response;
14
15/// Error types that can occur when executing a transaction and waiting for checkpoint
16#[derive(Debug)]
17#[non_exhaustive]
18pub enum ExecuteAndWaitError {
19    /// RPC Error (actual tonic::Status from the client/server)
20    RpcError(tonic::Status),
21    /// Request is missing the required transaction field
22    MissingTransaction,
23    /// Failed to parse/convert the transaction for digest calculation
24    ProtoConversionError(TryFromProtoError),
25    /// Transaction executed but checkpoint wait timed out
26    CheckpointTimeout(Response<ExecuteTransactionResponse>),
27    /// Transaction executed but checkpoint stream had an error
28    CheckpointStreamError {
29        response: Response<ExecuteTransactionResponse>,
30        error: tonic::Status,
31    },
32}
33
34impl std::fmt::Display for ExecuteAndWaitError {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            Self::RpcError(status) => write!(f, "RPC error: {status}"),
38            Self::MissingTransaction => {
39                write!(f, "Request is missing the required transaction field")
40            }
41            Self::ProtoConversionError(e) => write!(f, "Failed to convert transaction: {e}"),
42            Self::CheckpointTimeout(_) => {
43                write!(f, "Transaction executed but checkpoint wait timed out")
44            }
45            Self::CheckpointStreamError { error, .. } => {
46                write!(
47                    f,
48                    "Transaction executed but checkpoint stream had an error: {error}"
49                )
50            }
51        }
52    }
53}
54
55impl std::error::Error for ExecuteAndWaitError {
56    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
57        match self {
58            Self::RpcError(status) => Some(status),
59            Self::ProtoConversionError(e) => Some(e),
60            Self::CheckpointStreamError { error, .. } => Some(error),
61            Self::MissingTransaction => None,
62            Self::CheckpointTimeout(_) => None,
63        }
64    }
65}
66
67impl Client {
68    /// Executes a transaction and waits for it to be included in a checkpoint.
69    ///
70    /// This method provides "read your writes" consistency by executing the transaction
71    /// and waiting for it to appear in a checkpoint, which gauruntees indexes have been updated on
72    /// this node.
73    ///
74    /// # Arguments
75    /// * `request` - The transaction execution request (ExecuteTransactionRequest)
76    /// * `timeout` - Maximum time to wait for indexing confirmation
77    ///
78    /// # Returns
79    /// A `Result` containing the response if the transaction was executed and checkpoint confirmed,
80    /// or an error that may still include the response if execution succeeded but checkpoint
81    /// confirmation failed.
82    pub async fn execute_transaction_and_wait_for_checkpoint(
83        &mut self,
84        request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
85        timeout: Duration,
86    ) -> Result<Response<ExecuteTransactionResponse>, ExecuteAndWaitError> {
87        // Subscribe to checkpoint stream before execution to avoid missing the transaction.
88        // Uses minimal read mask for efficiency since we only nee digest confirmation.
89        // Once server-side filtering is available, we should filter by transaction digest to
90        // further reduce bandwidth.
91        let mut checkpoint_stream = match self
92            .subscription_client()
93            .subscribe_checkpoints(
94                SubscribeCheckpointsRequest::default()
95                    .with_read_mask(FieldMask::from_str("transactions.digest,sequence_number")),
96            )
97            .await
98        {
99            Ok(stream) => stream.into_inner(),
100            Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
101        };
102
103        // Calculate digest from the input transaction to avoid relying on response read mask
104        let request = request.into_request();
105        let transaction = match request.get_ref().transaction_opt() {
106            Some(tx) => tx,
107            None => return Err(ExecuteAndWaitError::MissingTransaction),
108        };
109
110        let executed_txn_digest = match sui_sdk_types::Transaction::try_from(transaction) {
111            Ok(tx) => tx.digest().to_string(),
112            Err(e) => return Err(ExecuteAndWaitError::ProtoConversionError(e)),
113        };
114
115        let response = match self.execution_client().execute_transaction(request).await {
116            Ok(resp) => resp,
117            Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
118        };
119
120        // Wait for the transaction to appear in a checkpoint, at which point indexes will have been
121        // updated.
122        let timeout_future = tokio::time::sleep(timeout);
123        let checkpoint_future = async {
124            while let Some(response) = checkpoint_stream.try_next().await? {
125                let checkpoint = response.checkpoint();
126
127                for tx in checkpoint.transactions() {
128                    let digest = tx.digest();
129
130                    if digest == executed_txn_digest {
131                        return Ok(());
132                    }
133                }
134            }
135            Err(tonic::Status::aborted(
136                "checkpoint stream ended unexpectedly",
137            ))
138        };
139
140        tokio::select! {
141            result = checkpoint_future => {
142                match result {
143                    Ok(()) => Ok(response),
144                    Err(e) => Err(ExecuteAndWaitError::CheckpointStreamError { response, error: e })
145                }
146            },
147            _ = timeout_future => {
148                Err(ExecuteAndWaitError::CheckpointTimeout ( response))
149            }
150        }
151    }
152
153    /// Retrieves the current reference gas price from the latest epoch information.
154    ///
155    /// # Returns
156    /// The reference gas price as a `u64`
157    ///
158    /// # Errors
159    /// Returns an error if there is an RPC error when fetching the epoch information
160    pub async fn get_reference_gas_price(&mut self) -> Result<u64, tonic::Status> {
161        let request = GetEpochRequest::latest()
162            .with_read_mask(FieldMask::from_paths(["reference_gas_price"]));
163        let response = self.ledger_client().get_epoch(request).await?.into_inner();
164        Ok(response.epoch().reference_gas_price())
165    }
166}
167
168impl fmt::Display for ExecutionError {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        let description = self.description.as_deref().unwrap_or("No description");
171        write!(
172            f,
173            "ExecutionError: Kind: {}, Description: {}",
174            self.kind().as_str_name(),
175            description
176        )
177    }
178}