sui_rpc_store/schema/
tx_metadata_by_seq.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `tx_seq` → `TxMetadata`.
5//!
6//! Carries digest, containing checkpoint, position-within-checkpoint,
7//! event count, and timestamp. The `tx_seq → digest` direction of the
8//! bijection lives here; the inverse is
9//! [`super::tx_seq_by_digest`].
10
11use sui_consistent_store::Protobuf;
12use sui_consistent_store::error::DecodeError;
13use sui_consistent_store::error::Error;
14use sui_consistent_store::reader::Reader;
15use sui_types::digests::TransactionDigest;
16use sui_types::messages_checkpoint::CheckpointSequenceNumber;
17
18use crate::proto::TxMetadata as StoredTxMetadata;
19use crate::schema::primitives::U64Be;
20
/// Identifier of this table: handed to the options resolver in
/// [`options`] and embedded in digest decode error messages.
pub const NAME: &str = "tx_metadata_by_seq";

/// Row key: the transaction's `tx_seq`, wrapped in `U64Be`.
pub type Key = U64Be;
/// Row value: a `crate::proto::TxMetadata` message wrapped in
/// `Protobuf`.
pub type Value = Protobuf<StoredTxMetadata>;
25
/// Return the `rocksdb::Options` that `resolver` supplies for
/// [`NAME`].
///
/// This is a plain delegation; no table-specific overrides are
/// applied here.
pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
    resolver.options(NAME)
}
29
/// Caller-facing view of one row, with the digest decoded back to
/// `TransactionDigest` and the integer fields exposed in canonical
/// widths.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Metadata {
    /// Digest of the transaction this row describes.
    pub digest: TransactionDigest,
    /// Sequence number of the checkpoint that contains the
    /// transaction.
    pub checkpoint_seq: CheckpointSequenceNumber,
    /// 0-based position of this transaction within its checkpoint's
    /// contents.
    pub ckpt_position: u32,
    /// Number of events emitted by this transaction.
    pub event_count: u32,
    /// Wall-clock timestamp of the containing checkpoint, in
    /// milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
46
47/// Build a `TxMetadata` row from a `Metadata` view.
48pub fn store(metadata: &Metadata) -> Value {
49    Protobuf(StoredTxMetadata {
50        digest: metadata.digest.inner().to_vec().into(),
51        checkpoint_seq: metadata.checkpoint_seq,
52        ckpt_position: metadata.ckpt_position,
53        event_count: metadata.event_count,
54        timestamp_ms: metadata.timestamp_ms,
55    })
56}
57
58impl<R: Reader> super::RpcStoreSchema<R> {
59    /// Look up the metadata for the transaction at the given
60    /// assigned `tx_seq`.
61    pub fn get_tx_metadata_by_seq(&self, tx_seq: u64) -> Result<Option<Metadata>, Error> {
62        let Some(stored) = self.tx_metadata_by_seq.get(&U64Be(tx_seq))? else {
63            return Ok(None);
64        };
65        let stored = stored.into_inner();
66        let digest_bytes: [u8; 32] = stored.digest.as_ref().try_into().map_err(|_| {
67            DecodeError::msg(format!(
68                "expected 32 bytes for {NAME} digest, got {}",
69                stored.digest.len(),
70            ))
71        })?;
72        Ok(Some(Metadata {
73            digest: TransactionDigest::new(digest_bytes),
74            checkpoint_seq: stored.checkpoint_seq,
75            ckpt_position: stored.ckpt_position,
76            event_count: stored.event_count,
77            timestamp_ms: stored.timestamp_ms,
78        }))
79    }
80
81    /// Iterate `(tx_seq, digest)` pairs over `[from, to_exclusive)`,
82    /// decoding only the digest from each row.
83    ///
84    /// The pruner uses this to unindex `tx_seq_by_digest` for a pruned
85    /// range. Iterating the table seeks straight to the first present
86    /// row and visits only rows that exist, so a sparse range — or a
87    /// floor of `0` when the lower bound is unknown — costs work
88    /// proportional to the rows actually present, not to the width of
89    /// the `tx_seq` interval.
90    pub fn iter_tx_seq_digests(
91        &self,
92        from: u64,
93        to_exclusive: u64,
94    ) -> Result<impl Iterator<Item = Result<(u64, TransactionDigest), Error>> + '_, Error> {
95        let iter = self
96            .tx_metadata_by_seq
97            .iter(U64Be(from)..U64Be(to_exclusive))?
98            .map(|entry| {
99                let (U64Be(tx_seq), stored) = entry?;
100                let stored = stored.into_inner();
101                let digest_bytes: [u8; 32] = stored.digest.as_ref().try_into().map_err(|_| {
102                    DecodeError::msg(format!(
103                        "expected 32 bytes for {NAME} digest, got {}",
104                        stored.digest.len(),
105                    ))
106                })?;
107                Ok((tx_seq, TransactionDigest::new(digest_bytes)))
108            });
109        Ok(iter)
110    }
111}
112
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Open an empty store in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned so the directory stays alive for
    /// the duration of the calling test.
    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    /// A row with a random digest and fixed values for the other fields.
    fn dummy_metadata() -> Metadata {
        Metadata {
            digest: TransactionDigest::random(),
            checkpoint_seq: 100,
            ckpt_position: 3,
            event_count: 5,
            timestamp_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn get_returns_none_for_unknown_seq() {
        let (_dir, _db, schema) = fresh_db();
        assert!(schema.get_tx_metadata_by_seq(7).unwrap().is_none());
    }

    #[test]
    fn store_then_get_round_trips() {
        let (_dir, db, schema) = fresh_db();
        let metadata = dummy_metadata();

        let mut batch = db.batch();
        batch
            .put(&schema.tx_metadata_by_seq, &U64Be(42), &store(&metadata))
            .unwrap();
        batch.commit().unwrap();

        let read = schema
            .get_tx_metadata_by_seq(42)
            .unwrap()
            .expect("metadata present");
        assert_eq!(read, metadata);
    }

    #[test]
    fn overwrite_replaces_previous() {
        let (_dir, db, schema) = fresh_db();
        let first = dummy_metadata();
        // Make every field differ so the assertion does not hinge solely
        // on the two random digests being distinct.
        let later = Metadata {
            checkpoint_seq: first.checkpoint_seq + 1,
            ckpt_position: first.ckpt_position + 1,
            event_count: first.event_count + 1,
            timestamp_ms: first.timestamp_ms + 1,
            ..dummy_metadata()
        };

        let mut batch = db.batch();
        batch
            .put(&schema.tx_metadata_by_seq, &U64Be(42), &store(&first))
            .unwrap();
        batch
            .put(&schema.tx_metadata_by_seq, &U64Be(42), &store(&later))
            .unwrap();
        batch.commit().unwrap();

        let read = schema
            .get_tx_metadata_by_seq(42)
            .unwrap()
            .expect("metadata present");
        assert_eq!(read, later);
    }

    #[test]
    fn iter_covers_half_open_range() {
        let (_dir, db, schema) = fresh_db();
        let rows: Vec<(u64, Metadata)> = (9u64..=12).map(|seq| (seq, dummy_metadata())).collect();

        let mut batch = db.batch();
        for (seq, metadata) in &rows {
            batch
                .put(&schema.tx_metadata_by_seq, &U64Be(*seq), &store(metadata))
                .unwrap();
        }
        batch.commit().unwrap();

        let mut pairs = schema
            .iter_tx_seq_digests(10, 12)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        // Compare independent of iteration order; only range membership
        // is under test here.
        pairs.sort_by_key(|(seq, _)| *seq);

        let expected: Vec<(u64, TransactionDigest)> = rows
            .into_iter()
            .filter(|(seq, _)| (10..12).contains(seq))
            .map(|(seq, metadata)| (seq, metadata.digest))
            .collect();
        assert_eq!(pairs, expected);
    }

    #[test]
    fn wrong_length_digest_is_a_decode_error() {
        let (_dir, db, schema) = fresh_db();
        // One byte short of the 32 bytes the readers require.
        let malformed = Protobuf(StoredTxMetadata {
            digest: vec![0u8; 31].into(),
            checkpoint_seq: 100,
            ckpt_position: 3,
            event_count: 5,
            timestamp_ms: 1_700_000_000_000,
        });

        let mut batch = db.batch();
        batch
            .put(&schema.tx_metadata_by_seq, &U64Be(42), &malformed)
            .unwrap();
        batch.commit().unwrap();

        assert!(schema.get_tx_metadata_by_seq(42).is_err());

        let mut iter = schema.iter_tx_seq_digests(0, 100).unwrap();
        assert!(matches!(iter.next(), Some(Err(_))));
    }
}
180        assert_eq!(read, later);
181    }
182}