sui_rpc_store/indexer/
transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::transactions`](crate::schema::transactions) CF: one
6//! row per executed transaction, keyed by its assigned `tx_seq`.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use sui_indexer_alt_framework::pipeline::Processor;
12use sui_indexer_alt_framework::pipeline::sequential;
13use sui_types::full_checkpoint_content::Checkpoint;
14
15use crate::indexer::Schema;
16use crate::indexer::Store;
17use crate::indexer::tx_seq_at;
18use crate::schema::primitives::U64Be;
19use crate::schema::transactions;
20
21/// Pipeline marker for `transactions`.
22pub struct Transactions;
23
24pub struct Row {
25    pub tx_seq: u64,
26    pub value: transactions::Value,
27}
28
29#[async_trait]
30impl Processor for Transactions {
31    const NAME: &'static str = "transactions";
32    type Value = Row;
33
34    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
35        let mut rows = Vec::with_capacity(checkpoint.transactions.len());
36        for (i, tx) in checkpoint.transactions.iter().enumerate() {
37            rows.push(Row {
38                tx_seq: tx_seq_at(checkpoint, i),
39                value: transactions::store(&tx.transaction, &tx.signatures),
40            });
41        }
42        Ok(rows)
43    }
44}
45
46#[async_trait]
47impl sequential::Handler for Transactions {
48    type Store = Store;
49    type Batch = Vec<Row>;
50
51    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
52        batch.extend(values);
53    }
54
55    async fn commit<'a>(
56        &self,
57        batch: &Self::Batch,
58        conn: &mut sui_consistent_store::Connection<'a, Schema>,
59    ) -> anyhow::Result<usize> {
60        let cf = &conn.store.schema().transactions;
61        for row in batch {
62            conn.batch.put(cf, &U64Be(row.tx_seq), &row.value)?;
63        }
64        Ok(batch.len())
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use std::sync::Arc;
71
72    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
73
74    use super::*;
75
76    #[tokio::test]
77    async fn process_emits_one_row_per_transaction() {
78        let checkpoint = Arc::new(
79            TestCheckpointBuilder::new(5)
80                .with_network_total_transactions(10)
81                .build_checkpoint(),
82        );
83        let n = checkpoint.transactions.len() as u64;
84        let rows = Transactions.process(&checkpoint).await.unwrap();
85        assert_eq!(rows.len() as u64, n);
86        // tx_seqs should be a contiguous range ending at
87        // network_total_transactions - 1.
88        for (i, row) in rows.iter().enumerate() {
89            assert_eq!(row.tx_seq, 10 - n + i as u64);
90        }
91    }
92}