sui_rpc_store/schema/
tx_metadata_by_seq.rs1use sui_consistent_store::Protobuf;
12use sui_consistent_store::error::DecodeError;
13use sui_consistent_store::error::Error;
14use sui_consistent_store::reader::Reader;
15use sui_types::digests::TransactionDigest;
16use sui_types::messages_checkpoint::CheckpointSequenceNumber;
17
18use crate::proto::TxMetadata as StoredTxMetadata;
19use crate::schema::primitives::U64Be;
20
21pub const NAME: &str = "tx_metadata_by_seq";
22
23pub type Key = U64Be;
24pub type Value = Protobuf<StoredTxMetadata>;
25
26pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
27 resolver.options(NAME)
28}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct Metadata {
35 pub digest: TransactionDigest,
36 pub checkpoint_seq: CheckpointSequenceNumber,
37 pub ckpt_position: u32,
40 pub event_count: u32,
42 pub timestamp_ms: u64,
45}
46
47pub fn store(metadata: &Metadata) -> Value {
49 Protobuf(StoredTxMetadata {
50 digest: metadata.digest.inner().to_vec().into(),
51 checkpoint_seq: metadata.checkpoint_seq,
52 ckpt_position: metadata.ckpt_position,
53 event_count: metadata.event_count,
54 timestamp_ms: metadata.timestamp_ms,
55 })
56}
57
58impl<R: Reader> super::RpcStoreSchema<R> {
59 pub fn get_tx_metadata_by_seq(&self, tx_seq: u64) -> Result<Option<Metadata>, Error> {
62 let Some(stored) = self.tx_metadata_by_seq.get(&U64Be(tx_seq))? else {
63 return Ok(None);
64 };
65 let stored = stored.into_inner();
66 let digest_bytes: [u8; 32] = stored.digest.as_ref().try_into().map_err(|_| {
67 DecodeError::msg(format!(
68 "expected 32 bytes for {NAME} digest, got {}",
69 stored.digest.len(),
70 ))
71 })?;
72 Ok(Some(Metadata {
73 digest: TransactionDigest::new(digest_bytes),
74 checkpoint_seq: stored.checkpoint_seq,
75 ckpt_position: stored.ckpt_position,
76 event_count: stored.event_count,
77 timestamp_ms: stored.timestamp_ms,
78 }))
79 }
80
81 pub fn iter_tx_seq_digests(
91 &self,
92 from: u64,
93 to_exclusive: u64,
94 ) -> Result<impl Iterator<Item = Result<(u64, TransactionDigest), Error>> + '_, Error> {
95 let iter = self
96 .tx_metadata_by_seq
97 .iter(U64Be(from)..U64Be(to_exclusive))?
98 .map(|entry| {
99 let (U64Be(tx_seq), stored) = entry?;
100 let stored = stored.into_inner();
101 let digest_bytes: [u8; 32] = stored.digest.as_ref().try_into().map_err(|_| {
102 DecodeError::msg(format!(
103 "expected 32 bytes for {NAME} digest, got {}",
104 stored.digest.len(),
105 ))
106 })?;
107 Ok((tx_seq, TransactionDigest::new(digest_bytes)))
108 });
109 Ok(iter)
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use sui_consistent_store::Db;
116 use sui_consistent_store::DbOptions;
117
118 use super::*;
119 use crate::RpcStoreSchema;
120
121 fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
122 let dir = tempfile::tempdir().unwrap();
123 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
124 (dir, db, schema)
125 }
126
127 fn dummy_metadata() -> Metadata {
128 Metadata {
129 digest: TransactionDigest::random(),
130 checkpoint_seq: 100,
131 ckpt_position: 3,
132 event_count: 5,
133 timestamp_ms: 1_700_000_000_000,
134 }
135 }
136
137 #[test]
138 fn get_returns_none_for_unknown_seq() {
139 let (_dir, _db, schema) = fresh_db();
140 assert!(schema.get_tx_metadata_by_seq(7).unwrap().is_none());
141 }
142
143 #[test]
144 fn store_then_get_round_trips() {
145 let (_dir, db, schema) = fresh_db();
146 let metadata = dummy_metadata();
147
148 let mut batch = db.batch();
149 batch
150 .put(&schema.tx_metadata_by_seq, &U64Be(42), &store(&metadata))
151 .unwrap();
152 batch.commit().unwrap();
153
154 let read = schema
155 .get_tx_metadata_by_seq(42)
156 .unwrap()
157 .expect("metadata present");
158 assert_eq!(read, metadata);
159 }
160
161 #[test]
162 fn overwrite_replaces_previous() {
163 let (_dir, db, schema) = fresh_db();
164 let first = dummy_metadata();
165 let later = dummy_metadata();
166
167 let mut batch = db.batch();
168 batch
169 .put(&schema.tx_metadata_by_seq, &U64Be(42), &store(&first))
170 .unwrap();
171 batch
172 .put(&schema.tx_metadata_by_seq, &U64Be(42), &store(&later))
173 .unwrap();
174 batch.commit().unwrap();
175
176 let read = schema
177 .get_tx_metadata_by_seq(42)
178 .unwrap()
179 .expect("metadata present");
180 assert_eq!(read, later);
181 }
182}