sui_rpc_store/indexer/
effects.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::indexer::tx_seq_at;
19use crate::schema::effects;
20use crate::schema::primitives::U64Be;
21
/// Marker type for the `effects` indexing pipeline.
///
/// It holds no state of its own; the pipeline behaviour is provided by the
/// trait implementations attached to it in this file.
pub struct Effects;
24
25pub struct Row {
26 pub tx_seq: u64,
27 pub value: effects::Value,
28}
29
30#[async_trait]
31impl Processor for Effects {
32 const NAME: &'static str = "effects";
33 type Value = Row;
34
35 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
36 let mut rows = Vec::with_capacity(checkpoint.transactions.len());
37 for (i, tx) in checkpoint.transactions.iter().enumerate() {
38 rows.push(Row {
39 tx_seq: tx_seq_at(checkpoint, i),
40 value: effects::store(&tx.effects, &tx.unchanged_loaded_runtime_objects),
41 });
42 }
43 Ok(rows)
44 }
45}
46
47#[async_trait]
48impl sequential::Handler for Effects {
49 type Store = Store;
50 type Batch = Vec<Row>;
51
52 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
53 batch.extend(values);
54 }
55
56 async fn commit<'a>(
57 &self,
58 batch: &Self::Batch,
59 conn: &mut sui_consistent_store::Connection<'a, Schema>,
60 ) -> anyhow::Result<usize> {
61 let cf = &conn.store.schema().effects;
62 for row in batch {
63 conn.batch.put(cf, &U64Be(row.tx_seq), &row.value)?;
64 }
65 Ok(batch.len())
66 }
67}
68
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// `process` must yield exactly as many rows as the checkpoint has transactions.
    // NOTE(review): no transactions are added to the builder before building;
    // if `build_checkpoint()` produces an empty checkpoint by default, this only
    // exercises the zero-transaction case — confirm and consider adding some.
    #[tokio::test]
    async fn process_emits_one_row_per_transaction() {
        let checkpoint = Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
        let expected_rows = checkpoint.transactions.len();

        let produced = Effects.process(&checkpoint).await.unwrap();

        assert_eq!(produced.len(), expected_rows);
    }
}