sui_analytics_indexer/handlers/tables/
transaction.rs1use std::collections::BTreeSet;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use anyhow::Result;
9use async_trait::async_trait;
10use sui_indexer_alt_framework::pipeline::Processor;
11use sui_types::base_types::EpochId;
12use sui_types::digests::TransactionDigest;
13use sui_types::effects::TransactionEffectsAPI;
14use sui_types::full_checkpoint_content::Checkpoint;
15use sui_types::messages_checkpoint::CheckpointContents;
16use sui_types::transaction::Command;
17use sui_types::transaction::TransactionDataAPI;
18use sui_types::transaction::TransactionKind;
19use tracing::error;
20
21use crate::Row;
22use crate::pipeline::Pipeline;
23use crate::tables::TransactionRow;
24
/// Stateless processor for the transaction pipeline.
///
/// It carries no data; its `Processor` implementation turns each executed
/// transaction of a checkpoint into one `TransactionRow`.
#[derive(Debug)]
pub struct TransactionProcessor;
26
27impl Row for TransactionRow {
28 fn get_epoch(&self) -> EpochId {
29 self.epoch
30 }
31
32 fn get_checkpoint(&self) -> u64 {
33 self.checkpoint
34 }
35}
36
37#[async_trait]
38impl Processor for TransactionProcessor {
39 const NAME: &'static str = Pipeline::Transaction.name();
40 type Value = TransactionRow;
41
42 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
43 let epoch = checkpoint.summary.data().epoch;
44 let checkpoint_seq = checkpoint.summary.data().sequence_number;
45 let timestamp_ms = checkpoint.summary.data().timestamp_ms;
46
47 let transaction_positions = compute_transaction_positions(&checkpoint.contents);
48
49 let mut entries = Vec::with_capacity(checkpoint.transactions.len());
50
51 for executed_transaction in &checkpoint.transactions {
52 let effects = &executed_transaction.effects;
53 let txn_data = &executed_transaction.transaction;
54 let gas_object = effects.gas_object();
55 let gas_summary = effects.gas_cost_summary();
56 let move_calls_vec = txn_data.move_calls();
57 let packages: BTreeSet<_> = move_calls_vec
58 .iter()
59 .map(|(_cmd_idx, package, _, _)| {
60 package.to_canonical_string(false)
61 })
62 .collect();
63 let packages = packages
64 .iter()
65 .map(|s| s.as_str())
66 .collect::<Vec<_>>()
67 .join("-");
68 let transaction_digest = effects.transaction_digest().base58_encode();
69 let events_digest = executed_transaction
70 .events
71 .as_ref()
72 .map(|events| events.digest().base58_encode());
73
74 let transaction_position = *transaction_positions
75 .get(effects.transaction_digest())
76 .expect("Expect transaction to exist in checkpoint_contents.")
77 as u64;
78
79 let transaction_data_bcs_length = bcs::to_bytes(&txn_data).unwrap().len() as u64;
80 let effects_bcs_length =
81 bcs::to_bytes(&executed_transaction.effects).unwrap().len() as u64;
82 let events_bcs_length = executed_transaction
83 .events
84 .as_ref()
85 .map(|events| bcs::to_bytes(events).unwrap().len() as u64)
86 .unwrap_or(0);
87 let signatures_bcs_length = bcs::to_bytes(&executed_transaction.signatures)
88 .unwrap()
89 .len() as u64;
90
91 let mut transfers: u64 = 0;
92 let mut split_coins: u64 = 0;
93 let mut merge_coins: u64 = 0;
94 let mut publish: u64 = 0;
95 let mut upgrade: u64 = 0;
96 let mut others: u64 = 0;
97 let mut move_calls_count = 0;
98 let move_calls = move_calls_vec.len() as u64;
99
100 let is_sponsored_tx = txn_data.is_sponsored_tx();
101 let is_system_txn = txn_data.is_system_tx();
102 if !is_system_txn {
103 let kind = txn_data.kind();
104 if let TransactionKind::ProgrammableTransaction(pt) = txn_data.kind() {
105 for cmd in &pt.commands {
106 match cmd {
107 Command::MoveCall(_) => move_calls_count += 1,
108 Command::TransferObjects(_, _) => transfers += 1,
109 Command::SplitCoins(_, _) => split_coins += 1,
110 Command::MergeCoins(_, _) => merge_coins += 1,
111 Command::Publish(_, _) => publish += 1,
112 Command::Upgrade(_, _, _, _) => upgrade += 1,
113 _ => others += 1,
114 }
115 }
116 } else {
117 error!(
118 "Transaction kind [{kind}] is not programmable transaction and not a system transaction"
119 );
120 }
121 if move_calls_count != move_calls {
122 error!(
123 "Mismatch in move calls count: commands {move_calls_count} != {move_calls} calls"
124 );
125 }
126 }
127
128 let transaction_json = serde_json::to_string(&txn_data)?;
129 let effects_json = serde_json::to_string(&executed_transaction.effects)?;
130
131 let row = TransactionRow {
132 transaction_digest,
133 checkpoint: checkpoint_seq,
134 epoch,
135 timestamp_ms,
136 sender: txn_data.sender().to_string(),
137 transaction_kind: txn_data.kind().name().to_owned(),
138 is_system_txn,
139 is_sponsored_tx,
140 transaction_count: txn_data.kind().num_commands() as u64,
141 execution_success: effects.status().is_ok(),
142 input: txn_data
143 .input_objects()
144 .expect("Input objects must be valid")
145 .len() as u64,
146 shared_input: txn_data.shared_input_objects().len() as u64,
147 gas_coins: txn_data.gas().len() as u64,
148 created: effects.created().len() as u64,
149 mutated: (effects.mutated().len() + effects.unwrapped().len()) as u64,
150 deleted: (effects.deleted().len()
151 + effects.unwrapped_then_deleted().len()
152 + effects.wrapped().len()) as u64,
153 transfers,
154 split_coins,
155 merge_coins,
156 publish,
157 upgrade,
158 others,
159 move_calls,
160 packages,
161 gas_owner: txn_data.gas_owner().to_string(),
162 gas_object_id: gas_object.0.0.to_string(),
163 gas_object_sequence: gas_object.0.1.value(),
164 gas_object_digest: gas_object.0.2.to_string(),
165 gas_budget: txn_data.gas_budget(),
166 total_gas_cost: gas_summary.net_gas_usage(),
167 computation_cost: gas_summary.computation_cost,
168 storage_cost: gas_summary.storage_cost,
169 storage_rebate: gas_summary.storage_rebate,
170 non_refundable_storage_fee: gas_summary.non_refundable_storage_fee,
171 gas_price: txn_data.gas_price(),
172 has_zklogin_sig: executed_transaction
173 .signatures
174 .iter()
175 .any(|sig| sig.is_zklogin()),
176 has_upgraded_multisig: executed_transaction
177 .signatures
178 .iter()
179 .any(|sig| sig.is_upgraded_multisig()),
180 transaction_json: Some(transaction_json),
181 effects_json: Some(effects_json),
182 transaction_position,
183 events_digest,
184 raw_transaction: "".to_string(),
185 transaction_data_bcs_length,
186 effects_bcs_length,
187 events_bcs_length,
188 signatures_bcs_length,
189 };
190
191 entries.push(row);
192 }
193
194 Ok(entries)
195 }
196}
197
198fn compute_transaction_positions(
199 checkpoint_contents: &CheckpointContents,
200) -> HashMap<TransactionDigest, usize> {
201 let mut digest_to_position: HashMap<TransactionDigest, usize> = HashMap::new();
202
203 for (position, execution_digest) in checkpoint_contents.iter().enumerate() {
204 digest_to_position.insert(execution_digest.transaction, position);
205 }
206
207 digest_to_position
208}