sui_rpc_store/indexer/
transactions.rs1use std::sync::Arc;
9
10use async_trait::async_trait;
11use sui_indexer_alt_framework::pipeline::Processor;
12use sui_indexer_alt_framework::pipeline::sequential;
13use sui_types::full_checkpoint_content::Checkpoint;
14
15use crate::indexer::Schema;
16use crate::indexer::Store;
17use crate::indexer::tx_seq_at;
18use crate::schema::primitives::U64Be;
19use crate::schema::transactions;
20
21pub struct Transactions;
23
24pub struct Row {
25 pub tx_seq: u64,
26 pub value: transactions::Value,
27}
28
29#[async_trait]
30impl Processor for Transactions {
31 const NAME: &'static str = "transactions";
32 type Value = Row;
33
34 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
35 let mut rows = Vec::with_capacity(checkpoint.transactions.len());
36 for (i, tx) in checkpoint.transactions.iter().enumerate() {
37 rows.push(Row {
38 tx_seq: tx_seq_at(checkpoint, i),
39 value: transactions::store(&tx.transaction, &tx.signatures),
40 });
41 }
42 Ok(rows)
43 }
44}
45
46#[async_trait]
47impl sequential::Handler for Transactions {
48 type Store = Store;
49 type Batch = Vec<Row>;
50
51 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
52 batch.extend(values);
53 }
54
55 async fn commit<'a>(
56 &self,
57 batch: &Self::Batch,
58 conn: &mut sui_consistent_store::Connection<'a, Schema>,
59 ) -> anyhow::Result<usize> {
60 let cf = &conn.store.schema().transactions;
61 for row in batch {
62 conn.batch.put(cf, &U64Be(row.tx_seq), &row.value)?;
63 }
64 Ok(batch.len())
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use std::sync::Arc;
71
72 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
73
74 use super::*;
75
76 #[tokio::test]
77 async fn process_emits_one_row_per_transaction() {
78 let checkpoint = Arc::new(
79 TestCheckpointBuilder::new(5)
80 .with_network_total_transactions(10)
81 .build_checkpoint(),
82 );
83 let n = checkpoint.transactions.len() as u64;
84 let rows = Transactions.process(&checkpoint).await.unwrap();
85 assert_eq!(rows.len() as u64, n);
86 for (i, row) in rows.iter().enumerate() {
89 assert_eq!(row.tx_seq, 10 - n + i as u64);
90 }
91 }
92}