sui_rpc_store/indexer/
tx_metadata_by_seq.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::effects::TransactionEffectsAPI;
15use sui_types::full_checkpoint_content::Checkpoint;
16
17use crate::indexer::Schema;
18use crate::indexer::Store;
19use crate::indexer::tx_seq_at;
20use crate::schema::primitives::U64Be;
21use crate::schema::tx_metadata_by_seq;
22use crate::schema::tx_metadata_by_seq::Metadata;
23
24pub struct TxMetadataBySeq;
26
27pub struct Row {
28 pub tx_seq: u64,
29 pub metadata: Metadata,
30}
31
32#[async_trait]
33impl Processor for TxMetadataBySeq {
34 const NAME: &'static str = "tx_metadata_by_seq";
35 type Value = Row;
36
37 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
38 let summary = checkpoint.summary.data();
39 let mut rows = Vec::with_capacity(checkpoint.transactions.len());
40 for (i, tx) in checkpoint.transactions.iter().enumerate() {
41 let event_count = tx.events.as_ref().map(|e| e.data.len()).unwrap_or(0) as u32;
42 rows.push(Row {
43 tx_seq: tx_seq_at(checkpoint, i),
44 metadata: Metadata {
45 digest: *tx.effects.transaction_digest(),
46 checkpoint_seq: summary.sequence_number,
47 ckpt_position: i as u32,
48 event_count,
49 timestamp_ms: summary.timestamp_ms,
50 },
51 });
52 }
53 Ok(rows)
54 }
55}
56
57#[async_trait]
58impl sequential::Handler for TxMetadataBySeq {
59 type Store = Store;
60 type Batch = Vec<Row>;
61
62 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
63 batch.extend(values);
64 }
65
66 async fn commit<'a>(
67 &self,
68 batch: &Self::Batch,
69 conn: &mut sui_consistent_store::Connection<'a, Schema>,
70 ) -> anyhow::Result<usize> {
71 let cf = &conn.store.schema().tx_metadata_by_seq;
72 for row in batch {
73 conn.batch.put(
74 cf,
75 &U64Be(row.tx_seq),
76 &tx_metadata_by_seq::store(&row.metadata),
77 )?;
78 }
79 Ok(batch.len())
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 use std::sync::Arc;
86
87 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
88
89 use super::*;
90
91 #[tokio::test]
92 async fn process_emits_one_row_per_transaction_with_correct_position() {
93 let checkpoint = Arc::new(
94 TestCheckpointBuilder::new(3)
95 .with_timestamp_ms(123_456)
96 .build_checkpoint(),
97 );
98 let rows = TxMetadataBySeq.process(&checkpoint).await.unwrap();
99 for (i, row) in rows.iter().enumerate() {
100 assert_eq!(row.metadata.checkpoint_seq, 3);
101 assert_eq!(row.metadata.ckpt_position, i as u32);
102 assert_eq!(row.metadata.timestamp_ms, 123_456);
103 }
104 }
105}