sui_indexer_alt/handlers/
ev_struct_inst.rs1use std::{collections::BTreeSet, ops::Range, sync::Arc};
5
6use anyhow::{Context, Result};
7use diesel::{ExpressionMethods, QueryDsl};
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10 pipeline::{concurrent::Handler, Processor},
11 postgres::{Connection, Db},
12 types::full_checkpoint_content::CheckpointData,
13};
14use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
15
16use crate::handlers::cp_sequence_numbers::tx_interval;
17use async_trait::async_trait;
18
/// Pipeline marker type for the `ev_struct_inst` table.
///
/// Its `Processor` implementation turns the events of a checkpoint into
/// `StoredEvStructInst` rows, and its `Handler` implementation writes those rows to the database
/// and prunes them by transaction sequence number.
pub(crate) struct EvStructInst;
20
21#[async_trait]
22impl Processor for EvStructInst {
23 const NAME: &'static str = "ev_struct_inst";
24
25 type Value = StoredEvStructInst;
26
27 async fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
28 let CheckpointData {
29 transactions,
30 checkpoint_summary,
31 ..
32 } = checkpoint.as_ref();
33
34 let mut values = BTreeSet::new();
35 let first_tx = checkpoint_summary.network_total_transactions as usize - transactions.len();
36
37 for (i, tx) in transactions.iter().enumerate() {
38 let tx_sequence_number = (first_tx + i) as i64;
39 for (j, ev) in tx.events.iter().flat_map(|evs| evs.data.iter().enumerate()) {
40 values.insert(StoredEvStructInst {
41 package: ev.type_.address.to_vec(),
42 module: ev.type_.module.to_string(),
43 name: ev.type_.name.to_string(),
44 instantiation: bcs::to_bytes(&ev.type_.type_params)
45 .with_context(|| format!(
46 "Failed to serialize type parameters for event ({tx_sequence_number}, {j})"
47 ))?,
48 tx_sequence_number: (first_tx + i) as i64,
49 sender: ev.sender.to_vec(),
50 });
51 }
52 }
53
54 Ok(values.into_iter().collect())
55 }
56}
57
58#[async_trait]
59impl Handler for EvStructInst {
60 type Store = Db;
61
62 const MIN_EAGER_ROWS: usize = 100;
63 const MAX_PENDING_ROWS: usize = 10000;
64
65 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
66 Ok(diesel::insert_into(ev_struct_inst::table)
67 .values(values)
68 .on_conflict_do_nothing()
69 .execute(conn)
70 .await?)
71 }
72
73 async fn prune<'a>(
74 &self,
75 from: u64,
76 to_exclusive: u64,
77 conn: &mut Connection<'a>,
78 ) -> Result<usize> {
79 let Range {
80 start: from_tx,
81 end: to_tx,
82 } = tx_interval(conn, from..to_exclusive).await?;
83
84 let filter = ev_struct_inst::table
85 .filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));
86
87 Ok(diesel::delete(filter).execute(conn).await?)
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use diesel_async::RunQueryDsl;
    use sui_indexer_alt_framework::{
        types::{event::Event, test_checkpoint_data_builder::TestCheckpointDataBuilder},
        Indexer,
    };
    use sui_indexer_alt_schema::MIGRATIONS;

    use crate::handlers::cp_sequence_numbers::CpSequenceNumbers;

    /// Loads every row of `ev_struct_inst`, ordered by all of its columns so that results are
    /// deterministic.
    async fn get_all_ev_struct_inst(conn: &mut Connection<'_>) -> Result<Vec<StoredEvStructInst>> {
        Ok(ev_struct_inst::table
            .order_by((
                ev_struct_inst::tx_sequence_number,
                ev_struct_inst::sender,
                ev_struct_inst::package,
                ev_struct_inst::module,
                ev_struct_inst::name,
                ev_struct_inst::instantiation,
            ))
            .load(conn)
            .await?)
    }

    /// Runs `checkpoint` through the `EvStructInst` pipeline and then the `CpSequenceNumbers`
    /// pipeline, committing the output of each. Pruning needs the latter's rows to translate
    /// checkpoints into transaction ranges.
    async fn index_checkpoint(checkpoint: &Arc<CheckpointData>, conn: &mut Connection<'_>) {
        let event_rows = EvStructInst.process(checkpoint).await.unwrap();
        EvStructInst::commit(&event_rows, conn).await.unwrap();

        let cp_rows = CpSequenceNumbers.process(checkpoint).await.unwrap();
        CpSequenceNumbers::commit(&cp_rows, conn).await.unwrap();
    }

    /// Pruning must fail with a descriptive error when no checkpoint mapping has been written.
    #[tokio::test]
    async fn test_ev_struct_inst_pruning_complains_if_no_mapping() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        let outcome = EvStructInst.prune(0, 2, &mut conn).await;
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert_eq!(message, "No checkpoint mapping found for checkpoint 0");
    }

    /// A transaction without events produces no rows.
    #[tokio::test]
    async fn test_ev_struct_inst_process_no_events() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        let mut builder = TestCheckpointDataBuilder::new(0)
            .start_transaction(0)
            .finish_transaction();
        let checkpoint = Arc::new(builder.build_checkpoint());

        let rows = EvStructInst.process(&checkpoint).await.unwrap();
        EvStructInst::commit(&rows, &mut conn).await.unwrap();

        assert_eq!(rows.len(), 0);
    }

    /// A single event ends up as a single stored row.
    #[tokio::test]
    async fn test_ev_struct_inst_process_single_event() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        let mut builder = TestCheckpointDataBuilder::new(0)
            .start_transaction(0)
            .with_events(vec![Event::random_for_testing()])
            .finish_transaction();
        let checkpoint = Arc::new(builder.build_checkpoint());

        let rows = EvStructInst.process(&checkpoint).await.unwrap();
        EvStructInst::commit(&rows, &mut conn).await.unwrap();

        let stored = get_all_ev_struct_inst(&mut conn).await.unwrap();
        assert_eq!(stored.len(), 1);
    }

    /// Indexes three consecutive checkpoints carrying zero, one and two events, prunes the
    /// checkpoint range `0..2`, and checks that only the last checkpoint's rows survive.
    #[tokio::test]
    async fn test_ev_struct_inst_prune_events() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        let mut builder = TestCheckpointDataBuilder::new(0);

        // First checkpoint: one transaction, no events.
        builder = builder.start_transaction(0).finish_transaction();
        let first = Arc::new(builder.build_checkpoint());
        index_checkpoint(&first, &mut conn).await;

        // Second checkpoint: one transaction, one event.
        builder = builder
            .start_transaction(0)
            .with_events(vec![Event::random_for_testing()])
            .finish_transaction();
        let second = Arc::new(builder.build_checkpoint());
        index_checkpoint(&second, &mut conn).await;

        // Third checkpoint: one transaction, two events.
        builder = builder
            .start_transaction(0)
            .with_events(vec![
                Event::random_for_testing(),
                Event::random_for_testing(),
            ])
            .finish_transaction();
        let third = Arc::new(builder.build_checkpoint());
        index_checkpoint(&third, &mut conn).await;

        let pruned = EvStructInst.prune(0, 2, &mut conn).await.unwrap();
        assert_eq!(pruned, 1);

        let survivors = get_all_ev_struct_inst(&mut conn).await.unwrap();
        assert_eq!(survivors.len(), 2);
    }
}