sui_analytics_indexer/handlers/tables/
transaction_objects.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use sui_indexer_alt_framework::pipeline::Processor;
9use sui_types::base_types::EpochId;
10use sui_types::effects::TransactionEffectsAPI;
11use sui_types::full_checkpoint_content::Checkpoint;
12use sui_types::transaction::TransactionDataAPI;
13
14use crate::Row;
15use crate::handlers::tables::InputObjectTracker;
16use crate::handlers::tables::ObjectStatusTracker;
17use crate::pipeline::Pipeline;
18use crate::tables::TransactionObjectRow;
19
/// Stateless pipeline processor that turns the transactions of a checkpoint
/// into [`TransactionObjectRow`] values, one row per input object and one row
/// per output object of every transaction.
///
/// The type carries no data, so it is cheap to copy and can be built with
/// `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransactionObjectsProcessor;
21
22impl Row for TransactionObjectRow {
23    fn get_epoch(&self) -> EpochId {
24        self.epoch
25    }
26
27    fn get_checkpoint(&self) -> u64 {
28        self.checkpoint
29    }
30}
31
32#[async_trait]
33impl Processor for TransactionObjectsProcessor {
34    const NAME: &'static str = Pipeline::TransactionObjects.name();
35    type Value = TransactionObjectRow;
36
37    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
38        let mut entries = Vec::new();
39
40        let epoch = checkpoint.summary.data().epoch;
41        let checkpoint_seq = checkpoint.summary.data().sequence_number;
42        let timestamp_ms = checkpoint.summary.data().timestamp_ms;
43
44        for transaction in &checkpoint.transactions {
45            let effects = &transaction.effects;
46            let transaction_digest_str = effects.transaction_digest().base58_encode();
47            let txn_data = &transaction.transaction;
48
49            let input_object_tracker = InputObjectTracker::new(txn_data);
50            let object_status_tracker = ObjectStatusTracker::new(effects);
51
52            // Process input objects
53            for object in txn_data
54                .input_objects()
55                .expect("Input objects must be valid")
56                .iter()
57            {
58                let object_id = object.object_id();
59                let version = object.version().map(|v| v.value());
60                let row = TransactionObjectRow {
61                    object_id: object_id.to_string(),
62                    version,
63                    transaction_digest: transaction_digest_str.clone(),
64                    checkpoint: checkpoint_seq,
65                    epoch,
66                    timestamp_ms,
67                    input_kind: input_object_tracker.get_input_object_kind(&object_id),
68                    object_status: object_status_tracker.get_object_status(&object_id),
69                };
70                entries.push(row);
71            }
72
73            // Process output objects
74            for object in transaction.output_objects(&checkpoint.object_set) {
75                let object_id = object.id();
76                let version = Some(object.version().value());
77                let row = TransactionObjectRow {
78                    object_id: object_id.to_string(),
79                    version,
80                    transaction_digest: transaction_digest_str.clone(),
81                    checkpoint: checkpoint_seq,
82                    epoch,
83                    timestamp_ms,
84                    input_kind: input_object_tracker.get_input_object_kind(&object_id),
85                    object_status: object_status_tracker.get_object_status(&object_id),
86                };
87                entries.push(row);
88            }
89        }
90
91        Ok(entries)
92    }
93}