sui_analytics_indexer/handlers/tables/
wrapped_object.rs1use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use sui_indexer_alt_framework::pipeline::Processor;
10use sui_types::base_types::EpochId;
11use sui_types::full_checkpoint_content::Checkpoint;
12
13use crate::Row;
14use crate::handlers::tables::get_move_struct;
15use crate::handlers::tables::parse_struct;
16use crate::package_store::PackageCache;
17use crate::pipeline::Pipeline;
18use crate::tables::WrappedObjectRow;
19
20pub struct WrappedObjectProcessor {
21 package_cache: Arc<PackageCache>,
22}
23
24impl WrappedObjectProcessor {
25 pub fn new(package_cache: Arc<PackageCache>) -> Self {
26 Self { package_cache }
27 }
28}
29
30impl Row for WrappedObjectRow {
31 fn get_epoch(&self) -> EpochId {
32 self.epoch
33 }
34
35 fn get_checkpoint(&self) -> u64 {
36 self.checkpoint
37 }
38}
39
40#[async_trait]
41impl Processor for WrappedObjectProcessor {
42 const NAME: &'static str = Pipeline::WrappedObject.name();
43 type Value = WrappedObjectRow;
44
45 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
46 let epoch = checkpoint.summary.data().epoch;
47 let checkpoint_seq = checkpoint.summary.data().sequence_number;
48 let timestamp_ms = checkpoint.summary.data().timestamp_ms;
49
50 let mut wrapped_objects = Vec::new();
51
52 for transaction in &checkpoint.transactions {
53 for object in transaction.output_objects(&checkpoint.object_set) {
54 let move_struct = if let Some((tag, contents)) = object
55 .struct_tag()
56 .and_then(|tag| object.data.try_as_move().map(|mo| (tag, mo.contents())))
57 {
58 match get_move_struct(
59 &tag,
60 contents,
61 &self.package_cache.resolver_for_epoch(epoch),
62 )
63 .await
64 {
65 Ok(move_struct) => Some(move_struct),
66 Err(err)
67 if err
68 .downcast_ref::<sui_types::object::bounded_visitor::Error>()
69 .filter(|e| {
70 matches!(
71 e,
72 sui_types::object::bounded_visitor::Error::OutOfBudget
73 )
74 })
75 .is_some() =>
76 {
77 tracing::warn!(
78 "Skipping struct with type {} because it was too large.",
79 tag
80 );
81 None
82 }
83 Err(err) => return Err(err),
84 }
85 } else {
86 None
87 };
88
89 let mut object_wrapped_structs = BTreeMap::new();
90 if let Some(move_struct) = move_struct {
91 parse_struct("$", move_struct, &mut object_wrapped_structs);
92 }
93
94 for (json_path, wrapped_struct) in object_wrapped_structs.iter() {
95 let row = WrappedObjectRow {
96 object_id: wrapped_struct.object_id.map(|id| id.to_string()),
97 root_object_id: object.id().to_string(),
98 root_object_version: object.version().value(),
99 checkpoint: checkpoint_seq,
100 epoch,
101 timestamp_ms,
102 json_path: json_path.to_string(),
103 struct_tag: wrapped_struct.struct_tag.clone().map(|tag| tag.to_string()),
104 };
105 wrapped_objects.push(row);
106 }
107 }
108 }
109
110 Ok(wrapped_objects)
111 }
112}