sui_rpc_store/indexer/
tx_seq_by_digest.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::digests::TransactionDigest;
15use sui_types::effects::TransactionEffectsAPI;
16use sui_types::full_checkpoint_content::Checkpoint;
17
18use crate::indexer::Schema;
19use crate::indexer::Store;
20use crate::indexer::tx_seq_at;
21use crate::schema::primitives::U64Varint;
22use crate::schema::tx_seq_by_digest;
23
/// Marker type for the `tx_seq_by_digest` indexing pipeline.
///
/// It carries no state; the pipeline's behavior lives entirely in the
/// `Processor` and `sequential::Handler` implementations on this type.
#[derive(Debug, Clone, Copy, Default)]
pub struct TxSeqByDigest;
26
27pub struct Row {
28 pub digest: TransactionDigest,
29 pub tx_seq: u64,
30}
31
32#[async_trait]
33impl Processor for TxSeqByDigest {
34 const NAME: &'static str = "tx_seq_by_digest";
35 type Value = Row;
36
37 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
38 let mut rows = Vec::with_capacity(checkpoint.transactions.len());
39 for (i, tx) in checkpoint.transactions.iter().enumerate() {
40 rows.push(Row {
41 digest: *tx.effects.transaction_digest(),
42 tx_seq: tx_seq_at(checkpoint, i),
43 });
44 }
45 Ok(rows)
46 }
47}
48
49#[async_trait]
50impl sequential::Handler for TxSeqByDigest {
51 type Store = Store;
52 type Batch = Vec<Row>;
53
54 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
55 batch.extend(values);
56 }
57
58 async fn commit<'a>(
59 &self,
60 batch: &Self::Batch,
61 conn: &mut sui_consistent_store::Connection<'a, Schema>,
62 ) -> anyhow::Result<usize> {
63 let cf = &conn.store.schema().tx_seq_by_digest;
64 for row in batch {
65 conn.batch.put(
66 cf,
67 &tx_seq_by_digest::Key(row.digest),
68 &U64Varint(row.tx_seq),
69 )?;
70 }
71 Ok(batch.len())
72 }
73}
74
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// `process` must yield exactly as many rows as the checkpoint has
    /// transactions.
    // NOTE(review): no transactions are explicitly added to the builder here;
    // confirm the built checkpoint is non-empty, otherwise this check is vacuous.
    #[tokio::test]
    async fn process_emits_one_row_per_transaction() {
        let built = TestCheckpointBuilder::new(5).build_checkpoint();
        let checkpoint = Arc::new(built);

        let produced = TxSeqByDigest.process(&checkpoint).await.unwrap();

        let expected_rows = checkpoint.transactions.len();
        assert_eq!(produced.len(), expected_rows);
    }
}