sui_indexer_alt/handlers/
ev_struct_inst.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeSet, ops::Range, sync::Arc};
5
6use anyhow::{Context, Result};
7use diesel::{ExpressionMethods, QueryDsl};
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10    pipeline::{concurrent::Handler, Processor},
11    postgres::{Connection, Db},
12    types::full_checkpoint_content::CheckpointData,
13};
14use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
15
16use crate::handlers::cp_sequence_numbers::tx_interval;
17use async_trait::async_trait;
18
/// Stateless marker type for the `ev_struct_inst` pipeline: its `Processor` impl turns the events
/// of a checkpoint into `StoredEvStructInst` rows, and its `Handler` impl writes those rows to
/// (and prunes them from) the `ev_struct_inst` table.
19pub(crate) struct EvStructInst;
20
21#[async_trait]
22impl Processor for EvStructInst {
23    const NAME: &'static str = "ev_struct_inst";
24
25    type Value = StoredEvStructInst;
26
27    async fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
28        let CheckpointData {
29            transactions,
30            checkpoint_summary,
31            ..
32        } = checkpoint.as_ref();
33
34        let mut values = BTreeSet::new();
35        let first_tx = checkpoint_summary.network_total_transactions as usize - transactions.len();
36
37        for (i, tx) in transactions.iter().enumerate() {
38            let tx_sequence_number = (first_tx + i) as i64;
39            for (j, ev) in tx.events.iter().flat_map(|evs| evs.data.iter().enumerate()) {
40                values.insert(StoredEvStructInst {
41                    package: ev.type_.address.to_vec(),
42                    module: ev.type_.module.to_string(),
43                    name: ev.type_.name.to_string(),
44                    instantiation: bcs::to_bytes(&ev.type_.type_params)
45                        .with_context(|| format!(
46                            "Failed to serialize type parameters for event ({tx_sequence_number}, {j})"
47                        ))?,
48                    tx_sequence_number: (first_tx + i) as i64,
49                    sender: ev.sender.to_vec(),
50                });
51            }
52        }
53
54        Ok(values.into_iter().collect())
55    }
56}
57
58#[async_trait]
59impl Handler for EvStructInst {
60    type Store = Db;
61
62    const MIN_EAGER_ROWS: usize = 100;
63    const MAX_PENDING_ROWS: usize = 10000;
64
65    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
66        Ok(diesel::insert_into(ev_struct_inst::table)
67            .values(values)
68            .on_conflict_do_nothing()
69            .execute(conn)
70            .await?)
71    }
72
73    async fn prune<'a>(
74        &self,
75        from: u64,
76        to_exclusive: u64,
77        conn: &mut Connection<'a>,
78    ) -> Result<usize> {
79        let Range {
80            start: from_tx,
81            end: to_tx,
82        } = tx_interval(conn, from..to_exclusive).await?;
83
84        let filter = ev_struct_inst::table
85            .filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));
86
87        Ok(diesel::delete(filter).execute(conn).await?)
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use diesel_async::RunQueryDsl;
    use sui_indexer_alt_framework::{
        types::{event::Event, test_checkpoint_data_builder::TestCheckpointDataBuilder},
        Indexer,
    };
    use sui_indexer_alt_schema::MIGRATIONS;

    use crate::handlers::cp_sequence_numbers::CpSequenceNumbers;

    /// Loads every `ev_struct_inst` row, ordered by transaction sequence number, sender, package,
    /// module, name and instantiation.
    async fn get_all_ev_struct_inst(conn: &mut Connection<'_>) -> Result<Vec<StoredEvStructInst>> {
        let rows: Vec<StoredEvStructInst> = ev_struct_inst::table
            .order_by((
                ev_struct_inst::tx_sequence_number,
                ev_struct_inst::sender,
                ev_struct_inst::package,
                ev_struct_inst::module,
                ev_struct_inst::name,
                ev_struct_inst::instantiation,
            ))
            .load(conn)
            .await?;

        Ok(rows)
    }

    /// Runs the `EvStructInst` pipeline over `checkpoint`, commits the produced rows and hands
    /// them back to the caller.
    async fn index_events(
        conn: &mut Connection<'_>,
        checkpoint: &Arc<CheckpointData>,
    ) -> Vec<StoredEvStructInst> {
        let rows = EvStructInst.process(checkpoint).await.unwrap();
        EvStructInst::commit(&rows, conn).await.unwrap();
        rows
    }

    /// Indexes the events of `checkpoint` and then records its `CpSequenceNumbers` entry, which
    /// pruning relies on to translate checkpoints into transaction ranges.
    async fn index_events_and_mapping(conn: &mut Connection<'_>, checkpoint: &Arc<CheckpointData>) {
        index_events(conn, checkpoint).await;

        let mapping = CpSequenceNumbers.process(checkpoint).await.unwrap();
        CpSequenceNumbers::commit(&mapping, conn).await.unwrap();
    }

    #[tokio::test]
    async fn test_ev_struct_inst_pruning_complains_if_no_mapping() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        // Nothing has been indexed, so there is no checkpoint mapping to prune against.
        let outcome = EvStructInst.prune(0, 2, &mut conn).await;
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert_eq!(message, "No checkpoint mapping found for checkpoint 0");
    }

    #[tokio::test]
    async fn test_ev_struct_inst_process_no_events() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        let checkpoint = Arc::new(
            TestCheckpointDataBuilder::new(0)
                .start_transaction(0)
                .finish_transaction()
                .build_checkpoint(),
        );

        // A transaction without events must not produce any rows.
        let rows = index_events(&mut conn, &checkpoint).await;
        assert_eq!(rows.len(), 0);
    }

    #[tokio::test]
    async fn test_ev_struct_inst_process_single_event() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        let checkpoint = Arc::new(
            TestCheckpointDataBuilder::new(0)
                .start_transaction(0)
                .with_events(vec![Event::random_for_testing()])
                .finish_transaction()
                .build_checkpoint(),
        );

        // One event in, one stored row out.
        index_events(&mut conn, &checkpoint).await;

        let stored = get_all_ev_struct_inst(&mut conn).await.unwrap();
        assert_eq!(stored.len(), 1);
    }

    #[tokio::test]
    async fn test_ev_struct_inst_prune_events() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        // Build three consecutive checkpoints of one transaction each; checkpoint `n` carries `n`
        // events (so the 0th has none, the 1st has one and the 2nd has two).
        let mut builder = TestCheckpointDataBuilder::new(0);
        for event_count in 0..3 {
            builder = if event_count == 0 {
                builder.start_transaction(0).finish_transaction()
            } else {
                let events: Vec<Event> = (0..event_count)
                    .map(|_| Event::random_for_testing())
                    .collect();
                builder
                    .start_transaction(0)
                    .with_events(events)
                    .finish_transaction()
            };

            let checkpoint = Arc::new(builder.build_checkpoint());
            index_events_and_mapping(&mut conn, &checkpoint).await;
        }

        // Prune checkpoints from `[0, 2)`, expect 2 events remaining
        let rows_pruned = EvStructInst.prune(0, 2, &mut conn).await.unwrap();
        assert_eq!(rows_pruned, 1);

        let survivors = get_all_ev_struct_inst(&mut conn).await.unwrap();
        assert_eq!(survivors.len(), 2);
    }
}