// sui_analytics_indexer/handlers/tables/object.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use sui_indexer_alt_framework::pipeline::Processor;
9use sui_json_rpc_types::SuiMoveStruct;
10use sui_types::TypeTag;
11use sui_types::base_types::EpochId;
12use sui_types::base_types::ObjectID;
13use sui_types::full_checkpoint_content::Checkpoint;
14use sui_types::object::Object;
15
16use crate::Row;
17use crate::handlers::tables::ObjectStatusTracker;
18use crate::handlers::tables::get_is_consensus;
19use crate::handlers::tables::get_move_struct;
20use crate::handlers::tables::get_owner_address;
21use crate::handlers::tables::get_owner_type;
22use crate::handlers::tables::initial_shared_version;
23use crate::metrics::Metrics;
24use crate::package_store::PackageCache;
25use crate::pipeline::Pipeline;
26use crate::tables::ObjectRow;
27use crate::tables::ObjectStatus;
28
/// Pipeline processor that turns checkpoint output objects into [`ObjectRow`]
/// analytics entries, optionally restricted to a single package.
pub struct ObjectProcessor {
    // Shared cache used to resolve Move layouts and original package IDs.
    package_cache: Arc<PackageCache>,
    // When set, only objects whose type hierarchy references this package
    // produce rows; `None` means emit every object.
    package_filter: Option<ObjectID>,
    // Counters for processing anomalies (e.g. structs too large to decode).
    metrics: Metrics,
}
34
35impl ObjectProcessor {
36    pub fn new(
37        package_cache: Arc<PackageCache>,
38        package_filter: &Option<String>,
39        metrics: Metrics,
40    ) -> Self {
41        Self {
42            package_cache,
43            package_filter: package_filter
44                .clone()
45                .map(|x| ObjectID::from_hex_literal(&x).unwrap()),
46            metrics,
47        }
48    }
49
50    async fn check_type_hierarchy(
51        &self,
52        type_tag: &TypeTag,
53        original_package_id: ObjectID,
54    ) -> Result<bool> {
55        use std::collections::BTreeSet;
56        use tokio::task::JoinSet;
57
58        // Collect all package IDs using stack-based traversal
59        let mut types = vec![type_tag];
60        let mut package_ids = BTreeSet::new();
61
62        while let Some(type_) = types.pop() {
63            match type_ {
64                TypeTag::Struct(s) => {
65                    package_ids.insert(s.address);
66                    types.extend(s.type_params.iter());
67                }
68                TypeTag::Vector(inner) => types.push(inner.as_ref()),
69                _ => {}
70            }
71        }
72
73        // Resolve original package IDs in parallel
74        let mut original_ids = JoinSet::new();
75
76        for id in package_ids {
77            let package_cache = self.package_cache.clone();
78            original_ids.spawn(async move { package_cache.get_original_package_id(id).await });
79        }
80
81        // Check if any resolved ID matches our target
82        while let Some(result) = original_ids.join_next().await {
83            if result?? == original_package_id {
84                return Ok(true);
85            }
86        }
87
88        Ok(false)
89    }
90
91    async fn process_object(
92        &self,
93        epoch: u64,
94        checkpoint: u64,
95        timestamp_ms: u64,
96        object: &Object,
97        object_status_tracker: &ObjectStatusTracker,
98    ) -> Result<Option<ObjectRow>> {
99        let move_obj_opt = object.data.try_as_move();
100        let has_public_transfer = move_obj_opt
101            .map(|o| o.has_public_transfer())
102            .unwrap_or(false);
103        let move_struct = if let Some((tag, contents)) = object
104            .struct_tag()
105            .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
106        {
107            match get_move_struct(
108                &tag,
109                contents,
110                &self.package_cache.resolver_for_epoch(epoch),
111            )
112            .await
113            {
114                Ok(move_struct) => Some(move_struct),
115                Err(err)
116                    if err
117                        .downcast_ref::<sui_types::object::bounded_visitor::Error>()
118                        .filter(|e| {
119                            matches!(e, sui_types::object::bounded_visitor::Error::OutOfBudget)
120                        })
121                        .is_some() =>
122                {
123                    self.metrics
124                        .total_too_large_to_deserialize
125                        .with_label_values(&["Object"])
126                        .inc();
127                    tracing::warn!(
128                        "Skipping struct with type {} because it was too large.",
129                        tag
130                    );
131                    None
132                }
133                Err(err) => return Err(err),
134            }
135        } else {
136            None
137        };
138        let (struct_tag, sui_move_struct) = if let Some(move_struct) = move_struct {
139            match move_struct.into() {
140                SuiMoveStruct::WithTypes { type_, fields } => {
141                    (Some(type_), Some(SuiMoveStruct::WithFields(fields)))
142                }
143                fields => (object.struct_tag(), Some(fields)),
144            }
145        } else {
146            (None, None)
147        };
148
149        let object_type = move_obj_opt.map(|o| o.type_());
150
151        let is_match = if let Some(package_id) = self.package_filter {
152            if let Some(object_type) = object_type {
153                let original_package_id = self
154                    .package_cache
155                    .get_original_package_id(package_id.into())
156                    .await?;
157
158                let type_tag: TypeTag = object_type.clone().into();
159                self.check_type_hierarchy(&type_tag, original_package_id)
160                    .await?
161            } else {
162                false
163            }
164        } else {
165            true
166        };
167
168        if !is_match {
169            return Ok(None);
170        }
171
172        let object_id = object.id();
173        let row = ObjectRow {
174            object_id: object_id.to_string(),
175            digest: object.digest().to_string(),
176            version: object.version().value(),
177            type_: object_type.map(|t| t.to_string()),
178            checkpoint,
179            epoch,
180            timestamp_ms,
181            owner_type: Some(get_owner_type(object)),
182            owner_address: get_owner_address(object),
183            is_consensus: get_is_consensus(object),
184            object_status: object_status_tracker
185                .get_object_status(&object_id)
186                .expect("Object must be in output objects"),
187            initial_shared_version: initial_shared_version(object),
188            previous_transaction: object.previous_transaction.base58_encode(),
189            has_public_transfer,
190            storage_rebate: Some(object.storage_rebate),
191            bcs: "".to_string(),
192            bcs_length: bcs::to_bytes(object).unwrap().len() as u64,
193            coin_type: object.coin_type_maybe().map(|t| t.to_string()),
194            coin_balance: if object.coin_type_maybe().is_some() {
195                Some(object.get_coin_value_unsafe())
196            } else {
197                None
198            },
199            struct_tag: struct_tag.map(|x| x.to_string()),
200            object_json: sui_move_struct.map(|x| x.to_json_value().to_string()),
201        };
202        Ok(Some(row))
203    }
204}
205
/// Exposes the epoch and checkpoint of each `ObjectRow` through the
/// crate's `Row` abstraction.
impl Row for ObjectRow {
    // Epoch the row was recorded in.
    fn get_epoch(&self) -> EpochId {
        self.epoch
    }

    // Checkpoint sequence number the row was recorded at.
    fn get_checkpoint(&self) -> u64 {
        self.checkpoint
    }
}
215
216#[async_trait]
217impl Processor for ObjectProcessor {
218    const NAME: &'static str = Pipeline::Object.name();
219    type Value = ObjectRow;
220
221    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
222        let epoch = checkpoint.summary.data().epoch;
223        let checkpoint_num = checkpoint.summary.data().sequence_number;
224        let timestamp_ms = checkpoint.summary.data().timestamp_ms;
225
226        let mut entries = Vec::new();
227
228        for checkpoint_transaction in &checkpoint.transactions {
229            let effects = &checkpoint_transaction.effects;
230            let object_status_tracker = ObjectStatusTracker::new(effects);
231
232            for object in checkpoint_transaction.output_objects(&checkpoint.object_set) {
233                if let Some(object_row) = self
234                    .process_object(
235                        epoch,
236                        checkpoint_num,
237                        timestamp_ms,
238                        object,
239                        &object_status_tracker,
240                    )
241                    .await?
242                {
243                    entries.push(object_row);
244                }
245            }
246
247            for (object_ref, _) in effects.all_removed_objects().iter() {
248                let object_row = ObjectRow {
249                    object_id: object_ref.0.to_string(),
250                    digest: object_ref.2.to_string(),
251                    version: u64::from(object_ref.1),
252                    type_: None,
253                    checkpoint: checkpoint_num,
254                    epoch,
255                    timestamp_ms,
256                    owner_type: None,
257                    owner_address: None,
258                    object_status: ObjectStatus::Deleted,
259                    initial_shared_version: None,
260                    previous_transaction: checkpoint_transaction
261                        .transaction
262                        .digest()
263                        .base58_encode(),
264                    has_public_transfer: false,
265                    is_consensus: false,
266                    storage_rebate: None,
267                    bcs: "".to_string(),
268                    coin_type: None,
269                    coin_balance: None,
270                    struct_tag: None,
271                    object_json: None,
272                    bcs_length: 0,
273                };
274                entries.push(object_row);
275            }
276        }
277
278        Ok(entries)
279    }
280}