sui_rpc_store/schema/object_version_by_checkpoint.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `(ObjectID, checkpoint)` -> `version`.
5//!
6//! Resolves an object's version *as of* a checkpoint: the version it
7//! had at the end of the most recent checkpoint, at or before the one
8//! queried, in which it changed. One row is written per
9//! `(object, checkpoint-it-changed-in)`, carrying the object's final
10//! version in that checkpoint -- a live version, or the tombstone
11//! version at which it was deleted or wrapped.
12//!
13//! The key is the 32-byte object id followed by an 8-byte big-endian
14//! checkpoint, so a reverse prefix scan from `(id, C)` lands on the
15//! greatest checkpoint `<= C` -- the object's state as of `C`. This is
16//! the checkpoint-pinned counterpart to the version-keyed
17//! [`super::objects`] CF (which answers "object at version V" but not
18//! "object as of checkpoint C"), and the analog of the indexer's
19//! Postgres `obj_versions` table that the GraphQL service relies on
20//! for `checkpoint_viewed_at` historical reads.
21//!
22//! Rows carry a `from_restore` provenance flag, set only by the
23//! live-set restore at the anchor checkpoint and left unset by tip
24//! indexing and backfill. It lets a read whose checkpoint falls before
25//! any recorded change fall back to the restore floor -- telling an
26//! object that existed before the available window (and so is live at
27//! the queried checkpoint) apart from one created later. See
28//! [`get_object_version_at_checkpoint`](super::RpcStoreSchema::get_object_version_at_checkpoint).
29//!
30//! Pruning is effects-driven, in lockstep with [`super::objects`]: a
31//! transaction that supersedes an object retracts that object's
32//! checkpoint-pinned entries below the superseding checkpoint, so the
33//! retained set always matches the versions [`super::objects`] keeps
34//! (the floor at the retention boundary, and everything newer).
35
36use std::ops::Bound;
37
38use bytes::Buf;
39use bytes::BufMut;
40use sui_consistent_store::Decode;
41use sui_consistent_store::Encode;
42use sui_consistent_store::Iter;
43use sui_consistent_store::Protobuf;
44use sui_consistent_store::error::DecodeError;
45use sui_consistent_store::error::EncodeError;
46use sui_consistent_store::error::Error;
47use sui_consistent_store::reader::Reader;
48use sui_types::base_types::ObjectID;
49use sui_types::base_types::SequenceNumber;
50use sui_types::object::Object;
51
52use crate::proto::ObjectVersionInfo;
53
/// Column-family name of this index. It is also the name handed to the
/// options resolver in [`options`] and quoted in `Key` decode errors.
pub const NAME: &str = "object_version_by_checkpoint";
55
56/// `(ObjectID, checkpoint)`. Encoded as 32 raw id bytes followed by an
57/// 8-byte big-endian checkpoint sequence number, so the rows for one
58/// object cluster together in ascending checkpoint order and a reverse
59/// scan resolves the floor checkpoint for a point-in-time read.
60#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub struct Key {
62 pub id: ObjectID,
63 pub checkpoint: u64,
64}
65
66/// The object's final version in the keyed checkpoint, plus a
67/// `from_restore` provenance flag (set only on rows written by the
68/// live-set restore at the anchor checkpoint).
69pub type Value = Protobuf<ObjectVersionInfo>;
70
71impl Encode for Key {
72 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
73 buf.put_slice(self.id.as_ref());
74 buf.put_slice(&self.checkpoint.to_be_bytes());
75 Ok(())
76 }
77}
78
79impl Decode for Key {
80 fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
81 if buf.remaining() != ObjectID::LENGTH + 8 {
82 return Err(DecodeError::msg(format!(
83 "expected {} bytes for {NAME} Key, got {}",
84 ObjectID::LENGTH + 8,
85 buf.remaining(),
86 )));
87 }
88 let mut id = [0u8; ObjectID::LENGTH];
89 buf.copy_to_slice(&mut id);
90 let checkpoint = buf.get_u64();
91 Ok(Key {
92 id: ObjectID::new(id),
93 checkpoint,
94 })
95 }
96}
97
98pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
99 resolver.options(NAME)
100}
101
102/// Build the `(Key, Value)` pair recording that, as of `checkpoint`,
103/// the object `id`'s final version was `version`. Written by tip
104/// indexing and backfill; `from_restore` is left unset so these (the
105/// common case) pay no extra bytes.
106pub fn store(id: ObjectID, checkpoint: u64, version: SequenceNumber) -> (Key, Value) {
107 (
108 Key { id, checkpoint },
109 Protobuf(ObjectVersionInfo {
110 version: version.value(),
111 from_restore: None,
112 }),
113 )
114}
115
116/// Like [`store`], but marks the row as written by the live-set restore
117/// at the anchor checkpoint (`from_restore = Some(true)`). The
118/// checkpoint-pinned read uses this flag to tell a static object's
119/// restore floor (the object existed before the available window) apart
120/// from an object created in the anchor checkpoint.
121pub fn store_restored(id: ObjectID, checkpoint: u64, version: SequenceNumber) -> (Key, Value) {
122 (
123 Key { id, checkpoint },
124 Protobuf(ObjectVersionInfo {
125 version: version.value(),
126 from_restore: Some(true),
127 }),
128 )
129}
130
131/// Prefix encoder for "every checkpoint at which `id` changed".
132/// Encodes as the 32 raw id bytes -- the leading bytes of every `Key`
133/// whose `id` matches.
134pub struct ObjectIdPrefix(pub ObjectID);
135
136impl Encode for ObjectIdPrefix {
137 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
138 buf.put_slice(self.0.as_ref());
139 Ok(())
140 }
141}
142
143impl<R: Reader> super::RpcStoreSchema<R> {
144 /// Resolve the version an object had as of `checkpoint`.
145 ///
146 /// The caller is responsible for confirming `checkpoint` is within
147 /// this index's available range; this method assumes it is.
148 ///
149 /// Two steps:
150 ///
151 /// 1. **Floor scan** — the greatest entry at or before `checkpoint`
152 /// is the version live then: the row written at the most recent
153 /// checkpoint, at or before the queried one, in which the object
154 /// changed (or a restore/backfill floor row).
155 /// 2. **Restore fallback** — if no entry is at or before
156 /// `checkpoint`, the object's earliest entry is the only
157 /// candidate, and it lies after `checkpoint`. If that row is a
158 /// restore floor (`from_restore` set), the object predates the
159 /// available window and was live then at that version. Otherwise
160 /// it is a later change or creation, so the object did not exist
161 /// as of `checkpoint`.
162 ///
163 /// Returns `Ok(None)` when the object did not exist as of
164 /// `checkpoint` (or its history there has been pruned). The returned
165 /// version may point at a tombstone row in [`super::objects`]; use
166 /// [`get_object_at_checkpoint`](Self::get_object_at_checkpoint) to
167 /// collapse that to "no live object".
168 pub fn get_object_version_at_checkpoint(
169 &self,
170 id: ObjectID,
171 checkpoint: u64,
172 ) -> Result<Option<SequenceNumber>, Error> {
173 // Floor scan: reverse-scan this object's own prefix from
174 // `(id, checkpoint)` downward and take the first row. The
175 // `(id, 0)` lower bound keeps the scan from spilling into the
176 // previous object id when this one has no entry in range.
177 let lo = Key { id, checkpoint: 0 };
178 let hi = Key { id, checkpoint };
179 if let Some(row) = self
180 .object_version_by_checkpoint
181 .iter_rev((Bound::Included(lo), Bound::Included(hi)))?
182 .next()
183 {
184 let (_key, value) = row?;
185 return Ok(Some(SequenceNumber::from_u64(value.into_inner().version)));
186 }
187
188 // Restore fallback: a restore-floor row means the object
189 // predates the window and was live at this version; any other
190 // earliest row is a later change/creation, so the object did not
191 // exist as of `checkpoint`.
192 if let Some(row) = self
193 .object_version_by_checkpoint
194 .iter_prefix(&ObjectIdPrefix(id))?
195 .next()
196 {
197 let (_key, value) = row?;
198 let info = value.into_inner();
199 if info.from_restore.unwrap_or(false) {
200 return Ok(Some(SequenceNumber::from_u64(info.version)));
201 }
202 }
203
204 Ok(None)
205 }
206
207 /// Resolve the live object as of `checkpoint`, composing
208 /// [`get_object_version_at_checkpoint`](Self::get_object_version_at_checkpoint)
209 /// with the version-keyed [`super::objects`] lookup.
210 ///
211 /// Returns `Ok(None)` when the object did not exist as of
212 /// `checkpoint`, or was deleted or wrapped at or before it (the
213 /// resolved version points at a tombstone row).
214 pub fn get_object_at_checkpoint(
215 &self,
216 id: ObjectID,
217 checkpoint: u64,
218 ) -> Result<Option<Object>, Error> {
219 let Some(version) = self.get_object_version_at_checkpoint(id, checkpoint)? else {
220 return Ok(None);
221 };
222 self.get_object_by_key(id, version)
223 }
224
225 /// Iterate every `(checkpoint, version)` recorded for `id`, in
226 /// ascending checkpoint order.
227 pub fn iter_object_versions_by_checkpoint(
228 &self,
229 id: ObjectID,
230 ) -> Result<Iter<'_, Key, Value>, Error> {
231 self.object_version_by_checkpoint
232 .iter_prefix(&ObjectIdPrefix(id))
233 }
234
235 /// Resolve the latest version recorded for an object via this index:
236 /// the version at its greatest-checkpoint row.
237 ///
238 /// The checkpoint-unbounded counterpart to
239 /// [`get_object_version_at_checkpoint`](Self::get_object_version_at_checkpoint):
240 /// a reverse prefix scan takes the object's highest-checkpoint row,
241 /// whose version is the one live at the tip (a restore floor for an
242 /// object that never changed in the window is itself that row). It
243 /// agrees with the `objects`-CF reverse scan that backs
244 /// [`get_object`](Self::get_object), since an object's version
245 /// increases monotonically with the checkpoints it changes in; this
246 /// is offered as a convenience for callers already working through
247 /// the checkpoint index.
248 ///
249 /// The returned version may point at a tombstone row in
250 /// [`super::objects`] (the object was deleted or wrapped and the
251 /// removal has not yet been pruned). Returns `Ok(None)` when `id` has
252 /// no row (it never existed, or its history has been fully pruned).
253 pub fn get_latest_object_version(&self, id: ObjectID) -> Result<Option<SequenceNumber>, Error> {
254 let Some(row) = self
255 .object_version_by_checkpoint
256 .iter_rev_prefix(&ObjectIdPrefix(id))?
257 .next()
258 else {
259 return Ok(None);
260 };
261 let (_key, value) = row?;
262 Ok(Some(SequenceNumber::from_u64(value.into_inner().version)))
263 }
264}
265
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::SuiAddress;

    use super::*;
    use crate::RpcStoreSchema;
    use crate::schema::objects;

    fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        (dir, db, schema)
    }

    fn seq(v: u64) -> SequenceNumber {
        SequenceNumber::from_u64(v)
    }

    /// Commit a single index row in its own batch.
    fn write(schema: &RpcStoreSchema, db: &Db, (key, value): (Key, Value)) {
        let mut batch = db.batch();
        batch
            .put(&schema.object_version_by_checkpoint, &key, &value)
            .unwrap();
        batch.commit().unwrap();
    }

    /// Write an ordinary (tip/backfill) row.
    fn put(schema: &RpcStoreSchema, db: &Db, id: ObjectID, checkpoint: u64, version: u64) {
        write(schema, db, store(id, checkpoint, seq(version)));
    }

    /// Write a live-set restore floor row.
    fn put_restored(
        schema: &RpcStoreSchema,
        db: &Db,
        id: ObjectID,
        checkpoint: u64,
        version: u64,
    ) {
        write(schema, db, store_restored(id, checkpoint, seq(version)));
    }

    /// Encode a key to its on-disk bytes.
    fn encode(key: &Key) -> Vec<u8> {
        let mut bytes = Vec::new();
        assert!(key.encode_into(&mut bytes).is_ok());
        bytes
    }

    #[test]
    fn key_encoding_round_trips_and_sorts_by_id_then_checkpoint() {
        let id = ObjectID::from_single_byte(7);
        let low = Key { id, checkpoint: 1 };
        let high = Key { id, checkpoint: 256 };

        let low_bytes = encode(&low);
        let high_bytes = encode(&high);
        assert_eq!(low_bytes.len(), ObjectID::LENGTH + 8);
        // Big-endian checkpoints keep byte order equal to numeric order.
        // Little-endian would sort 256 ahead of 1.
        assert!(low_bytes < high_bytes);
        // A smaller id sorts first no matter how large its checkpoint is.
        let smaller_id = encode(&Key {
            id: ObjectID::from_single_byte(6),
            checkpoint: u64::MAX,
        });
        assert!(smaller_id < low_bytes);

        let mut slice: &[u8] = &high_bytes;
        assert_eq!(Key::decode(&mut slice).ok(), Some(high));
    }

    #[test]
    fn key_decode_rejects_wrong_length() {
        let bytes = encode(&Key {
            id: ObjectID::random(),
            checkpoint: 3,
        });

        let mut short: &[u8] = &bytes[..bytes.len() - 1];
        assert!(Key::decode(&mut short).is_err());

        let mut padded = bytes.clone();
        padded.push(0);
        let mut long: &[u8] = &padded;
        assert!(Key::decode(&mut long).is_err());
    }

    #[test]
    fn returns_none_before_first_entry() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // Object first appears at checkpoint 10.
        put(&schema, &db, id, 10, 5);

        // A read pinned before the object existed sees nothing.
        assert!(
            schema
                .get_object_version_at_checkpoint(id, 9)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn resolves_greatest_checkpoint_at_or_before() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // The object changed at checkpoints 10, 20, and 30.
        put(&schema, &db, id, 10, 5);
        put(&schema, &db, id, 20, 6);
        put(&schema, &db, id, 30, 7);

        // Exactly on a change checkpoint resolves that version.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 20).unwrap(),
            Some(seq(6)),
        );
        // Between changes resolves the prior change (the floor).
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 25).unwrap(),
            Some(seq(6)),
        );
        // After the last change resolves the latest known version.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 1_000).unwrap(),
            Some(seq(7)),
        );
        // The first change checkpoint resolves its version.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 10).unwrap(),
            Some(seq(5)),
        );
    }

    #[test]
    fn isolates_objects_from_each_other() {
        let (_dir, db, schema) = fresh_db();
        // `lower` sorts immediately before `upper` in key order. Without
        // the `(upper, 0)` lower bound, a reverse scan from `(upper, 9)`
        // would run straight into `lower`'s row at checkpoint 5.
        let lower = ObjectID::from_single_byte(1);
        let upper = ObjectID::from_single_byte(2);
        put(&schema, &db, upper, 10, 5);
        put(&schema, &db, lower, 5, 9);

        // `upper` has no entry at or before checkpoint 9, even though
        // `lower` does: the reverse scan must not spill across the id
        // bound.
        assert!(
            schema
                .get_object_version_at_checkpoint(upper, 9)
                .unwrap()
                .is_none()
        );
        assert_eq!(
            schema.get_object_version_at_checkpoint(lower, 9).unwrap(),
            Some(seq(9)),
        );
    }

    #[test]
    fn restore_fallback_does_not_borrow_a_neighbours_floor() {
        let (_dir, db, schema) = fresh_db();
        // `absent` has no rows. The next id in key order carries a
        // restore floor that a prefix-unbounded forward scan would pick
        // up.
        let absent = ObjectID::from_single_byte(1);
        let restored = ObjectID::from_single_byte(2);
        put_restored(&schema, &db, restored, 100, 5);

        assert!(
            schema
                .get_object_version_at_checkpoint(absent, 50)
                .unwrap()
                .is_none()
        );
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(restored, 50)
                .unwrap(),
            Some(seq(5)),
        );
    }

    #[test]
    fn restore_floor_resolves_below_anchor_but_a_creation_does_not() {
        let (_dir, db, schema) = fresh_db();
        let static_obj = ObjectID::from_single_byte(1);
        let created_obj = ObjectID::from_single_byte(2);

        let mut batch = db.batch();
        // `static_obj`: a live-set restore floor at anchor checkpoint
        // 100. The object existed before the available window and never
        // changed in it.
        let (k1, v1) = store_restored(static_obj, 100, seq(5));
        batch
            .put(&schema.object_version_by_checkpoint, &k1, &v1)
            .unwrap();
        // `created_obj`: an ordinary (tip/backfill) row at 100, i.e. the
        // object was created in checkpoint 100. Same key checkpoint, but
        // no `from_restore` flag.
        let (k2, v2) = store(created_obj, 100, seq(7));
        batch
            .put(&schema.object_version_by_checkpoint, &k2, &v2)
            .unwrap();
        batch.commit().unwrap();

        // Below the anchor, the provenance flag is what separates them:
        // the restore floor is live, while the created object does not
        // exist yet.
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(static_obj, 50)
                .unwrap(),
            Some(seq(5)),
            "restore floor is live below the anchor",
        );
        assert!(
            schema
                .get_object_version_at_checkpoint(created_obj, 50)
                .unwrap()
                .is_none(),
            "an object created at the anchor did not exist below it",
        );

        // At and above the anchor, the plain floor scan resolves both.
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(static_obj, 100)
                .unwrap(),
            Some(seq(5)),
        );
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(created_obj, 100)
                .unwrap(),
            Some(seq(7)),
        );
        assert_eq!(
            schema
                .get_object_version_at_checkpoint(static_obj, 999)
                .unwrap(),
            Some(seq(5)),
        );
    }

    #[test]
    fn restore_floor_does_not_apply_below_an_earlier_backfilled_row() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // Backfill recorded the object's creation at checkpoint 80. The
        // live-set restore later wrote its floor at anchor 100.
        put(&schema, &db, id, 80, 3);
        put_restored(&schema, &db, id, 100, 3);

        // Only the earliest row decides the fallback, and it is not a
        // restore row, so the object did not exist before 80.
        assert!(
            schema
                .get_object_version_at_checkpoint(id, 50)
                .unwrap()
                .is_none()
        );
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 90).unwrap(),
            Some(seq(3)),
        );
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 100).unwrap(),
            Some(seq(3)),
        );
    }

    #[test]
    fn get_object_at_checkpoint_composes_with_objects() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let object = Object::with_id_owner_for_testing(id, SuiAddress::ZERO);
        let version = object.version();

        let mut batch = db.batch();
        batch
            .put(
                &schema.objects,
                &objects::Key { id, version },
                &objects::store(&object),
            )
            .unwrap();
        let (k, v) = store(id, 42, version);
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();

        let read = schema
            .get_object_at_checkpoint(id, 42)
            .unwrap()
            .expect("object present");
        assert_eq!(read, object);
    }

    #[test]
    fn get_object_at_checkpoint_returns_none_for_tombstone() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let tombstone_version = seq(7);

        let mut batch = db.batch();
        // The object was removed at checkpoint 50. `objects` holds a
        // tombstone at the tombstone version, and an index row points
        // at it.
        batch
            .put(
                &schema.objects,
                &objects::Key {
                    id,
                    version: tombstone_version,
                },
                &objects::tombstone(objects::TombstoneKind::Deleted),
            )
            .unwrap();
        let (k, v) = store(id, 50, tombstone_version);
        batch
            .put(&schema.object_version_by_checkpoint, &k, &v)
            .unwrap();
        batch.commit().unwrap();

        // The version resolves, but the object collapses to None.
        assert_eq!(
            schema.get_object_version_at_checkpoint(id, 50).unwrap(),
            Some(tombstone_version),
        );
        assert!(schema.get_object_at_checkpoint(id, 50).unwrap().is_none());
    }

    #[test]
    fn iter_walks_checkpoints_for_one_object() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        let other = ObjectID::random();
        put(&schema, &db, id, 30, 7);
        put(&schema, &db, id, 10, 5);
        put(&schema, &db, id, 20, 6);
        put(&schema, &db, other, 15, 1);

        let checkpoints: Vec<u64> = schema
            .iter_object_versions_by_checkpoint(id)
            .unwrap()
            .map(|res| res.unwrap().0.checkpoint)
            .collect();
        assert_eq!(checkpoints, vec![10, 20, 30]);
    }

    #[test]
    fn get_latest_object_version_returns_none_for_unknown_id() {
        let (_dir, _db, schema) = fresh_db();
        assert!(
            schema
                .get_latest_object_version(ObjectID::random())
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn get_latest_object_version_returns_the_greatest_checkpoint_row() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // The object changed at checkpoints 10, 20, and 30. The latest
        // version is the one recorded at the greatest checkpoint.
        put(&schema, &db, id, 10, 5);
        put(&schema, &db, id, 30, 7);
        put(&schema, &db, id, 20, 6);

        assert_eq!(schema.get_latest_object_version(id).unwrap(), Some(seq(7)));
    }

    #[test]
    fn get_latest_object_version_uses_the_restore_floor_when_static() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();
        // A static object restored before the available window has only
        // its `from_restore` floor row, so that floor is its latest
        // version.
        put_restored(&schema, &db, id, 100, 5);

        assert_eq!(schema.get_latest_object_version(id).unwrap(), Some(seq(5)));
    }

    #[test]
    fn get_latest_object_version_isolates_objects_by_id() {
        let (_dir, db, schema) = fresh_db();
        let a = ObjectID::from_single_byte(1);
        let b = ObjectID::from_single_byte(2);
        put(&schema, &db, a, 30, 9);
        put(&schema, &db, b, 10, 4);

        // The reverse prefix scan must not spill across the id bound.
        assert_eq!(schema.get_latest_object_version(a).unwrap(), Some(seq(9)));
        assert_eq!(schema.get_latest_object_version(b).unwrap(), Some(seq(4)));
    }
}