sui_rpc_store/schema/
checkpoint_seq_by_digest.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! `CheckpointDigest` → `checkpoint_seq`.
//!
//! Resolves a checkpoint digest to its sequence number, which then
//! keys every checkpoint-keyed column family (CF).
//!
//! Both `Key` and `Value` are thin newtypes (a 32-byte digest and a
//! varint-encoded `u64`), so no `store` helper is provided —
//! indexer pipelines stage writes directly via
//! `batch.put(&schema.checkpoint_seq_by_digest, &Key(digest), &U64Varint(seq))`.
13
14use bytes::Buf;
15use bytes::BufMut;
16use sui_consistent_store::Decode;
17use sui_consistent_store::Encode;
18use sui_consistent_store::error::DecodeError;
19use sui_consistent_store::error::EncodeError;
20use sui_consistent_store::error::Error;
21use sui_consistent_store::reader::Reader;
22use sui_types::digests::CheckpointDigest;
23use sui_types::messages_checkpoint::CheckpointSequenceNumber;
24
25use crate::schema::primitives::U64Varint;
26
/// Name of this index. It is the string handed to the options resolver
/// in [`options`] and is also quoted in `Key` decode error messages.
pub const NAME: &str = "checkpoint_seq_by_digest";
28
29/// Wrapper around `CheckpointDigest` whose encoding is the raw 32
30/// bytes.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
32pub struct Key(pub CheckpointDigest);
33
/// Value stored against each [`Key`]: the checkpoint sequence number,
/// carried in a `U64Varint` wrapper.
pub type Value = U64Varint;
35
36impl Encode for Key {
37    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
38        buf.put_slice(self.0.inner());
39        Ok(())
40    }
41}
42
43impl Decode for Key {
44    fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
45        if buf.remaining() != 32 {
46            return Err(DecodeError::msg(format!(
47                "expected 32 bytes for {NAME} Key, got {}",
48                buf.remaining(),
49            )));
50        }
51        let mut bytes = [0u8; 32];
52        buf.copy_to_slice(&mut bytes);
53        Ok(Key(CheckpointDigest::new(bytes)))
54    }
55}
56
/// Returns the `rocksdb::Options` for this index.
///
/// Simply delegates to `resolver.options(NAME)`; how the resolver
/// derives the options for a given name is defined in
/// `sui_consistent_store`, not here.
pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
    resolver.options(NAME)
}
60
61impl<R: Reader> super::RpcStoreSchema<R> {
62    /// Look up the sequence number of the checkpoint identified by
63    /// `digest`.
64    pub fn get_checkpoint_seq_by_digest(
65        &self,
66        digest: &CheckpointDigest,
67    ) -> Result<Option<CheckpointSequenceNumber>, Error> {
68        Ok(self
69            .checkpoint_seq_by_digest
70            .get(&Key(*digest))?
71            .map(|v| v.0))
72    }
73}
74
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;

    use super::*;
    use crate::RpcStoreSchema;

    /// Opens a brand-new store inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is handed back alongside the database so the
    /// caller can keep it bound for as long as the store is in use.
    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let tmp = tempfile::tempdir().unwrap();
        let opened = Db::open::<RpcStoreSchema>(tmp.path(), DbOptions::default());
        let (db, schema) = opened.unwrap();
        (tmp, db, schema)
    }

    /// A digest that was never written resolves to `None`, not an error.
    #[test]
    fn get_returns_none_for_unknown_digest() {
        let (_dir, _db, schema) = fresh_db();
        let unknown = CheckpointDigest::random();

        let found = schema.get_checkpoint_seq_by_digest(&unknown).unwrap();

        assert!(found.is_none());
    }

    /// A committed entry can be read back through the lookup helper.
    #[test]
    fn put_then_get_round_trips() {
        let (_dir, db, schema) = fresh_db();
        let digest = CheckpointDigest::random();
        let key = Key(digest);
        let value = U64Varint(42);

        let mut batch = db.batch();
        batch
            .put(&schema.checkpoint_seq_by_digest, &key, &value)
            .unwrap();
        batch.commit().unwrap();

        let looked_up = schema.get_checkpoint_seq_by_digest(&digest).unwrap();
        let seq = looked_up.expect("digest present");
        assert_eq!(seq, 42);
    }

    /// Two different digests written in one batch keep their own values.
    #[test]
    fn distinct_digests_dont_collide() {
        let (_dir, db, schema) = fresh_db();
        let entries = [
            (CheckpointDigest::random(), 1),
            (CheckpointDigest::random(), 2),
        ];

        let mut batch = db.batch();
        for &(digest, seq) in entries.iter() {
            batch
                .put(
                    &schema.checkpoint_seq_by_digest,
                    &Key(digest),
                    &U64Varint(seq),
                )
                .unwrap();
        }
        batch.commit().unwrap();

        for &(digest, seq) in entries.iter() {
            let found = schema.get_checkpoint_seq_by_digest(&digest).unwrap();
            assert_eq!(found, Some(seq));
        }
    }
}