sui_analytics_indexer/handlers/tables/
event.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use move_core_types::annotated_value::MoveValue;
9use sui_indexer_alt_framework::pipeline::Processor;
10use sui_json_rpc_types::type_and_fields_from_move_event_data;
11use sui_types::base_types::EpochId;
12use sui_types::effects::TransactionEffectsAPI;
13use sui_types::event::Event;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::Row;
17use crate::package_store::PackageCache;
18use crate::pipeline::Pipeline;
19use crate::tables::EventRow;
20
21pub struct EventProcessor {
22 package_cache: Arc<PackageCache>,
23}
24
25impl EventProcessor {
26 pub fn new(package_cache: Arc<PackageCache>) -> Self {
27 Self { package_cache }
28 }
29}
30
31impl Row for EventRow {
32 fn get_epoch(&self) -> EpochId {
33 self.epoch
34 }
35
36 fn get_checkpoint(&self) -> u64 {
37 self.checkpoint
38 }
39}
40
41#[async_trait]
42impl Processor for EventProcessor {
43 const NAME: &'static str = Pipeline::Event.name();
44 type Value = EventRow;
45
46 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
47 let epoch = checkpoint.summary.data().epoch;
48 let checkpoint_seq = checkpoint.summary.data().sequence_number;
49 let timestamp_ms = checkpoint.summary.data().timestamp_ms;
50
51 let mut entries = Vec::new();
52
53 for executed_tx in &checkpoint.transactions {
54 let digest = executed_tx.effects.transaction_digest();
55
56 if let Some(events) = &executed_tx.events {
57 for (idx, event) in events.data.iter().enumerate() {
58 let Event {
59 package_id,
60 transaction_module,
61 sender,
62 type_,
63 contents,
64 } = event;
65
66 let layout = self
67 .package_cache
68 .resolver_for_epoch(epoch)
69 .type_layout(move_core_types::language_storage::TypeTag::Struct(
70 Box::new(type_.clone()),
71 ))
72 .await?;
73
74 let move_value = MoveValue::simple_deserialize(contents, &layout)?;
75 let (_, event_json) = type_and_fields_from_move_event_data(move_value)?;
76
77 let row = EventRow {
78 transaction_digest: digest.base58_encode(),
79 event_index: idx as u64,
80 checkpoint: checkpoint_seq,
81 epoch,
82 timestamp_ms,
83 sender: sender.to_string(),
84 package: package_id.to_string(),
85 module: transaction_module.to_string(),
86 event_type: type_.to_string(),
87 bcs: "".to_string(),
88 bcs_length: contents.len() as u64,
89 event_json: event_json.to_string(),
90 };
91
92 entries.push(row);
93 }
94 }
95 }
96
97 Ok(entries)
98 }
99}