sui_analytics_indexer/handlers/tables/
object.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use sui_indexer_alt_framework::pipeline::Processor;
9use sui_json_rpc_types::SuiMoveStruct;
10use sui_types::TypeTag;
11use sui_types::base_types::EpochId;
12use sui_types::base_types::ObjectID;
13use sui_types::full_checkpoint_content::Checkpoint;
14use sui_types::object::Object;
15
16use crate::Row;
17use crate::handlers::tables::ObjectStatusTracker;
18use crate::handlers::tables::get_is_consensus;
19use crate::handlers::tables::get_move_struct;
20use crate::handlers::tables::get_owner_address;
21use crate::handlers::tables::get_owner_type;
22use crate::handlers::tables::initial_shared_version;
23use crate::metrics::Metrics;
24use crate::package_store::PackageCache;
25use crate::pipeline::Pipeline;
26use crate::tables::ObjectRow;
27use crate::tables::ObjectStatus;
28
29pub struct ObjectProcessor {
30 package_cache: Arc<PackageCache>,
31 package_filter: Option<ObjectID>,
32 metrics: Metrics,
33}
34
35impl ObjectProcessor {
36 pub fn new(
37 package_cache: Arc<PackageCache>,
38 package_filter: &Option<String>,
39 metrics: Metrics,
40 ) -> Self {
41 Self {
42 package_cache,
43 package_filter: package_filter
44 .clone()
45 .map(|x| ObjectID::from_hex_literal(&x).unwrap()),
46 metrics,
47 }
48 }
49
50 async fn check_type_hierarchy(
51 &self,
52 type_tag: &TypeTag,
53 original_package_id: ObjectID,
54 ) -> Result<bool> {
55 use std::collections::BTreeSet;
56 use tokio::task::JoinSet;
57
58 let mut types = vec![type_tag];
60 let mut package_ids = BTreeSet::new();
61
62 while let Some(type_) = types.pop() {
63 match type_ {
64 TypeTag::Struct(s) => {
65 package_ids.insert(s.address);
66 types.extend(s.type_params.iter());
67 }
68 TypeTag::Vector(inner) => types.push(inner.as_ref()),
69 _ => {}
70 }
71 }
72
73 let mut original_ids = JoinSet::new();
75
76 for id in package_ids {
77 let package_cache = self.package_cache.clone();
78 original_ids.spawn(async move { package_cache.get_original_package_id(id).await });
79 }
80
81 while let Some(result) = original_ids.join_next().await {
83 if result?? == original_package_id {
84 return Ok(true);
85 }
86 }
87
88 Ok(false)
89 }
90
91 async fn process_object(
92 &self,
93 epoch: u64,
94 checkpoint: u64,
95 timestamp_ms: u64,
96 object: &Object,
97 object_status_tracker: &ObjectStatusTracker,
98 ) -> Result<Option<ObjectRow>> {
99 let move_obj_opt = object.data.try_as_move();
100 let has_public_transfer = move_obj_opt
101 .map(|o| o.has_public_transfer())
102 .unwrap_or(false);
103 let move_struct = if let Some((tag, contents)) = object
104 .struct_tag()
105 .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
106 {
107 match get_move_struct(
108 &tag,
109 contents,
110 &self.package_cache.resolver_for_epoch(epoch),
111 )
112 .await
113 {
114 Ok(move_struct) => Some(move_struct),
115 Err(err)
116 if err
117 .downcast_ref::<sui_types::object::bounded_visitor::Error>()
118 .filter(|e| {
119 matches!(e, sui_types::object::bounded_visitor::Error::OutOfBudget)
120 })
121 .is_some() =>
122 {
123 self.metrics
124 .total_too_large_to_deserialize
125 .with_label_values(&["Object"])
126 .inc();
127 tracing::warn!(
128 "Skipping struct with type {} because it was too large.",
129 tag
130 );
131 None
132 }
133 Err(err) => return Err(err),
134 }
135 } else {
136 None
137 };
138 let (struct_tag, sui_move_struct) = if let Some(move_struct) = move_struct {
139 match move_struct.into() {
140 SuiMoveStruct::WithTypes { type_, fields } => {
141 (Some(type_), Some(SuiMoveStruct::WithFields(fields)))
142 }
143 fields => (object.struct_tag(), Some(fields)),
144 }
145 } else {
146 (None, None)
147 };
148
149 let object_type = move_obj_opt.map(|o| o.type_());
150
151 let is_match = if let Some(package_id) = self.package_filter {
152 if let Some(object_type) = object_type {
153 let original_package_id = self
154 .package_cache
155 .get_original_package_id(package_id.into())
156 .await?;
157
158 let type_tag: TypeTag = object_type.clone().into();
159 self.check_type_hierarchy(&type_tag, original_package_id)
160 .await?
161 } else {
162 false
163 }
164 } else {
165 true
166 };
167
168 if !is_match {
169 return Ok(None);
170 }
171
172 let object_id = object.id();
173 let row = ObjectRow {
174 object_id: object_id.to_string(),
175 digest: object.digest().to_string(),
176 version: object.version().value(),
177 type_: object_type.map(|t| t.to_string()),
178 checkpoint,
179 epoch,
180 timestamp_ms,
181 owner_type: Some(get_owner_type(object)),
182 owner_address: get_owner_address(object),
183 is_consensus: get_is_consensus(object),
184 object_status: object_status_tracker
185 .get_object_status(&object_id)
186 .expect("Object must be in output objects"),
187 initial_shared_version: initial_shared_version(object),
188 previous_transaction: object.previous_transaction.base58_encode(),
189 has_public_transfer,
190 storage_rebate: Some(object.storage_rebate),
191 bcs: "".to_string(),
192 bcs_length: bcs::to_bytes(object).unwrap().len() as u64,
193 coin_type: object.coin_type_maybe().map(|t| t.to_string()),
194 coin_balance: if object.coin_type_maybe().is_some() {
195 Some(object.get_coin_value_unsafe())
196 } else {
197 None
198 },
199 struct_tag: struct_tag.map(|x| x.to_string()),
200 object_json: sui_move_struct.map(|x| x.to_json_value().to_string()),
201 };
202 Ok(Some(row))
203 }
204}
205
206impl Row for ObjectRow {
207 fn get_epoch(&self) -> EpochId {
208 self.epoch
209 }
210
211 fn get_checkpoint(&self) -> u64 {
212 self.checkpoint
213 }
214}
215
216#[async_trait]
217impl Processor for ObjectProcessor {
218 const NAME: &'static str = Pipeline::Object.name();
219 type Value = ObjectRow;
220
221 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
222 let epoch = checkpoint.summary.data().epoch;
223 let checkpoint_num = checkpoint.summary.data().sequence_number;
224 let timestamp_ms = checkpoint.summary.data().timestamp_ms;
225
226 let mut entries = Vec::new();
227
228 for checkpoint_transaction in &checkpoint.transactions {
229 let effects = &checkpoint_transaction.effects;
230 let object_status_tracker = ObjectStatusTracker::new(effects);
231
232 for object in checkpoint_transaction.output_objects(&checkpoint.object_set) {
233 if let Some(object_row) = self
234 .process_object(
235 epoch,
236 checkpoint_num,
237 timestamp_ms,
238 object,
239 &object_status_tracker,
240 )
241 .await?
242 {
243 entries.push(object_row);
244 }
245 }
246
247 for (object_ref, _) in effects.all_removed_objects().iter() {
248 let object_row = ObjectRow {
249 object_id: object_ref.0.to_string(),
250 digest: object_ref.2.to_string(),
251 version: u64::from(object_ref.1),
252 type_: None,
253 checkpoint: checkpoint_num,
254 epoch,
255 timestamp_ms,
256 owner_type: None,
257 owner_address: None,
258 object_status: ObjectStatus::Deleted,
259 initial_shared_version: None,
260 previous_transaction: checkpoint_transaction
261 .transaction
262 .digest()
263 .base58_encode(),
264 has_public_transfer: false,
265 is_consensus: false,
266 storage_rebate: None,
267 bcs: "".to_string(),
268 coin_type: None,
269 coin_balance: None,
270 struct_tag: None,
271 object_json: None,
272 bcs_length: 0,
273 };
274 entries.push(object_row);
275 }
276 }
277
278 Ok(entries)
279 }
280}