sui_rpc_store/indexer/
events.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::indexer::tx_seq_at;
19use crate::schema::events;
20use crate::schema::primitives::U64Be;
21
22pub struct Events;
24
25pub struct Row {
26 pub tx_seq: u64,
27 pub value: events::Value,
28}
29
30#[async_trait]
31impl Processor for Events {
32 const NAME: &'static str = "events";
33 type Value = Row;
34
35 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
36 let mut rows = Vec::with_capacity(checkpoint.transactions.len());
37 for (i, tx) in checkpoint.transactions.iter().enumerate() {
38 if let Some(events) = &tx.events {
39 rows.push(Row {
40 tx_seq: tx_seq_at(checkpoint, i),
41 value: events::store(events),
42 });
43 }
44 }
45 Ok(rows)
46 }
47}
48
49#[async_trait]
50impl sequential::Handler for Events {
51 type Store = Store;
52 type Batch = Vec<Row>;
53
54 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
55 batch.extend(values);
56 }
57
58 async fn commit<'a>(
59 &self,
60 batch: &Self::Batch,
61 conn: &mut sui_consistent_store::Connection<'a, Schema>,
62 ) -> anyhow::Result<usize> {
63 let cf = &conn.store.schema().events;
64 for row in batch {
65 conn.batch.put(cf, &U64Be(row.tx_seq), &row.value)?;
66 }
67 Ok(batch.len())
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use std::sync::Arc;
74
75 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
76
77 use super::*;
78
79 #[tokio::test]
80 async fn process_emits_one_row_per_transaction() {
81 let checkpoint = Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
82 let rows = Events.process(&checkpoint).await.unwrap();
83 assert_eq!(rows.len(), checkpoint.transactions.len());
84 }
85}