sui_indexer/models/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use diesel::prelude::*;
7
8use move_core_types::annotated_value::{MoveDatatypeLayout, MoveTypeLayout};
9use move_core_types::language_storage::TypeTag;
10use sui_json_rpc_types::{
11    BalanceChange, ObjectChange, SuiEvent, SuiTransactionBlock, SuiTransactionBlockEffects,
12    SuiTransactionBlockEvents, SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
13};
14use sui_package_resolver::{PackageStore, Resolver};
15use sui_types::digests::TransactionDigest;
16use sui_types::effects::TransactionEffects;
17use sui_types::effects::TransactionEvents;
18use sui_types::event::Event;
19use sui_types::transaction::SenderSignedData;
20
21use crate::errors::IndexerError;
22use crate::schema::transactions;
23use crate::types::IndexedObjectChange;
24use crate::types::IndexedTransaction;
25use crate::types::IndexerResult;
26
27#[derive(Clone, Debug, Queryable, Insertable, QueryableByName, Selectable)]
28#[diesel(table_name = transactions)]
29pub struct StoredTransaction {
30    pub tx_sequence_number: i64,
31    pub transaction_digest: Vec<u8>,
32    pub raw_transaction: Vec<u8>,
33    pub raw_effects: Vec<u8>,
34    pub checkpoint_sequence_number: i64,
35    pub timestamp_ms: i64,
36    pub object_changes: Vec<Option<Vec<u8>>>,
37    pub balance_changes: Vec<Option<Vec<u8>>>,
38    pub events: Vec<Option<Vec<u8>>>,
39    pub transaction_kind: i16,
40    pub success_command_count: i16,
41}
42
43pub type StoredTransactionEvents = Vec<Option<Vec<u8>>>;
44
45#[derive(Debug, Queryable)]
46pub struct TxSeq {
47    pub seq: i64,
48}
49
50impl Default for TxSeq {
51    fn default() -> Self {
52        Self { seq: -1 }
53    }
54}
55
56#[derive(Clone, Debug, Queryable)]
57pub struct StoredTransactionTimestamp {
58    pub tx_sequence_number: i64,
59    pub timestamp_ms: i64,
60}
61
62#[derive(Clone, Debug, Queryable)]
63pub struct StoredTransactionCheckpoint {
64    pub tx_sequence_number: i64,
65    pub checkpoint_sequence_number: i64,
66}
67
68#[derive(Clone, Debug, Queryable)]
69pub struct StoredTransactionSuccessCommandCount {
70    pub tx_sequence_number: i64,
71    pub checkpoint_sequence_number: i64,
72    pub success_command_count: i16,
73    pub timestamp_ms: i64,
74}
75
76impl From<&IndexedTransaction> for StoredTransaction {
77    fn from(tx: &IndexedTransaction) -> Self {
78        StoredTransaction {
79            tx_sequence_number: tx.tx_sequence_number as i64,
80            transaction_digest: tx.tx_digest.into_inner().to_vec(),
81            raw_transaction: bcs::to_bytes(&tx.sender_signed_data).unwrap(),
82            raw_effects: bcs::to_bytes(&tx.effects).unwrap(),
83            checkpoint_sequence_number: tx.checkpoint_sequence_number as i64,
84            object_changes: tx
85                .object_changes
86                .iter()
87                .map(|oc| Some(bcs::to_bytes(&oc).unwrap()))
88                .collect(),
89            balance_changes: tx
90                .balance_change
91                .iter()
92                .map(|bc| Some(bcs::to_bytes(&bc).unwrap()))
93                .collect(),
94            events: tx
95                .events
96                .iter()
97                .map(|e| Some(bcs::to_bytes(&e).unwrap()))
98                .collect(),
99            timestamp_ms: tx.timestamp_ms as i64,
100            transaction_kind: tx.transaction_kind.clone() as i16,
101            success_command_count: tx.successful_tx_num as i16,
102        }
103    }
104}
105
106impl StoredTransaction {
107    pub fn get_balance_len(&self) -> usize {
108        self.balance_changes.len()
109    }
110
111    pub fn get_balance_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
112        self.balance_changes.get(idx).cloned().flatten()
113    }
114
115    pub fn get_object_len(&self) -> usize {
116        self.object_changes.len()
117    }
118
119    pub fn get_object_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
120        self.object_changes.get(idx).cloned().flatten()
121    }
122
123    pub fn get_event_len(&self) -> usize {
124        self.events.len()
125    }
126
127    pub fn get_event_at_idx(&self, idx: usize) -> Option<Vec<u8>> {
128        self.events.get(idx).cloned().flatten()
129    }
130
131    pub async fn try_into_sui_transaction_block_response(
132        self,
133        options: SuiTransactionBlockResponseOptions,
134        package_resolver: Arc<Resolver<impl PackageStore>>,
135    ) -> IndexerResult<SuiTransactionBlockResponse> {
136        let options = options.clone();
137        let tx_digest =
138            TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
139                IndexerError::PersistentStorageDataCorruptionError(format!(
140                    "Can't convert {:?} as tx_digest. Error: {e}",
141                    self.transaction_digest
142                ))
143            })?;
144
145        let transaction = if options.show_input {
146            let sender_signed_data = self.try_into_sender_signed_data()?;
147            let tx_block = SuiTransactionBlock::try_from_with_package_resolver(
148                sender_signed_data,
149                &package_resolver,
150            )
151            .await?;
152            Some(tx_block)
153        } else {
154            None
155        };
156
157        let effects = if options.show_effects {
158            let effects = self.try_into_sui_transaction_effects()?;
159            Some(effects)
160        } else {
161            None
162        };
163
164        let raw_transaction = if options.show_raw_input {
165            self.raw_transaction
166        } else {
167            Vec::new()
168        };
169
170        let events = if options.show_events {
171            let events = {
172                self
173                        .events
174                        .into_iter()
175                        .map(|event| match event {
176                            Some(event) => {
177                                let event: Event = bcs::from_bytes(&event).map_err(|e| {
178                                    IndexerError::PersistentStorageDataCorruptionError(format!(
179                                        "Can't convert event bytes into Event. tx_digest={:?} Error: {e}",
180                                        tx_digest
181                                    ))
182                                })?;
183                                Ok(event)
184                            }
185                            None => Err(IndexerError::PersistentStorageDataCorruptionError(format!(
186                                "Event should not be null, tx_digest={:?}",
187                                tx_digest
188                            ))),
189                        })
190                        .collect::<Result<Vec<Event>, IndexerError>>()?
191            };
192            let timestamp = self.timestamp_ms as u64;
193            let tx_events = TransactionEvents { data: events };
194
195            tx_events_to_sui_tx_events(tx_events, package_resolver, tx_digest, timestamp).await?
196        } else {
197            None
198        };
199
200        let object_changes = if options.show_object_changes {
201            let object_changes = {
202                self.object_changes.into_iter().map(|object_change| {
203                        match object_change {
204                            Some(object_change) => {
205                                let object_change: IndexedObjectChange = bcs::from_bytes(&object_change)
206                                    .map_err(|e| IndexerError::PersistentStorageDataCorruptionError(
207                                        format!("Can't convert object_change bytes into IndexedObjectChange. tx_digest={:?} Error: {e}", tx_digest)
208                                    ))?;
209                                Ok(ObjectChange::from(object_change))
210                            }
211                            None => Err(IndexerError::PersistentStorageDataCorruptionError(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
212                        }
213                    }).collect::<Result<Vec<ObjectChange>, IndexerError>>()?
214            };
215            Some(object_changes)
216        } else {
217            None
218        };
219
220        let balance_changes = if options.show_balance_changes {
221            let balance_changes = {
222                self.balance_changes.into_iter().map(|balance_change| {
223                        match balance_change {
224                            Some(balance_change) => {
225                                let balance_change: BalanceChange = bcs::from_bytes(&balance_change)
226                                    .map_err(|e| IndexerError::PersistentStorageDataCorruptionError(
227                                        format!("Can't convert balance_change bytes into BalanceChange. tx_digest={:?} Error: {e}", tx_digest)
228                                    ))?;
229                                Ok(balance_change)
230                            }
231                            None => Err(IndexerError::PersistentStorageDataCorruptionError(format!("object_change should not be null, tx_digest={:?}", tx_digest))),
232                        }
233                    }).collect::<Result<Vec<BalanceChange>, IndexerError>>()?
234            };
235            Some(balance_changes)
236        } else {
237            None
238        };
239
240        Ok(SuiTransactionBlockResponse {
241            digest: tx_digest,
242            transaction,
243            raw_transaction,
244            effects,
245            events,
246            object_changes,
247            balance_changes,
248            timestamp_ms: Some(self.timestamp_ms as u64),
249            checkpoint: Some(self.checkpoint_sequence_number as u64),
250            confirmed_local_execution: None,
251            errors: vec![],
252            raw_effects: self.raw_effects,
253        })
254    }
255    fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
256        let sender_signed_data: SenderSignedData =
257            bcs::from_bytes(&self.raw_transaction).map_err(|e| {
258                IndexerError::PersistentStorageDataCorruptionError(format!(
259                    "Can't convert raw_transaction of {} into SenderSignedData. Error: {e}",
260                    self.tx_sequence_number
261                ))
262            })?;
263        Ok(sender_signed_data)
264    }
265
266    pub fn try_into_sui_transaction_effects(&self) -> IndexerResult<SuiTransactionBlockEffects> {
267        let effects: TransactionEffects = bcs::from_bytes(&self.raw_effects).map_err(|e| {
268            IndexerError::PersistentStorageDataCorruptionError(format!(
269                "Can't convert raw_effects of {} into TransactionEffects. Error: {e}",
270                self.tx_sequence_number
271            ))
272        })?;
273        let effects = SuiTransactionBlockEffects::try_from(effects)?;
274        Ok(effects)
275    }
276}
277
278pub fn stored_events_to_events(
279    stored_events: StoredTransactionEvents,
280) -> Result<Vec<Event>, IndexerError> {
281    stored_events
282        .into_iter()
283        .map(|event| match event {
284            Some(event) => {
285                let event: Event = bcs::from_bytes(&event).map_err(|e| {
286                    IndexerError::PersistentStorageDataCorruptionError(format!(
287                        "Can't convert event bytes into Event. Error: {e}",
288                    ))
289                })?;
290                Ok(event)
291            }
292            None => Err(IndexerError::PersistentStorageDataCorruptionError(
293                "Event should not be null".to_string(),
294            )),
295        })
296        .collect::<Result<Vec<Event>, IndexerError>>()
297}
298
299pub async fn tx_events_to_sui_tx_events(
300    tx_events: TransactionEvents,
301    package_resolver: Arc<Resolver<impl PackageStore>>,
302    tx_digest: TransactionDigest,
303    timestamp: u64,
304) -> Result<Option<SuiTransactionBlockEvents>, IndexerError> {
305    let mut sui_event_futures = vec![];
306    let tx_events_data_len = tx_events.data.len();
307    for tx_event in tx_events.data.clone() {
308        let package_resolver_clone = package_resolver.clone();
309        sui_event_futures.push(tokio::task::spawn(async move {
310            let resolver = package_resolver_clone;
311            resolver
312                .type_layout(TypeTag::Struct(Box::new(tx_event.type_.clone())))
313                .await
314        }));
315    }
316    let event_move_type_layouts = futures::future::join_all(sui_event_futures)
317        .await
318        .into_iter()
319        .collect::<Result<Vec<_>, _>>()?
320        .into_iter()
321        .collect::<Result<Vec<_>, _>>()
322        .map_err(|e| {
323            IndexerError::ResolveMoveStructError(format!(
324                "Failed to convert to sui event with Error: {e}",
325            ))
326        })?;
327    let event_move_datatype_layouts = event_move_type_layouts
328        .into_iter()
329        .filter_map(|move_type_layout| match move_type_layout {
330            MoveTypeLayout::Struct(s) => Some(MoveDatatypeLayout::Struct(s)),
331            MoveTypeLayout::Enum(e) => Some(MoveDatatypeLayout::Enum(e)),
332            _ => None,
333        })
334        .collect::<Vec<_>>();
335    assert!(tx_events_data_len == event_move_datatype_layouts.len());
336    let sui_events = tx_events
337        .data
338        .into_iter()
339        .enumerate()
340        .zip(event_move_datatype_layouts)
341        .map(|((seq, tx_event), move_datatype_layout)| {
342            SuiEvent::try_from(
343                tx_event,
344                tx_digest,
345                seq as u64,
346                Some(timestamp),
347                move_datatype_layout,
348            )
349        })
350        .collect::<Result<Vec<_>, _>>()?;
351    let sui_tx_events = SuiTransactionBlockEvents { data: sui_events };
352    Ok(Some(sui_tx_events))
353}