1use crate::FileType;
5use crate::TRANSACTION_CONCURRENCY_LIMIT;
6use crate::package_store::PackageCache;
7use crate::tables::{InputObjectKind, ObjectStatus, OwnerType};
8use anyhow::{Result, anyhow};
9use async_trait::async_trait;
10use futures::stream::{self, StreamExt};
11use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout, MoveValue};
12use move_core_types::language_storage::{StructTag, TypeTag};
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use sui_package_resolver::{PackageStore, Resolver};
16use sui_types::base_types::ObjectID;
17use sui_types::effects::TransactionEffects;
18use sui_types::effects::TransactionEffectsAPI;
19use sui_types::full_checkpoint_content::CheckpointData;
20use sui_types::object::bounded_visitor::BoundedVisitor;
21use sui_types::object::{Object, Owner};
22use sui_types::transaction::TransactionData;
23use sui_types::transaction::TransactionDataAPI;
24
// Handler submodules declared (and publicly exported) by this module.
pub mod checkpoint_handler;
pub mod df_handler;
pub mod event_handler;
pub mod move_call_handler;
pub mod object_handler;
pub mod package_bcs_handler;
pub mod package_handler;
pub mod transaction_bcs_handler;
pub mod transaction_handler;
pub mod transaction_objects_handler;
pub mod wrapped_object_handler;
/// Fully qualified struct names (`0x<addr>::<module>::<name>`) that
/// `parse_struct_field` skips: a nested value of one of these types is neither
/// recorded as a wrapped struct nor descended into.
const WRAPPED_INDEXING_DISALLOW_LIST: [&str; 4] = [
    "0x1::string::String",
    "0x1::ascii::String",
    "0x2::url::Url",
    "0x2::object::ID",
];
42
/// A handler that turns checkpoint data into rows of type `S`.
#[async_trait::async_trait]
pub trait AnalyticsHandler<S>: Send + Sync {
    /// Processes `checkpoint_data` and returns a boxed iterator over the rows
    /// produced from it, or an error.
    async fn process_checkpoint(
        &self,
        checkpoint_data: &Arc<CheckpointData>,
    ) -> Result<Box<dyn Iterator<Item = S> + Send + Sync>>
    where
        S: Send + Sync;
    /// Returns the `FileType` for this handler, or an error.
    fn file_type(&self) -> Result<FileType>;
    /// Returns a static name for this handler.
    fn name(&self) -> &'static str;
}
57
/// Per-transaction row producer driven by `process_transactions`.
#[async_trait]
pub trait TransactionProcessor<Row>: Send + Sync + 'static {
    /// Produces the rows for one transaction of `checkpoint`.
    ///
    /// `process_transactions` calls this once for every index in
    /// `0..checkpoint.transactions.len()`, passing that index as `tx_idx`.
    async fn process_transaction(
        &self,
        tx_idx: usize,
        checkpoint: &CheckpointData,
    ) -> Result<Box<dyn Iterator<Item = Row> + Send + Sync>>;
}
70
71pub async fn process_transactions<Row, P>(
73 checkpoint: Arc<CheckpointData>,
74 processor: Arc<P>,
75) -> Result<Box<dyn Iterator<Item = Row> + Send + Sync>>
76where
77 Row: Send + Sync + 'static,
78 P: TransactionProcessor<Row>,
79{
80 let txn_len = checkpoint.transactions.len();
82 let mut entries_vec = Vec::with_capacity(txn_len);
83
84 let mut stream = stream::iter(0..txn_len)
85 .map(|idx| {
86 let checkpoint = checkpoint.clone();
87 let processor = processor.clone();
88 tokio::spawn(async move { processor.process_transaction(idx, &checkpoint).await })
89 })
90 .buffered(*TRANSACTION_CONCURRENCY_LIMIT);
91
92 while let Some(join_res) = stream.next().await {
93 match join_res {
94 Ok(Ok(tx_entries)) => {
95 entries_vec.push(tx_entries);
97 }
98 Ok(Err(e)) => {
99 return Err(e);
101 }
102 Err(e) => {
103 return Err(anyhow::anyhow!("Task join error: {}", e));
105 }
106 }
107 }
108
109 let flattened_iter = entries_vec.into_iter().flatten();
110 Ok(Box::new(flattened_iter))
111}
112
113fn initial_shared_version(object: &Object) -> Option<u64> {
114 match object.owner {
115 Owner::Shared {
116 initial_shared_version,
117 } => Some(initial_shared_version.value()),
118 _ => None,
119 }
120}
121
122fn get_owner_type(object: &Object) -> OwnerType {
123 match object.owner {
124 Owner::AddressOwner(_) => OwnerType::AddressOwner,
125 Owner::ObjectOwner(_) => OwnerType::ObjectOwner,
126 Owner::Shared { .. } => OwnerType::Shared,
127 Owner::Immutable => OwnerType::Immutable,
128 Owner::ConsensusAddressOwner { .. } => OwnerType::AddressOwner,
129 }
130}
131
132fn get_owner_address(object: &Object) -> Option<String> {
133 match object.owner {
134 Owner::AddressOwner(address) => Some(address.to_string()),
135 Owner::ObjectOwner(address) => Some(address.to_string()),
136 Owner::Shared { .. } => None,
137 Owner::Immutable => None,
138 Owner::ConsensusAddressOwner { owner, .. } => Some(owner.to_string()),
139 }
140}
141
142fn get_is_consensus(object: &Object) -> bool {
143 match object.owner {
144 Owner::AddressOwner(_) => false,
145 Owner::ObjectOwner(_) => false,
146 Owner::Shared { .. } => true,
147 Owner::Immutable => false,
148 Owner::ConsensusAddressOwner { .. } => true,
149 }
150}
151
/// Sets of object IDs taken from a transaction's data, used to classify an
/// object as a gas coin, a shared input, or a plain input.
struct InputObjectTracker {
    // IDs returned by `TransactionData::shared_input_objects()`.
    shared: BTreeSet<ObjectID>,
    // IDs of the object references returned by `TransactionData::gas()`.
    coins: BTreeSet<ObjectID>,
    // IDs returned by `TransactionData::input_objects()`.
    input: BTreeSet<ObjectID>,
}
161
162impl InputObjectTracker {
163 fn new(txn_data: &TransactionData) -> Self {
164 let shared: BTreeSet<ObjectID> = txn_data
165 .shared_input_objects()
166 .iter()
167 .map(|shared_io| shared_io.id())
168 .collect();
169 let coins: BTreeSet<ObjectID> = txn_data.gas().iter().map(|obj_ref| obj_ref.0).collect();
170 let input: BTreeSet<ObjectID> = txn_data
171 .input_objects()
172 .expect("Input objects must be valid")
173 .iter()
174 .map(|io_kind| io_kind.object_id())
175 .collect();
176 Self {
177 shared,
178 coins,
179 input,
180 }
181 }
182
183 fn get_input_object_kind(&self, object_id: &ObjectID) -> Option<InputObjectKind> {
184 if self.coins.contains(object_id) {
185 Some(InputObjectKind::GasCoin)
186 } else if self.shared.contains(object_id) {
187 Some(InputObjectKind::SharedInput)
188 } else if self.input.contains(object_id) {
189 Some(InputObjectKind::Input)
190 } else {
191 None
192 }
193 }
194}
195
/// Sets of object IDs taken from a transaction's effects, used to report
/// whether an object was created, mutated or deleted.
struct ObjectStatusTracker {
    // IDs from `effects.created()`.
    created: BTreeSet<ObjectID>,
    // IDs from `effects.mutated()` together with `effects.unwrapped()`.
    mutated: BTreeSet<ObjectID>,
    // IDs from `effects.all_tombstones()`.
    deleted: BTreeSet<ObjectID>,
}
204
205impl ObjectStatusTracker {
206 fn new(effects: &TransactionEffects) -> Self {
207 let created: BTreeSet<ObjectID> = effects
208 .created()
209 .iter()
210 .map(|(obj_ref, _)| obj_ref.0)
211 .collect();
212 let mutated: BTreeSet<ObjectID> = effects
213 .mutated()
214 .iter()
215 .chain(effects.unwrapped().iter())
216 .map(|(obj_ref, _)| obj_ref.0)
217 .collect();
218 let deleted: BTreeSet<ObjectID> = effects
219 .all_tombstones()
220 .into_iter()
221 .map(|(id, _)| id)
222 .collect();
223 Self {
224 created,
225 mutated,
226 deleted,
227 }
228 }
229
230 fn get_object_status(&self, object_id: &ObjectID) -> Option<ObjectStatus> {
231 if self.mutated.contains(object_id) {
232 Some(ObjectStatus::Mutated)
233 } else if self.deleted.contains(object_id) {
234 Some(ObjectStatus::Deleted)
235 } else if self.created.contains(object_id) {
236 Some(ObjectStatus::Created)
237 } else {
238 None
239 }
240 }
241}
242
243async fn get_move_struct<T: PackageStore>(
244 struct_tag: &StructTag,
245 contents: &[u8],
246 resolver: &Resolver<T>,
247) -> Result<MoveStruct> {
248 let move_struct = match resolver
249 .type_layout(TypeTag::Struct(Box::new(struct_tag.clone())))
250 .await?
251 {
252 MoveTypeLayout::Struct(move_struct_layout) => {
253 BoundedVisitor::deserialize_struct(contents, &move_struct_layout)
254 }
255 _ => Err(anyhow!("Object is not a move struct")),
256 }?;
257 Ok(move_struct)
258}
259
/// Summary of one struct encountered while walking a Move value with
/// `parse_struct`.
#[derive(Debug, Default)]
pub struct WrappedStruct {
    // Set by `parse_struct_field` when a `0x2::object::UID` field with an
    // address-valued `id.bytes` is found; `None` otherwise.
    object_id: Option<ObjectID>,
    // The struct's type tag; `parse_struct` always sets this to `Some`.
    struct_tag: Option<StructTag>,
}
265
266fn parse_struct(
267 path: &str,
268 move_struct: MoveStruct,
269 all_structs: &mut BTreeMap<String, WrappedStruct>,
270) {
271 let mut wrapped_struct = WrappedStruct {
272 struct_tag: Some(move_struct.type_),
273 ..Default::default()
274 };
275 for (k, v) in move_struct.fields {
276 parse_struct_field(
277 &format!("{}.{}", path, &k),
278 v,
279 &mut wrapped_struct,
280 all_structs,
281 );
282 }
283 all_structs.insert(path.to_string(), wrapped_struct);
284}
285
286fn parse_struct_field(
287 path: &str,
288 move_value: MoveValue,
289 curr_struct: &mut WrappedStruct,
290 all_structs: &mut BTreeMap<String, WrappedStruct>,
291) {
292 match move_value {
293 MoveValue::Struct(move_struct) => {
294 let values = move_struct
295 .fields
296 .iter()
297 .map(|(id, value)| (id.to_string(), value))
298 .collect::<BTreeMap<_, _>>();
299 let struct_name = format!(
300 "0x{}::{}::{}",
301 move_struct.type_.address.short_str_lossless(),
302 move_struct.type_.module,
303 move_struct.type_.name
304 );
305 if "0x2::object::UID" == struct_name {
306 if let Some(MoveValue::Struct(id_struct)) = values.get("id").cloned() {
307 let id_values = id_struct
308 .fields
309 .iter()
310 .map(|(id, value)| (id.to_string(), value))
311 .collect::<BTreeMap<_, _>>();
312 if let Some(MoveValue::Address(address) | MoveValue::Signer(address)) =
313 id_values.get("bytes").cloned()
314 {
315 curr_struct.object_id = Some(ObjectID::from_address(*address))
316 }
317 }
318 } else if "0x1::option::Option" == struct_name {
319 if let Some(MoveValue::Vector(vec_values)) = values.get("vec").cloned()
321 && let Some(first_value) = vec_values.first()
322 {
323 parse_struct_field(
324 &format!("{}[0]", path),
325 first_value.clone(),
326 curr_struct,
327 all_structs,
328 );
329 }
330 } else if !WRAPPED_INDEXING_DISALLOW_LIST.contains(&&*struct_name) {
331 parse_struct(path, move_struct, all_structs)
333 }
334 }
335 MoveValue::Variant(v) => {
336 for (k, field) in v.fields.iter() {
337 parse_struct_field(
338 &format!("{}.{}", path, k),
339 field.clone(),
340 curr_struct,
341 all_structs,
342 );
343 }
344 }
345 MoveValue::Vector(fields) => {
346 for (index, field) in fields.iter().enumerate() {
347 parse_struct_field(
348 &format!("{}[{}]", path, &index),
349 field.clone(),
350 curr_struct,
351 all_structs,
352 );
353 }
354 }
355 _ => {}
356 }
357}
358
359pub async fn wait_for_cache(checkpoint_data: &CheckpointData, package_cache: &PackageCache) {
360 let sequence_number = *checkpoint_data.checkpoint_summary.sequence_number();
361 package_cache.coordinator.wait(sequence_number).await;
362}
363
#[cfg(test)]
mod tests {
    use crate::handlers::parse_struct;
    use move_core_types::account_address::AccountAddress;
    use move_core_types::annotated_value::{MoveStruct, MoveValue, MoveVariant};
    use move_core_types::identifier::Identifier;
    use move_core_types::language_storage::StructTag;
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use sui_types::base_types::ObjectID;

    /// Parses `name` into an `Identifier`.
    fn ident(name: &str) -> anyhow::Result<Identifier> {
        Ok(Identifier::from_str(name)?)
    }

    /// Parses `tag` into a `StructTag`.
    fn struct_tag(tag: &str) -> anyhow::Result<StructTag> {
        Ok(StructTag::from_str(tag)?)
    }

    /// A `0x2::object::UID` value whose `id.bytes` is the signer address `0x300`.
    fn uid_field() -> anyhow::Result<MoveValue> {
        let inner_id = MoveStruct {
            type_: struct_tag("0x2::object::ID")?,
            fields: vec![(
                ident("bytes")?,
                MoveValue::Signer(AccountAddress::from_hex_literal("0x300")?),
            )],
        };
        Ok(MoveValue::Struct(MoveStruct {
            type_: struct_tag("0x2::object::UID")?,
            fields: vec![(ident("id")?, MoveValue::Struct(inner_id))],
        }))
    }

    /// A `0x2::balance::Balance` value with `value == 10u32`.
    fn balance_field() -> anyhow::Result<MoveValue> {
        Ok(MoveValue::Struct(MoveStruct {
            type_: struct_tag("0x2::balance::Balance")?,
            fields: vec![(ident("value")?, MoveValue::U32(10))],
        }))
    }

    #[tokio::test]
    async fn test_wrapped_object_parsing() -> anyhow::Result<()> {
        let root = MoveStruct {
            type_: struct_tag("0x2::test::Test")?,
            fields: vec![
                (ident("id")?, uid_field()?),
                (ident("principal")?, balance_field()?),
            ],
        };

        let mut parsed = BTreeMap::new();
        parse_struct("$", root, &mut parsed);

        // The root picks up its object ID from the UID field.
        assert_eq!(
            parsed.get("$").unwrap().object_id,
            Some(ObjectID::from_hex_literal("0x300")?)
        );
        // The nested balance is registered under its field path.
        assert_eq!(
            parsed.get("$.principal").unwrap().struct_tag,
            Some(struct_tag("0x2::balance::Balance")?)
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_wrapped_object_parsing_within_enum() -> anyhow::Result<()> {
        let enum_value = MoveVariant {
            type_: struct_tag("0x2::test::TestEnum")?,
            variant_name: ident("TestVariant")?,
            tag: 0,
            fields: vec![
                (ident("field0")?, MoveValue::U64(10)),
                (ident("principal")?, balance_field()?),
            ],
        };
        let root = MoveStruct {
            type_: struct_tag("0x2::test::Test")?,
            fields: vec![
                (ident("id")?, uid_field()?),
                (ident("enum_field")?, MoveValue::Variant(enum_value)),
            ],
        };

        let mut parsed = BTreeMap::new();
        parse_struct("$", root, &mut parsed);

        assert_eq!(
            parsed.get("$").unwrap().object_id,
            Some(ObjectID::from_hex_literal("0x300")?)
        );
        // Structs inside a variant are registered under "<enum field>.<variant field>".
        assert_eq!(
            parsed.get("$.enum_field.principal").unwrap().struct_tag,
            Some(struct_tag("0x2::balance::Balance")?)
        );
        Ok(())
    }
}