sui_rpc_store/indexer/
tx_metadata_by_seq.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::tx_metadata_by_seq`](crate::schema::tx_metadata_by_seq)
6//! CF: one `Metadata` row per executed transaction, keyed by
7//! `tx_seq`.
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::effects::TransactionEffectsAPI;
15use sui_types::full_checkpoint_content::Checkpoint;
16
17use crate::indexer::Schema;
18use crate::indexer::Store;
19use crate::indexer::tx_seq_at;
20use crate::schema::primitives::U64Be;
21use crate::schema::tx_metadata_by_seq;
22use crate::schema::tx_metadata_by_seq::Metadata;
23
/// Pipeline marker for `tx_metadata_by_seq`.
///
/// A field-less unit type: it exists only to carry the [`Processor`] and
/// [`sequential::Handler`] implementations defined in this module.
pub struct TxMetadataBySeq;
26
/// One processed value of the [`TxMetadataBySeq`] pipeline: the metadata of a
/// single transaction together with the sequence number it is stored under.
pub struct Row {
    /// Transaction sequence number; used (wrapped in `U64Be`) as the row key
    /// when the batch is committed.
    pub tx_seq: u64,
    /// Metadata stored as the row value for `tx_seq`.
    pub metadata: Metadata,
}
31
32#[async_trait]
33impl Processor for TxMetadataBySeq {
34    const NAME: &'static str = "tx_metadata_by_seq";
35    type Value = Row;
36
37    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
38        let summary = checkpoint.summary.data();
39        let mut rows = Vec::with_capacity(checkpoint.transactions.len());
40        for (i, tx) in checkpoint.transactions.iter().enumerate() {
41            let event_count = tx.events.as_ref().map(|e| e.data.len()).unwrap_or(0) as u32;
42            rows.push(Row {
43                tx_seq: tx_seq_at(checkpoint, i),
44                metadata: Metadata {
45                    digest: *tx.effects.transaction_digest(),
46                    checkpoint_seq: summary.sequence_number,
47                    ckpt_position: i as u32,
48                    event_count,
49                    timestamp_ms: summary.timestamp_ms,
50                },
51            });
52        }
53        Ok(rows)
54    }
55}
56
57#[async_trait]
58impl sequential::Handler for TxMetadataBySeq {
59    type Store = Store;
60    type Batch = Vec<Row>;
61
62    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
63        batch.extend(values);
64    }
65
66    async fn commit<'a>(
67        &self,
68        batch: &Self::Batch,
69        conn: &mut sui_consistent_store::Connection<'a, Schema>,
70    ) -> anyhow::Result<usize> {
71        let cf = &conn.store.schema().tx_metadata_by_seq;
72        for row in batch {
73            conn.batch.put(
74                cf,
75                &U64Be(row.tx_seq),
76                &tx_metadata_by_seq::store(&row.metadata),
77            )?;
78        }
79        Ok(batch.len())
80    }
81}
82
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Processes a checkpoint holding three transactions and checks that each
    /// one yields a row with the checkpoint's sequence number and timestamp and
    /// its own position.
    #[tokio::test]
    async fn process_emits_one_row_per_transaction_with_correct_position() {
        // The builder starts with no transactions; add them explicitly so the
        // per-row assertions below actually run.
        let checkpoint = Arc::new(
            TestCheckpointBuilder::new(3)
                .with_timestamp_ms(123_456)
                .start_transaction(0)
                .finish_transaction()
                .start_transaction(1)
                .finish_transaction()
                .start_transaction(2)
                .finish_transaction()
                .build_checkpoint(),
        );
        let rows = TxMetadataBySeq.process(&checkpoint).await.unwrap();

        // Guard against a vacuous pass: exactly one row per transaction.
        assert_eq!(rows.len(), 3);
        assert_eq!(rows.len(), checkpoint.transactions.len());

        for (i, row) in rows.iter().enumerate() {
            assert_eq!(row.metadata.checkpoint_seq, 3);
            assert_eq!(row.metadata.ckpt_position, i as u32);
            assert_eq!(row.metadata.timestamp_ms, 123_456);
        }
    }
}