sui_indexer_alt/handlers/
ev_struct_inst.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7
8use anyhow::Context;
9use anyhow::Result;
10use async_trait::async_trait;
11use diesel::ExpressionMethods;
12use diesel::QueryDsl;
13use diesel_async::RunQueryDsl;
14use sui_indexer_alt_framework::pipeline::Processor;
15use sui_indexer_alt_framework::postgres::Connection;
16use sui_indexer_alt_framework::postgres::handler::Handler;
17use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
18use sui_indexer_alt_schema::events::StoredEvStructInst;
19use sui_indexer_alt_schema::schema::ev_struct_inst;
20
21use crate::handlers::cp_sequence_numbers::tx_interval;
22
23pub(crate) struct EvStructInst;
24
25#[async_trait]
26impl Processor for EvStructInst {
27    const NAME: &'static str = "ev_struct_inst";
28
29    type Value = StoredEvStructInst;
30
31    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
32        let Checkpoint {
33            transactions,
34            summary,
35            ..
36        } = checkpoint.as_ref();
37
38        let mut values = BTreeSet::new();
39        let first_tx = summary.network_total_transactions as usize - transactions.len();
40
41        for (i, tx) in transactions.iter().enumerate() {
42            let tx_sequence_number = (first_tx + i) as i64;
43            for (j, ev) in tx.events.iter().flat_map(|evs| evs.data.iter().enumerate()) {
44                values.insert(StoredEvStructInst {
45                    package: ev.type_.address.to_vec(),
46                    module: ev.type_.module.to_string(),
47                    name: ev.type_.name.to_string(),
48                    instantiation: bcs::to_bytes(&ev.type_.type_params)
49                        .with_context(|| format!(
50                            "Failed to serialize type parameters for event ({tx_sequence_number}, {j})"
51                        ))?,
52                    tx_sequence_number: (first_tx + i) as i64,
53                    sender: ev.sender.to_vec(),
54                });
55            }
56        }
57
58        Ok(values.into_iter().collect())
59    }
60}
61
62#[async_trait]
63impl Handler for EvStructInst {
64    const MIN_EAGER_ROWS: usize = 100;
65    const MAX_PENDING_ROWS: usize = 10000;
66
67    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
68        Ok(diesel::insert_into(ev_struct_inst::table)
69            .values(values)
70            .on_conflict_do_nothing()
71            .execute(conn)
72            .await?)
73    }
74
75    async fn prune<'a>(
76        &self,
77        from: u64,
78        to_exclusive: u64,
79        conn: &mut Connection<'a>,
80    ) -> Result<usize> {
81        let Range {
82            start: from_tx,
83            end: to_tx,
84        } = tx_interval(conn, from..to_exclusive).await?;
85
86        let filter = ev_struct_inst::table
87            .filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));
88
89        Ok(diesel::delete(filter).execute(conn).await?)
90    }
91}
92
93#[cfg(test)]
94mod tests {
95    use diesel_async::RunQueryDsl;
96    use sui_indexer_alt_framework::Indexer;
97    use sui_indexer_alt_framework::types::event::Event;
98    use sui_indexer_alt_framework::types::test_checkpoint_data_builder::TestCheckpointBuilder;
99    use sui_indexer_alt_schema::MIGRATIONS;
100
101    use crate::handlers::cp_sequence_numbers::CpSequenceNumbers;
102
103    use super::*;
104
105    async fn get_all_ev_struct_inst(conn: &mut Connection<'_>) -> Result<Vec<StoredEvStructInst>> {
106        let query = ev_struct_inst::table
107            .order_by((
108                ev_struct_inst::tx_sequence_number,
109                ev_struct_inst::sender,
110                ev_struct_inst::package,
111                ev_struct_inst::module,
112                ev_struct_inst::name,
113                ev_struct_inst::instantiation,
114            ))
115            .load(conn)
116            .await?;
117        Ok(query)
118    }
119
120    #[tokio::test]
121    async fn test_ev_struct_inst_pruning_complains_if_no_mapping() {
122        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
123        let mut conn = indexer.store().connect().await.unwrap();
124
125        let result = EvStructInst.prune(0, 2, &mut conn).await;
126
127        assert!(result.is_err());
128        assert_eq!(
129            result.unwrap_err().to_string(),
130            "No checkpoint mapping found for checkpoint 0"
131        );
132    }
133
134    #[tokio::test]
135    async fn test_ev_struct_inst_process_no_events() {
136        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
137        let mut conn = indexer.store().connect().await.unwrap();
138
139        let checkpoint: Arc<Checkpoint> = Arc::new(
140            TestCheckpointBuilder::new(0)
141                .start_transaction(0)
142                .finish_transaction()
143                .build_checkpoint(),
144        );
145
146        let values = EvStructInst.process(&checkpoint).await.unwrap();
147        EvStructInst::commit(&values, &mut conn).await.unwrap();
148
149        assert_eq!(values.len(), 0);
150    }
151
152    #[tokio::test]
153    async fn test_ev_struct_inst_process_single_event() {
154        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
155        let mut conn = indexer.store().connect().await.unwrap();
156
157        let checkpoint: Arc<Checkpoint> = Arc::new(
158            TestCheckpointBuilder::new(0)
159                .start_transaction(0)
160                .with_events(vec![Event::random_for_testing()])
161                .finish_transaction()
162                .build_checkpoint(),
163        );
164
165        // Process checkpoint with one event
166        let values = EvStructInst.process(&checkpoint).await.unwrap();
167        EvStructInst::commit(&values, &mut conn).await.unwrap();
168
169        let events = get_all_ev_struct_inst(&mut conn).await.unwrap();
170        assert_eq!(events.len(), 1);
171    }
172
173    #[tokio::test]
174    async fn test_ev_struct_inst_prune_events() {
175        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
176        let mut conn = indexer.store().connect().await.unwrap();
177
178        // 0th checkpoint has no events
179        let mut builder = TestCheckpointBuilder::new(0);
180        builder = builder.start_transaction(0).finish_transaction();
181        let checkpoint = Arc::new(builder.build_checkpoint());
182        let values = EvStructInst.process(&checkpoint).await.unwrap();
183
184        EvStructInst::commit(&values, &mut conn).await.unwrap();
185        let values = CpSequenceNumbers.process(&checkpoint).await.unwrap();
186        CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();
187
188        // 1st checkpoint has 1 event
189        builder = builder
190            .start_transaction(0)
191            .with_events(vec![Event::random_for_testing()])
192            .finish_transaction();
193        let checkpoint = Arc::new(builder.build_checkpoint());
194        let values = EvStructInst.process(&checkpoint).await.unwrap();
195
196        EvStructInst::commit(&values, &mut conn).await.unwrap();
197        let values = CpSequenceNumbers.process(&checkpoint).await.unwrap();
198        CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();
199
200        // 2nd checkpoint has 2 events
201        builder = builder
202            .start_transaction(0)
203            .with_events(vec![
204                Event::random_for_testing(),
205                Event::random_for_testing(),
206            ])
207            .finish_transaction();
208        let checkpoint = Arc::new(builder.build_checkpoint());
209        let values = EvStructInst.process(&checkpoint).await.unwrap();
210
211        EvStructInst::commit(&values, &mut conn).await.unwrap();
212        let values = CpSequenceNumbers.process(&checkpoint).await.unwrap();
213        CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();
214
215        // Prune checkpoints from `[0, 2)`, expect 2 events remaining
216        let rows_pruned = EvStructInst.prune(0, 2, &mut conn).await.unwrap();
217        assert_eq!(rows_pruned, 1);
218
219        let remaining_events = get_all_ev_struct_inst(&mut conn).await.unwrap();
220        assert_eq!(remaining_events.len(), 2);
221    }
222}