sui_indexer_alt/handlers/
ev_struct_inst.rs1use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use anyhow::Context;
9use anyhow::Result;
10use async_trait::async_trait;
11use diesel::ExpressionMethods;
12use diesel::QueryDsl;
13use diesel_async::RunQueryDsl;
14use sui_indexer_alt_framework::pipeline::Processor;
15use sui_indexer_alt_framework::postgres::Connection;
16use sui_indexer_alt_framework::postgres::handler::Handler;
17use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
18use sui_indexer_alt_schema::events::StoredEvStructInst;
19use sui_indexer_alt_schema::schema::ev_struct_inst;
20
21use crate::handlers::cp_sequence_numbers::tx_interval;
22
23pub(crate) struct EvStructInst;
24
25#[async_trait]
26impl Processor for EvStructInst {
27 const NAME: &'static str = "ev_struct_inst";
28
29 type Value = StoredEvStructInst;
30
31 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
32 let Checkpoint {
33 transactions,
34 summary,
35 ..
36 } = checkpoint.as_ref();
37
38 let mut values = BTreeSet::new();
39 let first_tx = summary.network_total_transactions as usize - transactions.len();
40
41 for (i, tx) in transactions.iter().enumerate() {
42 let tx_sequence_number = (first_tx + i) as i64;
43 for (j, ev) in tx.events.iter().flat_map(|evs| evs.data.iter().enumerate()) {
44 values.insert(StoredEvStructInst {
45 package: ev.type_.address.to_vec(),
46 module: ev.type_.module.to_string(),
47 name: ev.type_.name.to_string(),
48 instantiation: bcs::to_bytes(&ev.type_.type_params)
49 .with_context(|| format!(
50 "Failed to serialize type parameters for event ({tx_sequence_number}, {j})"
51 ))?,
52 tx_sequence_number: (first_tx + i) as i64,
53 sender: ev.sender.to_vec(),
54 });
55 }
56 }
57
58 Ok(values.into_iter().collect())
59 }
60}
61
62#[async_trait]
63impl Handler for EvStructInst {
64 const MIN_EAGER_ROWS: usize = 100;
65 const MAX_PENDING_ROWS: usize = 10000;
66
67 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
68 Ok(diesel::insert_into(ev_struct_inst::table)
69 .values(values)
70 .on_conflict_do_nothing()
71 .execute(conn)
72 .await?)
73 }
74
75 async fn prune<'a>(
76 &self,
77 from: u64,
78 to_exclusive: u64,
79 conn: &mut Connection<'a>,
80 ) -> Result<usize> {
81 let Range {
82 start: from_tx,
83 end: to_tx,
84 } = tx_interval(conn, from..to_exclusive).await?;
85
86 let filter = ev_struct_inst::table
87 .filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));
88
89 Ok(diesel::delete(filter).execute(conn).await?)
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use diesel_async::RunQueryDsl;
96 use sui_indexer_alt_framework::Indexer;
97 use sui_indexer_alt_framework::types::event::Event;
98 use sui_indexer_alt_framework::types::test_checkpoint_data_builder::TestCheckpointBuilder;
99 use sui_indexer_alt_schema::MIGRATIONS;
100
101 use crate::handlers::cp_sequence_numbers::CpSequenceNumbers;
102
103 use super::*;
104
105 async fn get_all_ev_struct_inst(conn: &mut Connection<'_>) -> Result<Vec<StoredEvStructInst>> {
106 let query = ev_struct_inst::table
107 .order_by((
108 ev_struct_inst::tx_sequence_number,
109 ev_struct_inst::sender,
110 ev_struct_inst::package,
111 ev_struct_inst::module,
112 ev_struct_inst::name,
113 ev_struct_inst::instantiation,
114 ))
115 .load(conn)
116 .await?;
117 Ok(query)
118 }
119
120 #[tokio::test]
121 async fn test_ev_struct_inst_pruning_complains_if_no_mapping() {
122 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
123 let mut conn = indexer.store().connect().await.unwrap();
124
125 let result = EvStructInst.prune(0, 2, &mut conn).await;
126
127 assert!(result.is_err());
128 assert_eq!(
129 result.unwrap_err().to_string(),
130 "No checkpoint mapping found for checkpoint 0"
131 );
132 }
133
134 #[tokio::test]
135 async fn test_ev_struct_inst_process_no_events() {
136 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
137 let mut conn = indexer.store().connect().await.unwrap();
138
139 let checkpoint: Arc<Checkpoint> = Arc::new(
140 TestCheckpointBuilder::new(0)
141 .start_transaction(0)
142 .finish_transaction()
143 .build_checkpoint(),
144 );
145
146 let values = EvStructInst.process(&checkpoint).await.unwrap();
147 EvStructInst::commit(&values, &mut conn).await.unwrap();
148
149 assert_eq!(values.len(), 0);
150 }
151
152 #[tokio::test]
153 async fn test_ev_struct_inst_process_single_event() {
154 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
155 let mut conn = indexer.store().connect().await.unwrap();
156
157 let checkpoint: Arc<Checkpoint> = Arc::new(
158 TestCheckpointBuilder::new(0)
159 .start_transaction(0)
160 .with_events(vec![Event::random_for_testing()])
161 .finish_transaction()
162 .build_checkpoint(),
163 );
164
165 let values = EvStructInst.process(&checkpoint).await.unwrap();
167 EvStructInst::commit(&values, &mut conn).await.unwrap();
168
169 let events = get_all_ev_struct_inst(&mut conn).await.unwrap();
170 assert_eq!(events.len(), 1);
171 }
172
173 #[tokio::test]
174 async fn test_ev_struct_inst_prune_events() {
175 let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
176 let mut conn = indexer.store().connect().await.unwrap();
177
178 let mut builder = TestCheckpointBuilder::new(0);
180 builder = builder.start_transaction(0).finish_transaction();
181 let checkpoint = Arc::new(builder.build_checkpoint());
182 let values = EvStructInst.process(&checkpoint).await.unwrap();
183
184 EvStructInst::commit(&values, &mut conn).await.unwrap();
185 let values = CpSequenceNumbers.process(&checkpoint).await.unwrap();
186 CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();
187
188 builder = builder
190 .start_transaction(0)
191 .with_events(vec![Event::random_for_testing()])
192 .finish_transaction();
193 let checkpoint = Arc::new(builder.build_checkpoint());
194 let values = EvStructInst.process(&checkpoint).await.unwrap();
195
196 EvStructInst::commit(&values, &mut conn).await.unwrap();
197 let values = CpSequenceNumbers.process(&checkpoint).await.unwrap();
198 CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();
199
200 builder = builder
202 .start_transaction(0)
203 .with_events(vec![
204 Event::random_for_testing(),
205 Event::random_for_testing(),
206 ])
207 .finish_transaction();
208 let checkpoint = Arc::new(builder.build_checkpoint());
209 let values = EvStructInst.process(&checkpoint).await.unwrap();
210
211 EvStructInst::commit(&values, &mut conn).await.unwrap();
212 let values = CpSequenceNumbers.process(&checkpoint).await.unwrap();
213 CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();
214
215 let rows_pruned = EvStructInst.prune(0, 2, &mut conn).await.unwrap();
217 assert_eq!(rows_pruned, 1);
218
219 let remaining_events = get_all_ev_struct_inst(&mut conn).await.unwrap();
220 assert_eq!(remaining_events.len(), 2);
221 }
222}