sui_rpc_store/indexer/
effects.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
//! Sequential pipeline that populates the
//! [`schema::effects`](crate::schema::effects) CF: one row per executed
//! transaction, carrying its [`sui_types::effects::TransactionEffects`] and the
//! set of objects that were loaded but not modified during execution.
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::indexer::tx_seq_at;
19use crate::schema::effects;
20use crate::schema::primitives::U64Be;
21
/// Pipeline marker for `effects`.
///
/// A zero-sized type with no state of its own: it only serves as the type on
/// which the [`Processor`] and [`sequential::Handler`] implementations for the
/// `effects` pipeline are defined.
pub struct Effects;
24
25pub struct Row {
26    pub tx_seq: u64,
27    pub value: effects::Value,
28}
29
30#[async_trait]
31impl Processor for Effects {
32    const NAME: &'static str = "effects";
33    type Value = Row;
34
35    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
36        let mut rows = Vec::with_capacity(checkpoint.transactions.len());
37        for (i, tx) in checkpoint.transactions.iter().enumerate() {
38            rows.push(Row {
39                tx_seq: tx_seq_at(checkpoint, i),
40                value: effects::store(&tx.effects, &tx.unchanged_loaded_runtime_objects),
41            });
42        }
43        Ok(rows)
44    }
45}
46
47#[async_trait]
48impl sequential::Handler for Effects {
49    type Store = Store;
50    type Batch = Vec<Row>;
51
52    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
53        batch.extend(values);
54    }
55
56    async fn commit<'a>(
57        &self,
58        batch: &Self::Batch,
59        conn: &mut sui_consistent_store::Connection<'a, Schema>,
60    ) -> anyhow::Result<usize> {
61        let cf = &conn.store.schema().effects;
62        for row in batch {
63            conn.batch.put(cf, &U64Be(row.tx_seq), &row.value)?;
64        }
65        Ok(batch.len())
66    }
67}
68
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// `process` must emit exactly one row per transaction in the checkpoint,
    /// and each row must carry the sequence number `tx_seq_at` assigns to the
    /// transaction at the same index.
    #[tokio::test]
    async fn process_emits_one_row_per_transaction() {
        // NOTE(review): no transactions are added to the builder here; if
        // `build_checkpoint` then yields an empty checkpoint, the assertions
        // below pass vacuously — consider adding at least one transaction.
        let checkpoint = Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
        let rows = Effects.process(&checkpoint).await.unwrap();
        assert_eq!(rows.len(), checkpoint.transactions.len());

        // Row order and keys must line up with the checkpoint's transactions.
        for (i, row) in rows.iter().enumerate() {
            assert_eq!(row.tx_seq, tx_seq_at(&checkpoint, i));
        }
    }
}