sui_analytics_indexer/handlers/tables/
transaction_objects.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use sui_indexer_alt_framework::pipeline::Processor;
9use sui_types::base_types::EpochId;
10use sui_types::effects::TransactionEffectsAPI;
11use sui_types::full_checkpoint_content::Checkpoint;
12use sui_types::transaction::TransactionDataAPI;
13
14use crate::Row;
15use crate::handlers::tables::InputObjectTracker;
16use crate::handlers::tables::ObjectStatusTracker;
17use crate::pipeline::Pipeline;
18use crate::tables::TransactionObjectRow;
19
/// Processor for the `TransactionObjects` pipeline.
///
/// Its `Processor::process` implementation emits one `TransactionObjectRow`
/// for every input object and every output object of each transaction in a
/// checkpoint. The type carries no state of its own.
pub struct TransactionObjectsProcessor;
21
22impl Row for TransactionObjectRow {
23 fn get_epoch(&self) -> EpochId {
24 self.epoch
25 }
26
27 fn get_checkpoint(&self) -> u64 {
28 self.checkpoint
29 }
30}
31
32#[async_trait]
33impl Processor for TransactionObjectsProcessor {
34 const NAME: &'static str = Pipeline::TransactionObjects.name();
35 type Value = TransactionObjectRow;
36
37 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
38 let mut entries = Vec::new();
39
40 let epoch = checkpoint.summary.data().epoch;
41 let checkpoint_seq = checkpoint.summary.data().sequence_number;
42 let timestamp_ms = checkpoint.summary.data().timestamp_ms;
43
44 for transaction in &checkpoint.transactions {
45 let effects = &transaction.effects;
46 let transaction_digest_str = effects.transaction_digest().base58_encode();
47 let txn_data = &transaction.transaction;
48
49 let input_object_tracker = InputObjectTracker::new(txn_data);
50 let object_status_tracker = ObjectStatusTracker::new(effects);
51
52 for object in txn_data
54 .input_objects()
55 .expect("Input objects must be valid")
56 .iter()
57 {
58 let object_id = object.object_id();
59 let version = object.version().map(|v| v.value());
60 let row = TransactionObjectRow {
61 object_id: object_id.to_string(),
62 version,
63 transaction_digest: transaction_digest_str.clone(),
64 checkpoint: checkpoint_seq,
65 epoch,
66 timestamp_ms,
67 input_kind: input_object_tracker.get_input_object_kind(&object_id),
68 object_status: object_status_tracker.get_object_status(&object_id),
69 };
70 entries.push(row);
71 }
72
73 for object in transaction.output_objects(&checkpoint.object_set) {
75 let object_id = object.id();
76 let version = Some(object.version().value());
77 let row = TransactionObjectRow {
78 object_id: object_id.to_string(),
79 version,
80 transaction_digest: transaction_digest_str.clone(),
81 checkpoint: checkpoint_seq,
82 epoch,
83 timestamp_ms,
84 input_kind: input_object_tracker.get_input_object_kind(&object_id),
85 object_status: object_status_tracker.get_object_status(&object_id),
86 };
87 entries.push(row);
88 }
89 }
90
91 Ok(entries)
92 }
93}