sui_rpc_store/indexer/
tx_seq_by_digest.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::tx_seq_by_digest`](crate::schema::tx_seq_by_digest)
6//! CF: one `TransactionDigest → tx_seq` row per executed
7//! transaction.
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::digests::TransactionDigest;
15use sui_types::effects::TransactionEffectsAPI;
16use sui_types::full_checkpoint_content::Checkpoint;
17
18use crate::indexer::Schema;
19use crate::indexer::Store;
20use crate::indexer::tx_seq_at;
21use crate::schema::primitives::U64Varint;
22use crate::schema::tx_seq_by_digest;
23
24/// Pipeline marker for `tx_seq_by_digest`.
25pub struct TxSeqByDigest;
26
27pub struct Row {
28    pub digest: TransactionDigest,
29    pub tx_seq: u64,
30}
31
32#[async_trait]
33impl Processor for TxSeqByDigest {
34    const NAME: &'static str = "tx_seq_by_digest";
35    type Value = Row;
36
37    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
38        let mut rows = Vec::with_capacity(checkpoint.transactions.len());
39        for (i, tx) in checkpoint.transactions.iter().enumerate() {
40            rows.push(Row {
41                digest: *tx.effects.transaction_digest(),
42                tx_seq: tx_seq_at(checkpoint, i),
43            });
44        }
45        Ok(rows)
46    }
47}
48
49#[async_trait]
50impl sequential::Handler for TxSeqByDigest {
51    type Store = Store;
52    type Batch = Vec<Row>;
53
54    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
55        batch.extend(values);
56    }
57
58    async fn commit<'a>(
59        &self,
60        batch: &Self::Batch,
61        conn: &mut sui_consistent_store::Connection<'a, Schema>,
62    ) -> anyhow::Result<usize> {
63        let cf = &conn.store.schema().tx_seq_by_digest;
64        for row in batch {
65            conn.batch.put(
66                cf,
67                &tx_seq_by_digest::Key(row.digest),
68                &U64Varint(row.tx_seq),
69            )?;
70        }
71        Ok(batch.len())
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use std::sync::Arc;
78
79    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
80
81    use super::*;
82
83    #[tokio::test]
84    async fn process_emits_one_row_per_transaction() {
85        let checkpoint = Arc::new(TestCheckpointBuilder::new(5).build_checkpoint());
86        let rows = TxSeqByDigest.process(&checkpoint).await.unwrap();
87        assert_eq!(rows.len(), checkpoint.transactions.len());
88    }
89}