sui_rpc_store/schema/
object_version_by_checkpoint.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! `(ObjectID, checkpoint)` -> `version`.
//!
//! Resolves an object's version *as of* a checkpoint: the version it
//! had at the end of the most recent checkpoint, at or before the one
//! queried, in which it changed. One row is written per
//! `(object, checkpoint-it-changed-in)`, carrying the object's final
//! version in that checkpoint -- a live version, or the tombstone
//! version at which it was deleted or wrapped.
//!
//! The key is the 32-byte object id followed by an 8-byte big-endian
//! checkpoint, so a reverse scan over the object's keys starting at
//! `(id, C)` lands on the greatest checkpoint `<= C` -- the object's
//! state as of `C`. This is the checkpoint-pinned counterpart to the
//! version-keyed [`super::objects`] CF (which answers "object at version
//! V" but not "object as of checkpoint C"), and the analog of the
//! indexer's Postgres `obj_versions` table that the GraphQL service
//! relies on for `checkpoint_viewed_at` historical reads.
//!
//! Rows carry a `from_restore` provenance flag, set only by the
//! live-set restore at the anchor checkpoint and left unset by tip
//! indexing and backfill. It lets a read whose checkpoint falls before
//! any recorded change fall back to the restore floor -- telling an
//! object that existed before the available window (and so is live at
//! the queried checkpoint) apart from one created later. See
//! [`get_object_version_at_checkpoint`](super::RpcStoreSchema::get_object_version_at_checkpoint).
//!
//! Pruning is effects-driven, in lockstep with [`super::objects`]: a
//! transaction that supersedes an object retracts that object's
//! checkpoint-pinned entries below the superseding checkpoint, so the
//! retained set always matches the versions [`super::objects`] keeps
//! (the floor at the retention boundary, and everything newer).
35
36use std::ops::Bound;
37
38use bytes::Buf;
39use bytes::BufMut;
40use sui_consistent_store::Decode;
41use sui_consistent_store::Encode;
42use sui_consistent_store::Iter;
43use sui_consistent_store::Protobuf;
44use sui_consistent_store::error::DecodeError;
45use sui_consistent_store::error::EncodeError;
46use sui_consistent_store::error::Error;
47use sui_consistent_store::reader::Reader;
48use sui_types::base_types::ObjectID;
49use sui_types::base_types::SequenceNumber;
50use sui_types::object::Object;
51
52use crate::proto::ObjectVersionInfo;
53
/// Column-family name for this index; also used to label its key
/// decode errors.
pub const NAME: &str = "object_version_by_checkpoint";
55
56/// `(ObjectID, checkpoint)`. Encoded as 32 raw id bytes followed by an
57/// 8-byte big-endian checkpoint sequence number, so the rows for one
58/// object cluster together in ascending checkpoint order and a reverse
59/// scan resolves the floor checkpoint for a point-in-time read.
60#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub struct Key {
62    pub id: ObjectID,
63    pub checkpoint: u64,
64}
65
66/// The object's final version in the keyed checkpoint, plus a
67/// `from_restore` provenance flag (set only on rows written by the
68/// live-set restore at the anchor checkpoint).
69pub type Value = Protobuf<ObjectVersionInfo>;
70
71impl Encode for Key {
72    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
73        buf.put_slice(self.id.as_ref());
74        buf.put_slice(&self.checkpoint.to_be_bytes());
75        Ok(())
76    }
77}
78
79impl Decode for Key {
80    fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
81        if buf.remaining() != ObjectID::LENGTH + 8 {
82            return Err(DecodeError::msg(format!(
83                "expected {} bytes for {NAME} Key, got {}",
84                ObjectID::LENGTH + 8,
85                buf.remaining(),
86            )));
87        }
88        let mut id = [0u8; ObjectID::LENGTH];
89        buf.copy_to_slice(&mut id);
90        let checkpoint = buf.get_u64();
91        Ok(Key {
92            id: ObjectID::new(id),
93            checkpoint,
94        })
95    }
96}
97
98pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
99    resolver.options(NAME)
100}
101
102/// Build the `(Key, Value)` pair recording that, as of `checkpoint`,
103/// the object `id`'s final version was `version`. Written by tip
104/// indexing and backfill; `from_restore` is left unset so these (the
105/// common case) pay no extra bytes.
106pub fn store(id: ObjectID, checkpoint: u64, version: SequenceNumber) -> (Key, Value) {
107    (
108        Key { id, checkpoint },
109        Protobuf(ObjectVersionInfo {
110            version: version.value(),
111            from_restore: None,
112        }),
113    )
114}
115
116/// Like [`store`], but marks the row as written by the live-set restore
117/// at the anchor checkpoint (`from_restore = Some(true)`). The
118/// checkpoint-pinned read uses this flag to tell a static object's
119/// restore floor (the object existed before the available window) apart
120/// from an object created in the anchor checkpoint.
121pub fn store_restored(id: ObjectID, checkpoint: u64, version: SequenceNumber) -> (Key, Value) {
122    (
123        Key { id, checkpoint },
124        Protobuf(ObjectVersionInfo {
125            version: version.value(),
126            from_restore: Some(true),
127        }),
128    )
129}
130
131/// Prefix encoder for "every checkpoint at which `id` changed".
132/// Encodes as the 32 raw id bytes -- the leading bytes of every `Key`
133/// whose `id` matches.
134pub struct ObjectIdPrefix(pub ObjectID);
135
136impl Encode for ObjectIdPrefix {
137    fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
138        buf.put_slice(self.0.as_ref());
139        Ok(())
140    }
141}
142
143impl<R: Reader> super::RpcStoreSchema<R> {
144    /// Resolve the version an object had as of `checkpoint`.
145    ///
146    /// The caller is responsible for confirming `checkpoint` is within
147    /// this index's available range; this method assumes it is.
148    ///
149    /// Two steps:
150    ///
151    /// 1. **Floor scan** — the greatest entry at or before `checkpoint`
152    ///    is the version live then: the row written at the most recent
153    ///    checkpoint, at or before the queried one, in which the object
154    ///    changed (or a restore/backfill floor row).
155    /// 2. **Restore fallback** — if no entry is at or before
156    ///    `checkpoint`, the object's earliest entry is the only
157    ///    candidate, and it lies after `checkpoint`. If that row is a
158    ///    restore floor (`from_restore` set), the object predates the
159    ///    available window and was live then at that version. Otherwise
160    ///    it is a later change or creation, so the object did not exist
161    ///    as of `checkpoint`.
162    ///
163    /// Returns `Ok(None)` when the object did not exist as of
164    /// `checkpoint` (or its history there has been pruned). The returned
165    /// version may point at a tombstone row in [`super::objects`]; use
166    /// [`get_object_at_checkpoint`](Self::get_object_at_checkpoint) to
167    /// collapse that to "no live object".
168    pub fn get_object_version_at_checkpoint(
169        &self,
170        id: ObjectID,
171        checkpoint: u64,
172    ) -> Result<Option<SequenceNumber>, Error> {
173        // Floor scan: reverse-scan this object's own prefix from
174        // `(id, checkpoint)` downward and take the first row. The
175        // `(id, 0)` lower bound keeps the scan from spilling into the
176        // previous object id when this one has no entry in range.
177        let lo = Key { id, checkpoint: 0 };
178        let hi = Key { id, checkpoint };
179        if let Some(row) = self
180            .object_version_by_checkpoint
181            .iter_rev((Bound::Included(lo), Bound::Included(hi)))?
182            .next()
183        {
184            let (_key, value) = row?;
185            return Ok(Some(SequenceNumber::from_u64(value.into_inner().version)));
186        }
187
188        // Restore fallback: a restore-floor row means the object
189        // predates the window and was live at this version; any other
190        // earliest row is a later change/creation, so the object did not
191        // exist as of `checkpoint`.
192        if let Some(row) = self
193            .object_version_by_checkpoint
194            .iter_prefix(&ObjectIdPrefix(id))?
195            .next()
196        {
197            let (_key, value) = row?;
198            let info = value.into_inner();
199            if info.from_restore.unwrap_or(false) {
200                return Ok(Some(SequenceNumber::from_u64(info.version)));
201            }
202        }
203
204        Ok(None)
205    }
206
207    /// Resolve the live object as of `checkpoint`, composing
208    /// [`get_object_version_at_checkpoint`](Self::get_object_version_at_checkpoint)
209    /// with the version-keyed [`super::objects`] lookup.
210    ///
211    /// Returns `Ok(None)` when the object did not exist as of
212    /// `checkpoint`, or was deleted or wrapped at or before it (the
213    /// resolved version points at a tombstone row).
214    pub fn get_object_at_checkpoint(
215        &self,
216        id: ObjectID,
217        checkpoint: u64,
218    ) -> Result<Option<Object>, Error> {
219        let Some(version) = self.get_object_version_at_checkpoint(id, checkpoint)? else {
220            return Ok(None);
221        };
222        self.get_object_by_key(id, version)
223    }
224
225    /// Iterate every `(checkpoint, version)` recorded for `id`, in
226    /// ascending checkpoint order.
227    pub fn iter_object_versions_by_checkpoint(
228        &self,
229        id: ObjectID,
230    ) -> Result<Iter<'_, Key, Value>, Error> {
231        self.object_version_by_checkpoint
232            .iter_prefix(&ObjectIdPrefix(id))
233    }
234
235    /// Resolve the latest version recorded for an object via this index:
236    /// the version at its greatest-checkpoint row.
237    ///
238    /// The checkpoint-unbounded counterpart to
239    /// [`get_object_version_at_checkpoint`](Self::get_object_version_at_checkpoint):
240    /// a reverse prefix scan takes the object's highest-checkpoint row,
241    /// whose version is the one live at the tip (a restore floor for an
242    /// object that never changed in the window is itself that row). It
243    /// agrees with the `objects`-CF reverse scan that backs
244    /// [`get_object`](Self::get_object), since an object's version
245    /// increases monotonically with the checkpoints it changes in; this
246    /// is offered as a convenience for callers already working through
247    /// the checkpoint index.
248    ///
249    /// The returned version may point at a tombstone row in
250    /// [`super::objects`] (the object was deleted or wrapped and the
251    /// removal has not yet been pruned). Returns `Ok(None)` when `id` has
252    /// no row (it never existed, or its history has been fully pruned).
253    pub fn get_latest_object_version(&self, id: ObjectID) -> Result<Option<SequenceNumber>, Error> {
254        let Some(row) = self
255            .object_version_by_checkpoint
256            .iter_rev_prefix(&ObjectIdPrefix(id))?
257            .next()
258        else {
259            return Ok(None);
260        };
261        let (_key, value) = row?;
262        Ok(Some(SequenceNumber::from_u64(value.into_inner().version)))
263    }
264}
265
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::SuiAddress;

    use super::*;
    use crate::RpcStoreSchema;
    use crate::schema::objects;

    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    fn seq(v: u64) -> SequenceNumber {
        SequenceNumber::from_u64(v)
    }

    fn put(schema: &RpcStoreSchema, db: &Db, id: ObjectID, checkpoint: u64, version: u64) {
        let (k, v) = store(id, checkpoint, seq(version));
        let mut batch = db.batch();
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();
    }

    /// Encode `value` into a fresh byte vector, asserting the encode
    /// succeeded.
    fn encode<T: Encode>(value: &T) -> Vec<u8> {
        let mut bytes = Vec::new();
        assert!(value.encode_into(&mut bytes).is_ok());
        bytes
    }

    #[test]
    fn key_encodes_as_id_then_big_endian_checkpoint() {
        let id = ObjectID::from_single_byte(7);
        let low = Key { id, checkpoint: 1 };
        let high = Key { id, checkpoint: 256 };
        let low_bytes = encode(&low);
        let high_bytes = encode(&high);

        // 32 id bytes + 8 checkpoint bytes, led by the object-id prefix.
        assert_eq!(low_bytes.len(), ObjectID::LENGTH + 8);
        assert!(low_bytes.starts_with(&encode(&ObjectIdPrefix(id))));
        // Big-endian keeps byte order aligned with numeric order (1 < 256),
        // which the reverse floor scan relies on.
        assert!(low_bytes < high_bytes);

        assert_eq!(Key::decode(&mut low_bytes.as_slice()).ok(), Some(low));
        assert_eq!(Key::decode(&mut high_bytes.as_slice()).ok(), Some(high));
    }

    #[test]
    fn key_decode_rejects_wrong_length() {
        let mut bytes = encode(&Key {
            id: ObjectID::from_single_byte(7),
            checkpoint: 3,
        });

        // Truncated input is rejected...
        assert!(Key::decode(&mut &bytes[..bytes.len() - 1]).is_err());
        // ...and so is trailing garbage.
        bytes.push(0);
        assert!(Key::decode(&mut bytes.as_slice()).is_err());
    }

    #[test]
    fn returns_none_before_first_entry() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // Object first appears at checkpoint 10.
        put(&schema, &db, id, 10, 5);

        // A read pinned before the object existed sees nothing.
        assert!(
            schema
                .get_object_version_at_checkpoint(id, 9)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn resolves_greatest_checkpoint_at_or_before() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // The object changed at checkpoints 10, 20, and 30.
        put(&schema, &db, id, 10, 5);
        put(&schema, &db, id, 20, 6);
        put(&schema, &db, id, 30, 7);

        // Exactly on a change checkpoint resolves that version.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 20).unwrap(),
            Some(seq(6)),
        );
        // Between changes resolves the prior change (the floor).
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 25).unwrap(),
            Some(seq(6)),
        );
        // After the last change resolves the latest known version.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 1_000).unwrap(),
            Some(seq(7)),
        );
        // The first change checkpoint resolves its version.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 10).unwrap(),
            Some(seq(5)),
        );
    }

    #[test]
    fn isolates_objects_from_each_other() {
        let (_dir, db, schema) = fresh_db();
        let a = ObjectID::from_single_byte(1);
        let b = ObjectID::from_single_byte(2);
        put(&schema, &db, a, 10, 5);
        put(&schema, &db, b, 5, 9);

        // `a` has no entry at or before checkpoint 9 even though `b`
        // does -- the reverse scan must not spill across the id bound.
        assert!(
            schema
                .get_object_version_at_checkpoint(a, 9)
                .unwrap()
                .is_none()
        );
        assert_eq!(
            schema.get_object_version_at_checkpoint(b, 9).unwrap(),
            Some(seq(9)),
        );
    }

    #[test]
    fn restore_floor_resolves_below_anchor_but_a_creation_does_not() {
        let (_dir, db, schema) = fresh_db();
        let static_obj = ObjectID::from_single_byte(1);
        let created_obj = ObjectID::from_single_byte(2);

        let mut batch = db.batch();
        // `static_obj`: a live-set restore floor at anchor checkpoint
        // 100 -- the object existed before the available window and
        // never changed in it.
        let (k1, v1) = store_restored(static_obj, 100, seq(5));
        batch
            .put(&schema.object_version_by_checkpoint, &k1, &v1)
            .unwrap();
        // `created_obj`: an ordinary (tip/backfill) row at 100 -- the
        // object was created in checkpoint 100. Same key checkpoint, no
        // `from_restore` flag.
        let (k2, v2) = store(created_obj, 100, seq(7));
        batch
            .put(&schema.object_version_by_checkpoint, &k2, &v2)
            .unwrap();
        batch.commit().unwrap();

        // Below the anchor the provenance flag is what separates them:
        // the restore floor is live, the creation did not yet exist.
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(static_obj, 50)
                .unwrap(),
            Some(seq(5)),
            "restore floor is live below the anchor",
        );
        assert!(
            schema
                .get_object_version_at_checkpoint(created_obj, 50)
                .unwrap()
                .is_none(),
            "an object created at the anchor did not exist below it",
        );

        // At and above the anchor the plain floor scan resolves both.
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(static_obj, 100)
                .unwrap(),
            Some(seq(5)),
        );
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(created_obj, 100)
                .unwrap(),
            Some(seq(7)),
        );
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(static_obj, 999)
                .unwrap(),
            Some(seq(5)),
        );
    }

    #[test]
    fn restore_floor_backs_reads_below_a_later_change() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();

        // Restored at anchor checkpoint 100 (version 5), then changed at
        // checkpoint 200 (version 6).
        let (k, v) = store_restored(id, 100, seq(5));
        let mut batch = db.batch();
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();
        put(&schema, &db, id, 200, 6);

        // Below the anchor, the fallback still finds the restore floor as
        // the earliest row even though a later change exists.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 50).unwrap(),
            Some(seq(5)),
        );
        // Between the anchor and the change, the floor scan hits the
        // restore row.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 150).unwrap(),
            Some(seq(5)),
        );
        // From the change onward, the newer version wins.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 200).unwrap(),
            Some(seq(6)),
        );
        assert_eq!(schema.get_latest_object_version(id).unwrap(), Some(seq(6)));
    }

    #[test]
    fn get_object_at_checkpoint_composes_with_objects() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let object = Object::with_id_owner_for_testing(id, SuiAddress::ZERO);
        let version = object.version();

        let mut batch = db.batch();
        batch
            .put(
                &schema.objects,
                &objects::Key { id, version },
                &objects::store(&object),
            )
            .unwrap();
        let (k, v) = store(id, 42, version);
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();

        let read = schema
            .get_object_at_checkpoint(id, 42)
            .unwrap()
            .expect("object present");
        assert_eq!(read, object);
    }

    #[test]
    fn get_object_at_checkpoint_returns_none_for_tombstone() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let tombstone_version = seq(7);

        let mut batch = db.batch();
        // The object was removed at checkpoint 50: a tombstone in
        // `objects` at the tombstone version, and an index row pointing
        // at it.
        batch
            .put(
                &schema.objects,
                &objects::Key {
                    id,
                    version: tombstone_version,
                },
                &objects::tombstone(objects::TombstoneKind::Deleted),
            )
            .unwrap();
        let (k, v) = store(id, 50, tombstone_version);
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();

        // The version resolves, but the object collapses to None.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 50).unwrap(),
            Some(tombstone_version),
        );
        assert!(schema.get_object_at_checkpoint(id, 50).unwrap().is_none());
    }

    #[test]
    fn iter_walks_checkpoints_for_one_object() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let other = ObjectID::random();
        put(&schema, &db, id, 30, 7);
        put(&schema, &db, id, 10, 5);
        put(&schema, &db, id, 20, 6);
        put(&schema, &db, other, 15, 1);

        let checkpoints: Vec<u64> = schema
            .iter_object_versions_by_checkpoint(id)
            .unwrap()
            .map(|res| res.unwrap().0.checkpoint)
            .collect();
        assert_eq!(checkpoints, vec![10, 20, 30]);
    }

    #[test]
    fn get_latest_object_version_returns_none_for_unknown_id() {
        let (_dir, _db, schema) = fresh_db();
        assert!(
            schema
                .get_latest_object_version(ObjectID::random())
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn get_latest_object_version_returns_the_greatest_checkpoint_row() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // The object changed at checkpoints 10, 20, and 30; the latest
        // version is the one recorded at the greatest checkpoint.
        put(&schema, &db, id, 10, 5);
        put(&schema, &db, id, 30, 7);
        put(&schema, &db, id, 20, 6);

        assert_eq!(schema.get_latest_object_version(id).unwrap(), Some(seq(7)));
    }

    #[test]
    fn get_latest_object_version_uses_the_restore_floor_when_static() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // A static object restored before the available window has only
        // its `from_restore` floor row; that floor is its latest version.
        let (k, v) = store_restored(id, 100, seq(5));
        let mut batch = db.batch();
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();

        assert_eq!(schema.get_latest_object_version(id).unwrap(), Some(seq(5)));
    }

    #[test]
    fn get_latest_object_version_isolates_objects_by_id() {
        let (_dir, db, schema) = fresh_db();
        let a = ObjectID::from_single_byte(1);
        let b = ObjectID::from_single_byte(2);
        put(&schema, &db, a, 30, 9);
        put(&schema, &db, b, 10, 4);

        // The reverse prefix scan must not spill across the id bound.
        assert_eq!(schema.get_latest_object_version(a).unwrap(), Some(seq(9)));
        assert_eq!(schema.get_latest_object_version(b).unwrap(), Some(seq(4)));
    }
}