sui_rpc_store/schema/
objects.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `(ObjectID, version)` → `StoredObject`.
5//!
6//! Holds every version of every object that has ever existed plus
7//! tombstones for versions at which an object was deleted or
8//! wrapped. A prefix scan on the 32-byte object id walks all
9//! versions of one object in ascending order; the value at each
10//! position is either a BCS-encoded live [`Object`] or a tombstone
11//! marker carrying the [`TombstoneKind`].
12//!
13//! Tombstones let version-bounded reads distinguish three states
14//! at `(id, version)`: a live row (object existed at that version),
15//! a tombstone row (object was removed at that version), and a
16//! missing row (object did not exist at that version).
17
18use bytes::Buf;
19use bytes::BufMut;
20use sui_consistent_store::Decode;
21use sui_consistent_store::Encode;
22use sui_consistent_store::Protobuf;
23use sui_consistent_store::error::DecodeError;
24use sui_consistent_store::error::EncodeError;
25use sui_consistent_store::error::Error;
26use sui_consistent_store::reader::Reader;
27use sui_types::base_types::ObjectID;
28use sui_types::base_types::SequenceNumber;
29use sui_types::object::Object;
30
31use crate::proto::StoredObject;
32use crate::proto::StoredObjectTombstone;
33use crate::proto::StoredObjectTombstoneKind;
34use crate::proto::stored_object;
35
36pub const NAME: &str = "objects";
37
38/// `(ObjectID, version)`. Encoded as 32 raw id bytes followed by an
39/// 8-byte big-endian version, so versions of the same object cluster
40/// in sorted order.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
42pub struct Key {
43    pub id: ObjectID,
44    pub version: SequenceNumber,
45}
46
47pub type Value = Protobuf<StoredObject>;
48
49impl Encode for Key {
50    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
51        buf.put_slice(self.id.as_ref());
52        buf.put_slice(&self.version.value().to_be_bytes());
53        Ok(())
54    }
55}
56
57impl Decode for Key {
58    fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
59        if buf.remaining() != ObjectID::LENGTH + 8 {
60            return Err(DecodeError::msg(format!(
61                "expected {} bytes for {NAME} Key, got {}",
62                ObjectID::LENGTH + 8,
63                buf.remaining(),
64            )));
65        }
66        let mut id_bytes = [0u8; ObjectID::LENGTH];
67        buf.copy_to_slice(&mut id_bytes);
68        let version = SequenceNumber::from_u64(buf.get_u64());
69        Ok(Key {
70            id: ObjectID::new(id_bytes),
71            version,
72        })
73    }
74}
75
76pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
77    resolver.options(NAME)
78}
79
80/// Prefix encoder for "every version of `id`". Encodes as the 32 raw
81/// id bytes -- the leading bytes of every `Key` whose `id` matches, so
82/// a prefix scan walks one object's versions in isolation.
83pub struct ObjectIdPrefix(pub ObjectID);
84
85impl Encode for ObjectIdPrefix {
86    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
87        buf.put_slice(self.0.as_ref());
88        Ok(())
89    }
90}
91
92/// Why a tombstone row was written: the object was either
93/// `Deleted` (including the `unwrapped_then_deleted` shape) or
94/// `Wrapped` (nested inside another object and removed from the
95/// live set).
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
97pub enum TombstoneKind {
98    Deleted,
99    Wrapped,
100}
101
102/// Typed view of a row in the `objects` CF.
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub enum Status {
105    /// The row carries a live version of the object.
106    Live(Object),
107    /// The row marks the version at which the object was removed
108    /// from the live set.
109    Tombstone(TombstoneKind),
110}
111
112impl Status {
113    /// Convenience for callers that only care about the live case.
114    pub fn into_live(self) -> Option<Object> {
115        match self {
116            Status::Live(object) => Some(object),
117            Status::Tombstone(_) => None,
118        }
119    }
120}
121
122/// Build a live `StoredObject` row from a canonical [`Object`].
123///
124/// BCS-encode failures here would indicate either OOM or a bug in
125/// the type's `Serialize` impl; we panic rather than thread a
126/// `Result` through every call site.
127pub fn store(object: &Object) -> Value {
128    let bcs = bcs::to_bytes(object).expect("bcs encode Object");
129    Protobuf(StoredObject {
130        kind: Some(stored_object::Kind::Bcs(bcs.into())),
131    })
132}
133
134/// Build a tombstone `StoredObject` row marking the version at
135/// which an object was deleted or wrapped.
136pub fn tombstone(kind: TombstoneKind) -> Value {
137    let proto_kind = match kind {
138        TombstoneKind::Deleted => StoredObjectTombstoneKind::Deleted,
139        TombstoneKind::Wrapped => StoredObjectTombstoneKind::Wrapped,
140    };
141    Protobuf(StoredObject {
142        kind: Some(stored_object::Kind::Tombstone(StoredObjectTombstone {
143            kind: proto_kind as i32,
144        })),
145    })
146}
147
148/// Decode a stored row into the typed [`Status`].
149fn decode(stored: StoredObject) -> Result<Status, Error> {
150    match stored.kind {
151        Some(stored_object::Kind::Bcs(bcs)) => {
152            let object: Object = bcs::from_bytes(&bcs)
153                .map_err(|e| DecodeError::with_source("bcs decode Object", e))?;
154            Ok(Status::Live(object))
155        }
156        Some(stored_object::Kind::Tombstone(t)) => {
157            let kind = match StoredObjectTombstoneKind::try_from(t.kind) {
158                Ok(StoredObjectTombstoneKind::Deleted) => TombstoneKind::Deleted,
159                Ok(StoredObjectTombstoneKind::Wrapped) => TombstoneKind::Wrapped,
160                Ok(StoredObjectTombstoneKind::Unspecified) | Err(_) => {
161                    return Err(DecodeError::msg(format!(
162                        "unrecognised tombstone kind: {}",
163                        t.kind,
164                    ))
165                    .into());
166                }
167            };
168            Ok(Status::Tombstone(kind))
169        }
170        None => Err(DecodeError::msg("StoredObject row missing kind").into()),
171    }
172}
173
174impl<R: Reader> super::RpcStoreSchema<R> {
175    /// Look up a specific version of an object, returning the live
176    /// object if and only if the row at `(id, version)` is a live
177    /// version. Tombstone rows and missing rows both return `None`;
178    /// callers that need to distinguish the two should use
179    /// [`get_object_status_by_key`](Self::get_object_status_by_key).
180    ///
181    /// For the "latest live version" of an object id, use
182    /// [`get_object`](Self::get_object), which reverse-scans this CF
183    /// for the greatest version.
184    pub fn get_object_by_key(
185        &self,
186        id: ObjectID,
187        version: SequenceNumber,
188    ) -> Result<Option<Object>, Error> {
189        Ok(self
190            .get_object_status_by_key(id, version)?
191            .and_then(Status::into_live))
192    }
193
194    /// Look up a specific version of an object, returning the
195    /// typed [`Status`] so callers can distinguish live versions
196    /// from tombstones. `Ok(None)` means no row was written at
197    /// `(id, version)`.
198    pub fn get_object_status_by_key(
199        &self,
200        id: ObjectID,
201        version: SequenceNumber,
202    ) -> Result<Option<Status>, Error> {
203        let Some(stored) = self.objects.get(&Key { id, version })? else {
204            return Ok(None);
205        };
206        Ok(Some(decode(stored.into_inner())?))
207    }
208
209    /// Look up the latest live version of an object by id.
210    ///
211    /// A single reverse prefix scan takes the object's greatest
212    /// `(id, version)` row and decodes it, returning the live object or
213    /// `None` if that row is a tombstone. This mirrors the validator
214    /// perpetual store's `get_object` (reverse scan, then collapse
215    /// `Deleted` / `Wrapped` to `None`). Returns `Ok(None)` when the
216    /// object has no recorded version.
217    pub fn get_object(&self, id: ObjectID) -> Result<Option<Object>, Error> {
218        let Some(row) = self.objects.iter_rev_prefix(&ObjectIdPrefix(id))?.next() else {
219            return Ok(None);
220        };
221        let (_key, value) = row?;
222        Ok(decode(value.into_inner())?.into_live())
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use sui_consistent_store::Db;
229    use sui_consistent_store::DbOptions;
230    use sui_types::base_types::SuiAddress;
231
232    use super::*;
233    use crate::RpcStoreSchema;
234
235    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
236        let dir = tempfile::tempdir().unwrap();
237        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
238        (dir, db, schema)
239    }
240
241    fn dummy_object(id: ObjectID) -> Object {
242        Object::with_id_owner_for_testing(id, SuiAddress::ZERO)
243    }
244
245    #[test]
246    fn get_returns_none_for_unknown_key() {
247        let (_dir, _db, schema) = fresh_db();
248        let id = ObjectID::random();
249        assert!(
250            schema
251                .get_object_by_key(id, SequenceNumber::from_u64(1))
252                .unwrap()
253                .is_none()
254        );
255    }
256
257    #[test]
258    fn store_then_get_round_trips() {
259        let (_dir, db, schema) = fresh_db();
260        let id = ObjectID::random();
261        let object = dummy_object(id);
262        let version = object.version();
263
264        let mut batch = db.batch();
265        batch
266            .put(&schema.objects, &Key { id, version }, &store(&object))
267            .unwrap();
268        batch.commit().unwrap();
269
270        let read = schema
271            .get_object_by_key(id, version)
272            .unwrap()
273            .expect("object present");
274        assert_eq!(read, object);
275    }
276
277    #[test]
278    fn tombstone_round_trips_with_kind() {
279        let (_dir, db, schema) = fresh_db();
280        let id = ObjectID::random();
281        let v_del = SequenceNumber::from_u64(5);
282        let v_wrap = SequenceNumber::from_u64(9);
283
284        let mut batch = db.batch();
285        batch
286            .put(
287                &schema.objects,
288                &Key { id, version: v_del },
289                &tombstone(TombstoneKind::Deleted),
290            )
291            .unwrap();
292        batch
293            .put(
294                &schema.objects,
295                &Key {
296                    id,
297                    version: v_wrap,
298                },
299                &tombstone(TombstoneKind::Wrapped),
300            )
301            .unwrap();
302        batch.commit().unwrap();
303
304        assert_eq!(
305            schema.get_object_status_by_key(id, v_del).unwrap(),
306            Some(Status::Tombstone(TombstoneKind::Deleted)),
307        );
308        assert_eq!(
309            schema.get_object_status_by_key(id, v_wrap).unwrap(),
310            Some(Status::Tombstone(TombstoneKind::Wrapped)),
311        );
312        // The live-only accessor flattens both tombstones to None.
313        assert!(schema.get_object_by_key(id, v_del).unwrap().is_none());
314        assert!(schema.get_object_by_key(id, v_wrap).unwrap().is_none());
315    }
316
317    #[test]
318    fn get_object_status_distinguishes_missing_from_tombstone() {
319        let (_dir, db, schema) = fresh_db();
320        let id = ObjectID::random();
321        let version = SequenceNumber::from_u64(3);
322
323        // No row at all: status is None.
324        assert!(
325            schema
326                .get_object_status_by_key(id, version)
327                .unwrap()
328                .is_none()
329        );
330
331        // Write a tombstone at `version` and observe the
332        // distinction from the missing-row case above.
333        let mut batch = db.batch();
334        batch
335            .put(
336                &schema.objects,
337                &Key { id, version },
338                &tombstone(TombstoneKind::Deleted),
339            )
340            .unwrap();
341        batch.commit().unwrap();
342        assert_eq!(
343            schema.get_object_status_by_key(id, version).unwrap(),
344            Some(Status::Tombstone(TombstoneKind::Deleted)),
345        );
346    }
347
348    #[test]
349    fn distinct_versions_of_same_id_are_isolated() {
350        let (_dir, db, schema) = fresh_db();
351        let id = ObjectID::random();
352        let v1 = SequenceNumber::from_u64(1);
353        let v2 = SequenceNumber::from_u64(2);
354        let o1 = dummy_object(id);
355        let o2 = dummy_object(id);
356
357        let mut batch = db.batch();
358        batch
359            .put(&schema.objects, &Key { id, version: v1 }, &store(&o1))
360            .unwrap();
361        batch
362            .put(&schema.objects, &Key { id, version: v2 }, &store(&o2))
363            .unwrap();
364        batch.commit().unwrap();
365
366        assert_eq!(schema.get_object_by_key(id, v1).unwrap().unwrap(), o1,);
367        assert_eq!(schema.get_object_by_key(id, v2).unwrap().unwrap(), o2,);
368    }
369
370    #[test]
371    fn get_object_returns_the_latest_live_version() {
372        let (_dir, db, schema) = fresh_db();
373        let id = ObjectID::random();
374        let object = dummy_object(id);
375        let version = object.version();
376
377        let mut batch = db.batch();
378        batch
379            .put(&schema.objects, &Key { id, version }, &store(&object))
380            .unwrap();
381        batch.commit().unwrap();
382
383        assert_eq!(schema.get_object(id).unwrap(), Some(object));
384    }
385
386    #[test]
387    fn get_object_returns_none_when_latest_is_tombstone() {
388        let (_dir, db, schema) = fresh_db();
389        let id = ObjectID::random();
390        let live = dummy_object(id);
391        let live_version = SequenceNumber::from_u64(1);
392        let tombstone_version = SequenceNumber::from_u64(5);
393
394        let mut batch = db.batch();
395        // An older live version, then a newer tombstone marking the
396        // object's removal.
397        batch
398            .put(
399                &schema.objects,
400                &Key {
401                    id,
402                    version: live_version,
403                },
404                &store(&live),
405            )
406            .unwrap();
407        batch
408            .put(
409                &schema.objects,
410                &Key {
411                    id,
412                    version: tombstone_version,
413                },
414                &tombstone(TombstoneKind::Deleted),
415            )
416            .unwrap();
417        batch.commit().unwrap();
418
419        // The greatest row is the tombstone, which collapses to "no
420        // live object" even though an older live version is still
421        // present.
422        assert!(schema.get_object(id).unwrap().is_none());
423    }
424
425    #[test]
426    fn get_object_isolates_objects_by_id() {
427        let (_dir, db, schema) = fresh_db();
428        let a = ObjectID::from_single_byte(1);
429        let b = ObjectID::from_single_byte(2);
430        let oa = dummy_object(a);
431        let ob = dummy_object(b);
432
433        let mut batch = db.batch();
434        batch
435            .put(
436                &schema.objects,
437                &Key {
438                    id: a,
439                    version: SequenceNumber::from_u64(9),
440                },
441                &store(&oa),
442            )
443            .unwrap();
444        batch
445            .put(
446                &schema.objects,
447                &Key {
448                    id: b,
449                    version: SequenceNumber::from_u64(4),
450                },
451                &store(&ob),
452            )
453            .unwrap();
454        batch.commit().unwrap();
455
456        // The reverse prefix scan must not spill across the id bound.
457        assert_eq!(schema.get_object(a).unwrap(), Some(oa));
458        assert_eq!(schema.get_object(b).unwrap(), Some(ob));
459    }
460}