// sui_analytics_indexer/handlers/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::FileType;
5use crate::TRANSACTION_CONCURRENCY_LIMIT;
6use crate::package_store::PackageCache;
7use crate::tables::{InputObjectKind, ObjectStatus, OwnerType};
8use anyhow::{Result, anyhow};
9use async_trait::async_trait;
10use futures::stream::{self, StreamExt};
11use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout, MoveValue};
12use move_core_types::language_storage::{StructTag, TypeTag};
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::Arc;
15use sui_package_resolver::{PackageStore, Resolver};
16use sui_types::base_types::ObjectID;
17use sui_types::effects::TransactionEffects;
18use sui_types::effects::TransactionEffectsAPI;
19use sui_types::full_checkpoint_content::CheckpointData;
20use sui_types::object::bounded_visitor::BoundedVisitor;
21use sui_types::object::{Object, Owner};
22use sui_types::transaction::TransactionData;
23use sui_types::transaction::TransactionDataAPI;
24
25pub mod checkpoint_handler;
26pub mod df_handler;
27pub mod event_handler;
28pub mod move_call_handler;
29pub mod object_handler;
30pub mod package_bcs_handler;
31pub mod package_handler;
32pub mod transaction_bcs_handler;
33pub mod transaction_handler;
34pub mod transaction_objects_handler;
35pub mod wrapped_object_handler;
/// Fully-qualified struct names that `parse_struct_field` will not descend into or
/// record as wrapped structs: very common leaf-like types (strings, URLs, IDs).
const WRAPPED_INDEXING_DISALLOW_LIST: [&'static str; 4] = [
    "0x1::string::String",
    "0x1::ascii::String",
    "0x2::url::Url",
    "0x2::object::ID",
];
42
43#[async_trait::async_trait]
44pub trait AnalyticsHandler<S>: Send + Sync {
45    /// Process a checkpoint and return a boxed iterator over the rows.
46    /// This function is invoked by the analytics processor for each checkpoint.
47    async fn process_checkpoint(
48        &self,
49        checkpoint_data: &Arc<CheckpointData>,
50    ) -> Result<Box<dyn Iterator<Item = S> + Send + Sync>>
51    where
52        S: Send + Sync;
53    /// Type of data being written by this processor i.e. checkpoint, object, etc
54    fn file_type(&self) -> Result<FileType>;
55    fn name(&self) -> &'static str;
56}
57
58/// Trait for processing transactions in parallel across all transactions in a checkpoint.
59/// Implementations will extract and transform transaction data into structured rows for analytics.
60#[async_trait]
61pub trait TransactionProcessor<Row>: Send + Sync + 'static {
62    /// Process a single transaction at the given index and return a boxed iterator over the rows.
63    /// The implementation should handle extracting the transaction from the checkpoint.
64    async fn process_transaction(
65        &self,
66        tx_idx: usize,
67        checkpoint: &CheckpointData,
68    ) -> Result<Box<dyn Iterator<Item = Row> + Send + Sync>>;
69}
70
71/// Run transaction processing in parallel across all transactions in a checkpoint.
72pub async fn process_transactions<Row, P>(
73    checkpoint: Arc<CheckpointData>,
74    processor: Arc<P>,
75) -> Result<Box<dyn Iterator<Item = Row> + Send + Sync>>
76where
77    Row: Send + Sync + 'static,
78    P: TransactionProcessor<Row>,
79{
80    // Process transactions in parallel using buffered stream for ordered execution
81    let txn_len = checkpoint.transactions.len();
82    let mut entries_vec = Vec::with_capacity(txn_len);
83
84    let mut stream = stream::iter(0..txn_len)
85        .map(|idx| {
86            let checkpoint = checkpoint.clone();
87            let processor = processor.clone();
88            tokio::spawn(async move { processor.process_transaction(idx, &checkpoint).await })
89        })
90        .buffered(*TRANSACTION_CONCURRENCY_LIMIT);
91
92    while let Some(join_res) = stream.next().await {
93        match join_res {
94            Ok(Ok(tx_entries)) => {
95                // Store the iterator for later flattening
96                entries_vec.push(tx_entries);
97            }
98            Ok(Err(e)) => {
99                // Task executed but application logic returned an error
100                return Err(e);
101            }
102            Err(e) => {
103                // Task panicked or was cancelled
104                return Err(anyhow::anyhow!("Task join error: {}", e));
105            }
106        }
107    }
108
109    let flattened_iter = entries_vec.into_iter().flatten();
110    Ok(Box::new(flattened_iter))
111}
112
113fn initial_shared_version(object: &Object) -> Option<u64> {
114    match object.owner {
115        Owner::Shared {
116            initial_shared_version,
117        } => Some(initial_shared_version.value()),
118        _ => None,
119    }
120}
121
122fn get_owner_type(object: &Object) -> OwnerType {
123    match object.owner {
124        Owner::AddressOwner(_) => OwnerType::AddressOwner,
125        Owner::ObjectOwner(_) => OwnerType::ObjectOwner,
126        Owner::Shared { .. } => OwnerType::Shared,
127        Owner::Immutable => OwnerType::Immutable,
128        Owner::ConsensusAddressOwner { .. } => OwnerType::AddressOwner,
129    }
130}
131
132fn get_owner_address(object: &Object) -> Option<String> {
133    match object.owner {
134        Owner::AddressOwner(address) => Some(address.to_string()),
135        Owner::ObjectOwner(address) => Some(address.to_string()),
136        Owner::Shared { .. } => None,
137        Owner::Immutable => None,
138        Owner::ConsensusAddressOwner { owner, .. } => Some(owner.to_string()),
139    }
140}
141
142fn get_is_consensus(object: &Object) -> bool {
143    match object.owner {
144        Owner::AddressOwner(_) => false,
145        Owner::ObjectOwner(_) => false,
146        Owner::Shared { .. } => true,
147        Owner::Immutable => false,
148        Owner::ConsensusAddressOwner { .. } => true,
149    }
150}
151
152// Helper class to track input object kind.
153// Build sets of object ids for input, shared input and gas coin objects as defined
154// in the transaction data.
155// Input objects include coins and shared.
156struct InputObjectTracker {
157    shared: BTreeSet<ObjectID>,
158    coins: BTreeSet<ObjectID>,
159    input: BTreeSet<ObjectID>,
160}
161
162impl InputObjectTracker {
163    fn new(txn_data: &TransactionData) -> Self {
164        let shared: BTreeSet<ObjectID> = txn_data
165            .shared_input_objects()
166            .iter()
167            .map(|shared_io| shared_io.id())
168            .collect();
169        let coins: BTreeSet<ObjectID> = txn_data.gas().iter().map(|obj_ref| obj_ref.0).collect();
170        let input: BTreeSet<ObjectID> = txn_data
171            .input_objects()
172            .expect("Input objects must be valid")
173            .iter()
174            .map(|io_kind| io_kind.object_id())
175            .collect();
176        Self {
177            shared,
178            coins,
179            input,
180        }
181    }
182
183    fn get_input_object_kind(&self, object_id: &ObjectID) -> Option<InputObjectKind> {
184        if self.coins.contains(object_id) {
185            Some(InputObjectKind::GasCoin)
186        } else if self.shared.contains(object_id) {
187            Some(InputObjectKind::SharedInput)
188        } else if self.input.contains(object_id) {
189            Some(InputObjectKind::Input)
190        } else {
191            None
192        }
193    }
194}
195
196// Helper class to track object status.
197// Build sets of object ids for created, mutated and deleted objects as reported
198// in the transaction effects.
199struct ObjectStatusTracker {
200    created: BTreeSet<ObjectID>,
201    mutated: BTreeSet<ObjectID>,
202    deleted: BTreeSet<ObjectID>,
203}
204
205impl ObjectStatusTracker {
206    fn new(effects: &TransactionEffects) -> Self {
207        let created: BTreeSet<ObjectID> = effects
208            .created()
209            .iter()
210            .map(|(obj_ref, _)| obj_ref.0)
211            .collect();
212        let mutated: BTreeSet<ObjectID> = effects
213            .mutated()
214            .iter()
215            .chain(effects.unwrapped().iter())
216            .map(|(obj_ref, _)| obj_ref.0)
217            .collect();
218        let deleted: BTreeSet<ObjectID> = effects
219            .all_tombstones()
220            .into_iter()
221            .map(|(id, _)| id)
222            .collect();
223        Self {
224            created,
225            mutated,
226            deleted,
227        }
228    }
229
230    fn get_object_status(&self, object_id: &ObjectID) -> Option<ObjectStatus> {
231        if self.mutated.contains(object_id) {
232            Some(ObjectStatus::Mutated)
233        } else if self.deleted.contains(object_id) {
234            Some(ObjectStatus::Deleted)
235        } else if self.created.contains(object_id) {
236            Some(ObjectStatus::Created)
237        } else {
238            None
239        }
240    }
241}
242
243async fn get_move_struct<T: PackageStore>(
244    struct_tag: &StructTag,
245    contents: &[u8],
246    resolver: &Resolver<T>,
247) -> Result<MoveStruct> {
248    let move_struct = match resolver
249        .type_layout(TypeTag::Struct(Box::new(struct_tag.clone())))
250        .await?
251    {
252        MoveTypeLayout::Struct(move_struct_layout) => {
253            BoundedVisitor::deserialize_struct(contents, &move_struct_layout)
254        }
255        _ => Err(anyhow!("Object is not a move struct")),
256    }?;
257    Ok(move_struct)
258}
259
260#[derive(Debug, Default)]
261pub struct WrappedStruct {
262    object_id: Option<ObjectID>,
263    struct_tag: Option<StructTag>,
264}
265
266fn parse_struct(
267    path: &str,
268    move_struct: MoveStruct,
269    all_structs: &mut BTreeMap<String, WrappedStruct>,
270) {
271    let mut wrapped_struct = WrappedStruct {
272        struct_tag: Some(move_struct.type_),
273        ..Default::default()
274    };
275    for (k, v) in move_struct.fields {
276        parse_struct_field(
277            &format!("{}.{}", path, &k),
278            v,
279            &mut wrapped_struct,
280            all_structs,
281        );
282    }
283    all_structs.insert(path.to_string(), wrapped_struct);
284}
285
286fn parse_struct_field(
287    path: &str,
288    move_value: MoveValue,
289    curr_struct: &mut WrappedStruct,
290    all_structs: &mut BTreeMap<String, WrappedStruct>,
291) {
292    match move_value {
293        MoveValue::Struct(move_struct) => {
294            let values = move_struct
295                .fields
296                .iter()
297                .map(|(id, value)| (id.to_string(), value))
298                .collect::<BTreeMap<_, _>>();
299            let struct_name = format!(
300                "0x{}::{}::{}",
301                move_struct.type_.address.short_str_lossless(),
302                move_struct.type_.module,
303                move_struct.type_.name
304            );
305            if "0x2::object::UID" == struct_name {
306                if let Some(MoveValue::Struct(id_struct)) = values.get("id").cloned() {
307                    let id_values = id_struct
308                        .fields
309                        .iter()
310                        .map(|(id, value)| (id.to_string(), value))
311                        .collect::<BTreeMap<_, _>>();
312                    if let Some(MoveValue::Address(address) | MoveValue::Signer(address)) =
313                        id_values.get("bytes").cloned()
314                    {
315                        curr_struct.object_id = Some(ObjectID::from_address(*address))
316                    }
317                }
318            } else if "0x1::option::Option" == struct_name {
319                // Option in sui move is implemented as vector of size 1
320                if let Some(MoveValue::Vector(vec_values)) = values.get("vec").cloned()
321                    && let Some(first_value) = vec_values.first()
322                {
323                    parse_struct_field(
324                        &format!("{}[0]", path),
325                        first_value.clone(),
326                        curr_struct,
327                        all_structs,
328                    );
329                }
330            } else if !WRAPPED_INDEXING_DISALLOW_LIST.contains(&&*struct_name) {
331                // Do not index most common struct types i.e. string, url, etc
332                parse_struct(path, move_struct, all_structs)
333            }
334        }
335        MoveValue::Variant(v) => {
336            for (k, field) in v.fields.iter() {
337                parse_struct_field(
338                    &format!("{}.{}", path, k),
339                    field.clone(),
340                    curr_struct,
341                    all_structs,
342                );
343            }
344        }
345        MoveValue::Vector(fields) => {
346            for (index, field) in fields.iter().enumerate() {
347                parse_struct_field(
348                    &format!("{}[{}]", path, &index),
349                    field.clone(),
350                    curr_struct,
351                    all_structs,
352                );
353            }
354        }
355        _ => {}
356    }
357}
358
359pub async fn wait_for_cache(checkpoint_data: &CheckpointData, package_cache: &PackageCache) {
360    let sequence_number = *checkpoint_data.checkpoint_summary.sequence_number();
361    package_cache.coordinator.wait(sequence_number).await;
362}
363
#[cfg(test)]
mod tests {
    use crate::handlers::parse_struct;
    use move_core_types::account_address::AccountAddress;
    use move_core_types::annotated_value::{MoveStruct, MoveValue, MoveVariant};
    use move_core_types::identifier::Identifier;
    use move_core_types::language_storage::StructTag;
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use sui_types::base_types::ObjectID;

    /// Builds a `0x2::object::UID` value whose nested `ID.bytes` holds `address`,
    /// encoded as a `Signer` (which the parser accepts alongside `Address`).
    fn uid_value(address: &str) -> anyhow::Result<MoveValue> {
        let id = MoveStruct {
            type_: StructTag::from_str("0x2::object::ID")?,
            fields: vec![(
                Identifier::from_str("bytes")?,
                MoveValue::Signer(AccountAddress::from_hex_literal(address)?),
            )],
        };
        Ok(MoveValue::Struct(MoveStruct {
            type_: StructTag::from_str("0x2::object::UID")?,
            fields: vec![(Identifier::from_str("id")?, MoveValue::Struct(id))],
        }))
    }

    /// Builds a `0x2::balance::Balance` value holding `value`.
    fn balance_value(value: u32) -> anyhow::Result<MoveValue> {
        Ok(MoveValue::Struct(MoveStruct {
            type_: StructTag::from_str("0x2::balance::Balance")?,
            fields: vec![(Identifier::from_str("value")?, MoveValue::U32(value))],
        }))
    }

    #[tokio::test]
    async fn test_wrapped_object_parsing() -> anyhow::Result<()> {
        let root = MoveStruct {
            type_: StructTag::from_str("0x2::test::Test")?,
            fields: vec![
                (Identifier::from_str("id")?, uid_value("0x300")?),
                (Identifier::from_str("principal")?, balance_value(10)?),
            ],
        };
        let mut all_structs = BTreeMap::new();
        parse_struct("$", root, &mut all_structs);

        let expected_id = ObjectID::from_hex_literal("0x300")?;
        assert_eq!(all_structs.get("$").unwrap().object_id, Some(expected_id));
        let expected_tag = StructTag::from_str("0x2::balance::Balance")?;
        assert_eq!(
            all_structs.get("$.principal").unwrap().struct_tag,
            Some(expected_tag)
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_wrapped_object_parsing_within_enum() -> anyhow::Result<()> {
        let variant = MoveVariant {
            type_: StructTag::from_str("0x2::test::TestEnum")?,
            variant_name: Identifier::from_str("TestVariant")?,
            tag: 0,
            fields: vec![
                (Identifier::from_str("field0")?, MoveValue::U64(10)),
                (Identifier::from_str("principal")?, balance_value(10)?),
            ],
        };
        let root = MoveStruct {
            type_: StructTag::from_str("0x2::test::Test")?,
            fields: vec![
                (Identifier::from_str("id")?, uid_value("0x300")?),
                (
                    Identifier::from_str("enum_field")?,
                    MoveValue::Variant(variant),
                ),
            ],
        };
        let mut all_structs = BTreeMap::new();
        parse_struct("$", root, &mut all_structs);

        let expected_id = ObjectID::from_hex_literal("0x300")?;
        assert_eq!(all_structs.get("$").unwrap().object_id, Some(expected_id));
        let expected_tag = StructTag::from_str("0x2::balance::Balance")?;
        assert_eq!(
            all_structs
                .get("$.enum_field.principal")
                .unwrap()
                .struct_tag,
            Some(expected_tag)
        );
        Ok(())
    }
}