// sui_analytics_indexer/handlers/tables/event.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use move_core_types::annotated_value::MoveValue;
9use sui_indexer_alt_framework::pipeline::Processor;
10use sui_json_rpc_types::type_and_fields_from_move_event_data;
11use sui_types::base_types::EpochId;
12use sui_types::effects::TransactionEffectsAPI;
13use sui_types::event::Event;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::Row;
17use crate::package_store::PackageCache;
18use crate::pipeline::Pipeline;
19use crate::tables::EventRow;
20
21pub struct EventProcessor {
22    package_cache: Arc<PackageCache>,
23}
24
25impl EventProcessor {
26    pub fn new(package_cache: Arc<PackageCache>) -> Self {
27        Self { package_cache }
28    }
29}
30
31impl Row for EventRow {
32    fn get_epoch(&self) -> EpochId {
33        self.epoch
34    }
35
36    fn get_checkpoint(&self) -> u64 {
37        self.checkpoint
38    }
39}
40
41#[async_trait]
42impl Processor for EventProcessor {
43    const NAME: &'static str = Pipeline::Event.name();
44    type Value = EventRow;
45
46    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
47        let epoch = checkpoint.summary.data().epoch;
48        let checkpoint_seq = checkpoint.summary.data().sequence_number;
49        let timestamp_ms = checkpoint.summary.data().timestamp_ms;
50
51        let mut entries = Vec::new();
52
53        for executed_tx in &checkpoint.transactions {
54            let digest = executed_tx.effects.transaction_digest();
55
56            if let Some(events) = &executed_tx.events {
57                for (idx, event) in events.data.iter().enumerate() {
58                    let Event {
59                        package_id,
60                        transaction_module,
61                        sender,
62                        type_,
63                        contents,
64                    } = event;
65
66                    let layout = self
67                        .package_cache
68                        .resolver_for_epoch(epoch)
69                        .type_layout(move_core_types::language_storage::TypeTag::Struct(
70                            Box::new(type_.clone()),
71                        ))
72                        .await?;
73
74                    let move_value = MoveValue::simple_deserialize(contents, &layout)?;
75                    let (_, event_json) = type_and_fields_from_move_event_data(move_value)?;
76
77                    let row = EventRow {
78                        transaction_digest: digest.base58_encode(),
79                        event_index: idx as u64,
80                        checkpoint: checkpoint_seq,
81                        epoch,
82                        timestamp_ms,
83                        sender: sender.to_string(),
84                        package: package_id.to_string(),
85                        module: transaction_module.to_string(),
86                        event_type: type_.to_string(),
87                        bcs: "".to_string(),
88                        bcs_length: contents.len() as u64,
89                        event_json: event_json.to_string(),
90                    };
91
92                    entries.push(row);
93                }
94            }
95        }
96
97        Ok(entries)
98    }
99}