sui_rpc_store/indexer/
events.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::events`](crate::schema::events) CF: one row per
6//! executed transaction carrying its
7//! [`TransactionEvents`](sui_types::effects::TransactionEvents).
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::indexer::tx_seq_at;
19use crate::schema::events;
20use crate::schema::primitives::U64Be;
21
22/// Pipeline marker for `events`.
23pub struct Events;
24
25pub struct Row {
26    pub tx_seq: u64,
27    pub value: events::Value,
28}
29
30#[async_trait]
31impl Processor for Events {
32    const NAME: &'static str = "events";
33    type Value = Row;
34
35    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
36        let mut rows = Vec::with_capacity(checkpoint.transactions.len());
37        for (i, tx) in checkpoint.transactions.iter().enumerate() {
38            if let Some(events) = &tx.events {
39                rows.push(Row {
40                    tx_seq: tx_seq_at(checkpoint, i),
41                    value: events::store(events),
42                });
43            }
44        }
45        Ok(rows)
46    }
47}
48
49#[async_trait]
50impl sequential::Handler for Events {
51    type Store = Store;
52    type Batch = Vec<Row>;
53
54    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
55        batch.extend(values);
56    }
57
58    async fn commit<'a>(
59        &self,
60        batch: &Self::Batch,
61        conn: &mut sui_consistent_store::Connection<'a, Schema>,
62    ) -> anyhow::Result<usize> {
63        let cf = &conn.store.schema().events;
64        for row in batch {
65            conn.batch.put(cf, &U64Be(row.tx_seq), &row.value)?;
66        }
67        Ok(batch.len())
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use std::sync::Arc;
74
75    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
76
77    use super::*;
78
79    #[tokio::test]
80    async fn process_emits_one_row_per_transaction() {
81        let checkpoint = Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
82        let rows = Events.process(&checkpoint).await.unwrap();
83        assert_eq!(rows.len(), checkpoint.transactions.len());
84    }
85}