sui_rpc_store/indexer/
object_version_by_checkpoint.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Sequential pipeline that populates the
//! [`schema::object_version_by_checkpoint`](crate::schema::object_version_by_checkpoint)
//! CF, which resolves an object's version as of a checkpoint.
//!
//! It writes three kinds of rows:
//!
//! - **Change rows** `(id, c) -> final version` -- one per object that
//!   changed in checkpoint `c`, carrying its final version at the end
//!   of `c` (a live version, or the tombstone version for an object
//!   deleted or wrapped and not re-created). Only the final version is
//!   recorded; intra-checkpoint intermediate versions stay addressable
//!   through the version-keyed [`objects`](crate::schema::objects) CF.
//! - **Restore floor rows** `(id, T) -> version`, marked `from_restore`
//!   -- one per live object at the restore anchor `T`, bulk-loaded by
//!   the restore impl. A read below `T` for an object that never
//!   changed in the available window falls back to these.
//! - **Synthetic floor rows** `(id, 0) -> window-entry version` -- for
//!   an object that existed before the available window `[L, T]` and
//!   first changes within it, this records the version it entered the
//!   window with, so a read in `[L, first-change)` resolves to that
//!   instead of the newer restore floor. Written during the embedded
//!   backfill only: past `T` the restore floor already covers
//!   pre-window objects, so no floor candidates are produced and the
//!   dedup read is skipped. The row is keyed at checkpoint 0 (below the
//!   window, where `L > 0`) so the floor scan finds it, and the
//!   effects-driven pruner retracts it once the object's first
//!   in-window change ages out.

31use std::collections::BTreeMap;
32use std::ops::Bound;
33use std::sync::Arc;
34
35use anyhow::Context as _;
36use async_trait::async_trait;
37use sui_consistent_store::Batch;
38use sui_consistent_store::Db;
39use sui_consistent_store::DbMap;
40use sui_consistent_store::PipelineTaskKey;
41use sui_consistent_store::Restore;
42use sui_consistent_store::error::Error;
43use sui_consistent_store::reader::Reader;
44use sui_consistent_store::restore_state;
45use sui_indexer_alt_framework::pipeline::Processor;
46use sui_indexer_alt_framework::pipeline::sequential;
47use sui_types::base_types::ObjectID;
48use sui_types::base_types::SequenceNumber;
49use sui_types::effects::TransactionEffectsAPI;
50use sui_types::full_checkpoint_content::Checkpoint;
51use sui_types::object::Object;
52
53use crate::RpcStoreSchema;
54use crate::indexer::Schema;
55use crate::indexer::Store;
56use crate::indexer::checkpoint_input_objects;
57use crate::indexer::checkpoint_output_objects;
58use crate::schema::object_version_by_checkpoint;
59
/// Pipeline marker for the `object_version_by_checkpoint` CF.
///
/// The optional `anchor` is the restore anchor `T`, and it plays two
/// roles: the restore impl keys its `from_restore` floor rows at `T`, and
/// the processor emits floor candidates only for checkpoints at or below
/// `T` (the backfill window `[L, T]`). A from-genesis build has no
/// restore, hence no window and no floor rows, and leaves it `None` (the
/// `Default`).
#[derive(Default)]
pub struct ObjectVersionByCheckpoint {
    /// Restore anchor `T`, or `None` for a from-genesis build.
    anchor: Option<u64>,
}
71
72/// One staged write produced by [`process`](ObjectVersionByCheckpoint::process).
73pub enum Row {
74    /// Object `id`'s final version at the end of `checkpoint` -- a live
75    /// version, or a tombstone version for a removal.
76    Change {
77        id: ObjectID,
78        checkpoint: u64,
79        version: SequenceNumber,
80    },
81    /// Object `id` existed before `checkpoint` and was an input to it,
82    /// entering with `version`. A synthetic floor row is written at
83    /// `(id, 0)` iff this is the object's first appearance in the
84    /// backfill window (so it predates the window).
85    Floor {
86        id: ObjectID,
87        checkpoint: u64,
88        version: SequenceNumber,
89    },
90}
91
92impl ObjectVersionByCheckpoint {
93    /// Marker for the restore-driver registration: writes `from_restore`
94    /// floor rows at the anchor `checkpoint`.
95    pub fn for_restore(checkpoint: u64) -> Self {
96        Self {
97            anchor: Some(checkpoint),
98        }
99    }
100
101    /// Marker for the tip/backfill registration, carrying the restore
102    /// anchor `T` (or `None` for a from-genesis build) so the processor
103    /// scopes floor candidates to the backfill window `[L, T]`.
104    pub fn with_anchor(anchor: Option<u64>) -> Self {
105        Self { anchor }
106    }
107}
108
109#[async_trait]
110impl Processor for ObjectVersionByCheckpoint {
111    const NAME: &'static str = "object_version_by_checkpoint";
112    type Value = Row;
113
114    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
115        let cp = checkpoint.summary.data().sequence_number;
116
117        // Change rows: objects live at the end of the checkpoint, each
118        // with its final version.
119        let outputs = checkpoint_output_objects(checkpoint)?;
120        let mut rows: Vec<Row> = outputs
121            .iter()
122            .map(|(id, (object, _))| Row::Change {
123                id: *id,
124                checkpoint: cp,
125                version: object.version(),
126            })
127            .collect();
128
129        // Objects removed (deleted or wrapped) during the checkpoint:
130        // record the tombstone version -- the removing transaction's
131        // lamport version, where the `objects` pipeline writes the
132        // tombstone row -- keeping the highest such version if an id is
133        // touched more than once. A read pinned at `cp` then resolves
134        // to the tombstone (and thus "no live object") instead of the
135        // stale prior version.
136        let mut removed: BTreeMap<ObjectID, SequenceNumber> = BTreeMap::new();
137        for tx in &checkpoint.transactions {
138            let lamport = tx.effects.lamport_version();
139            for oref in tx
140                .effects
141                .deleted()
142                .into_iter()
143                .chain(tx.effects.unwrapped_then_deleted())
144                .chain(tx.effects.wrapped())
145            {
146                removed
147                    .entry(oref.0)
148                    .and_modify(|v| *v = (*v).max(lamport))
149                    .or_insert(lamport);
150            }
151        }
152
153        for (id, version) in removed {
154            // Removed then re-created within the same checkpoint (e.g.
155            // wrapped then unwrapped) -- it is live at the end and
156            // already covered by its output row above.
157            if outputs.contains_key(&id) {
158                continue;
159            }
160            rows.push(Row::Change {
161                id,
162                checkpoint: cp,
163                version,
164            });
165        }
166
167        // Floor candidates, only within the backfill window `[L, T]`:
168        // objects that existed *before* this checkpoint and were inputs
169        // to it (so they predate any creation this checkpoint), each
170        // carrying its incoming version. `commit` turns the first such
171        // appearance per object into a synthetic floor row. Past `T` the
172        // restore floor already covers pre-window objects, so producing
173        // these (in the worker pool) would be wasted work.
174        if self.anchor.is_some_and(|t| cp <= t) {
175            for (id, (input, _)) in checkpoint_input_objects(checkpoint)? {
176                rows.push(Row::Floor {
177                    id,
178                    checkpoint: cp,
179                    version: input.version(),
180                });
181            }
182        }
183
184        Ok(rows)
185    }
186}
187
188impl Restore for ObjectVersionByCheckpoint {
189    type Schema = RpcStoreSchema;
190
191    fn restore(
192        &self,
193        schema: &Self::Schema,
194        object: &Object,
195        batch: &mut Batch,
196    ) -> anyhow::Result<()> {
197        // Restoration runs against a live-object snapshot with no
198        // per-checkpoint history, so every live object contributes one
199        // row at the restore anchor. The anchor is supplied at
200        // registration (`for_restore`); a tip-mode marker would never
201        // be registered with the restore driver, so its absence is a
202        // programmer error.
203        let checkpoint = self
204            .anchor
205            .context("object_version_by_checkpoint restored without a restore anchor checkpoint")?;
206        // Mark these as restore-floor rows so a checkpoint-pinned read
207        // below the anchor can tell a pre-window object (live) apart
208        // from one created in the anchor checkpoint.
209        let (key, value) =
210            object_version_by_checkpoint::store_restored(object.id(), checkpoint, object.version());
211        batch.put(&schema.object_version_by_checkpoint, &key, &value)?;
212        Ok(())
213    }
214}
215
216#[async_trait]
217impl sequential::Handler for ObjectVersionByCheckpoint {
218    type Store = Store;
219    type Batch = Vec<Row>;
220
221    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
222        batch.extend(values);
223    }
224
225    async fn commit<'a>(
226        &self,
227        batch: &Self::Batch,
228        conn: &mut sui_consistent_store::Connection<'a, Schema>,
229    ) -> anyhow::Result<usize> {
230        let cf = &conn.store.schema().object_version_by_checkpoint;
231
232        let mut count = 0;
233        for row in batch {
234            match row {
235                Row::Change {
236                    id,
237                    checkpoint,
238                    version,
239                } => {
240                    let (k, v) = object_version_by_checkpoint::store(*id, *checkpoint, *version);
241                    conn.batch.put(cf, &k, &v)?;
242                    count += 1;
243                }
244                Row::Floor {
245                    id,
246                    checkpoint,
247                    version,
248                } => {
249                    // The processor only emits floor candidates within
250                    // the backfill window, so all that is left is to
251                    // dedup repeated and re-indexed appearances: only the
252                    // object's first appearance writes the floor row.
253                    if is_first_appearance(cf, *id, *checkpoint)? {
254                        let (k, v) = object_version_by_checkpoint::store(*id, 0, *version);
255                        conn.batch.put(cf, &k, &v)?;
256                        count += 1;
257                    }
258                }
259            }
260        }
261        Ok(count)
262    }
263}
264
265/// The restore anchor `T` (`__restore.restored_at`) for this pipeline,
266/// or `None` if it was never restored. Read once at registration so the
267/// processor can scope floor candidates to the backfill window `[L, T]`.
268pub(crate) fn restored_anchor(db: &Db) -> anyhow::Result<Option<u64>> {
269    let key = PipelineTaskKey::new(ObjectVersionByCheckpoint::NAME);
270    Ok(db
271        .framework()
272        .restore
273        .get(&key)?
274        .and_then(|s| match s.state {
275            Some(restore_state::State::Complete(c)) => Some(c.restored_at),
276            _ => None,
277        }))
278}
279
280/// Whether `checkpoint` is the object's first appearance in the index:
281/// it has no row strictly below `checkpoint`. The restore floor sits at
282/// `T >= checkpoint`, so it is excluded, as is the change row written
283/// for this same checkpoint. Dedups repeated and re-indexed appearances
284/// so only the first writes the synthetic floor row.
285fn is_first_appearance<R: Reader>(
286    cf: &DbMap<object_version_by_checkpoint::Key, object_version_by_checkpoint::Value, R>,
287    id: ObjectID,
288    checkpoint: u64,
289) -> Result<bool, Error> {
290    let lo = object_version_by_checkpoint::Key { id, checkpoint: 0 };
291    let hi = object_version_by_checkpoint::Key { id, checkpoint };
292    let seen = cf
293        .iter_rev((Bound::Included(lo), Bound::Excluded(hi)))?
294        .next()
295        .is_some();
296    Ok(!seen)
297}
298
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_consistent_store::FrameworkSchema;
    use sui_consistent_store::RestoreState;
    use sui_types::base_types::SuiAddress;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Opens an empty store under a fresh temporary directory. The
    /// `TempDir` handle is returned so the caller keeps the directory
    /// alive for as long as it uses the `Db`.
    fn fresh_db() -> (tempfile::TempDir, Db, RpcStoreSchema) {
        let tmp = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(tmp.path(), DbOptions::default()).unwrap();
        (tmp, db, schema)
    }

    /// Marks this pipeline's `__restore` row as `Complete { restored_at }`.
    fn seed_restored_at(db: &Db, restored_at: u64) {
        let framework = FrameworkSchema::new(db.clone());
        let key = PipelineTaskKey::new(ObjectVersionByCheckpoint::NAME);
        let complete = RestoreState {
            state: Some(restore_state::State::Complete(restore_state::Complete {
                restored_at,
            })),
        };

        let mut batch = db.batch();
        batch.put(&framework.restore, &key, &complete).unwrap();
        batch.commit().unwrap();
    }

    /// Commits a single ordinary (non-restore) row `(id, checkpoint) -> version`.
    fn put(schema: &RpcStoreSchema, db: &Db, id: ObjectID, checkpoint: u64, version: u64) {
        let (key, value) =
            object_version_by_checkpoint::store(id, checkpoint, SequenceNumber::from_u64(version));
        let mut batch = db.batch();
        batch
            .put(&schema.object_version_by_checkpoint, &key, &value)
            .unwrap();
        batch.commit().unwrap();
    }

    /// `(checkpoint, version)` of the first change row for `target`, if any.
    fn change_for(rows: &[Row], target: ObjectID) -> Option<(u64, SequenceNumber)> {
        rows.iter().find_map(|row| match row {
            Row::Change {
                id,
                checkpoint,
                version,
            } if *id == target => Some((*checkpoint, *version)),
            _ => None,
        })
    }

    /// `(checkpoint, version)` of the first floor candidate for `target`, if any.
    fn floor_for(rows: &[Row], target: ObjectID) -> Option<(u64, SequenceNumber)> {
        rows.iter().find_map(|row| match row {
            Row::Floor {
                id,
                checkpoint,
                version,
            } if *id == target => Some((*checkpoint, *version)),
            _ => None,
        })
    }

    /// Whether `rows` contains any floor candidate at all.
    fn any_floor(rows: &[Row]) -> bool {
        rows.iter().any(|row| matches!(row, Row::Floor { .. }))
    }

    /// Builds checkpoint 0, in which object 0 is created, and returns it
    /// together with the builder, ready to build checkpoint 1.
    fn builder_after_creating_object_0() -> (TestCheckpointBuilder, Checkpoint) {
        let mut builder = TestCheckpointBuilder::new(0)
            .start_transaction(0)
            .create_owned_object(0)
            .finish_transaction();
        let cp0 = builder.build_checkpoint();
        (builder, cp0)
    }

    /// Smoke test: a checkpoint built without adding any transactions
    /// processes cleanly.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let bare = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let _rows = ObjectVersionByCheckpoint::default()
            .process(&bare)
            .await
            .unwrap();
    }

    /// An object created in the checkpoint gets a change row at the
    /// checkpoint's sequence number, carrying its current version.
    #[tokio::test]
    async fn process_records_final_live_version() {
        let cp = Arc::new(
            TestCheckpointBuilder::new(7)
                .start_transaction(0)
                .create_owned_object(0)
                .finish_transaction()
                .build_checkpoint(),
        );
        let created = TestCheckpointBuilder::derive_object_id(0);
        let expected_version = cp.transactions[0].effects.lamport_version();

        // Anchor 100 is above checkpoint 7, so floor candidates are enabled.
        let rows = ObjectVersionByCheckpoint::with_anchor(Some(100))
            .process(&cp)
            .await
            .unwrap();

        assert_eq!(change_for(&rows, created), Some((7, expected_version)));
        // Created in this very checkpoint, it is not one of the
        // checkpoint's inputs, so it yields no floor candidate.
        assert_eq!(floor_for(&rows, created), None);
    }

    /// An object deleted in the checkpoint is recorded at its tombstone
    /// (lamport) version, not at its prior live version.
    #[tokio::test]
    async fn process_records_tombstone_for_deleted_object() {
        let (builder, _cp0) = builder_after_creating_object_0();
        let cp1 = Arc::new(
            builder
                .start_transaction(0)
                .delete_object(0)
                .finish_transaction()
                .build_checkpoint(),
        );
        let deleted = TestCheckpointBuilder::derive_object_id(0);
        let tombstone = cp1.transactions[0].effects.lamport_version();

        let rows = ObjectVersionByCheckpoint::default()
            .process(&cp1)
            .await
            .unwrap();

        assert_eq!(change_for(&rows, deleted), Some((1, tombstone)));
    }

    /// An object that predates the checkpoint and is consumed by it yields
    /// a floor candidate carrying its incoming version.
    #[tokio::test]
    async fn process_emits_floor_candidate_for_input_object() {
        let (builder, cp0) = builder_after_creating_object_0();
        let cp1 = Arc::new(
            builder
                .start_transaction(0)
                .transfer_object(0, 1)
                .finish_transaction()
                .build_checkpoint(),
        );
        let obj = TestCheckpointBuilder::derive_object_id(0);
        let incoming = cp0.transactions[0].effects.lamport_version();

        // Anchor 100 puts checkpoint 1 inside the backfill window.
        let rows = ObjectVersionByCheckpoint::with_anchor(Some(100))
            .process(&cp1)
            .await
            .unwrap();

        assert_eq!(
            floor_for(&rows, obj),
            Some((1, incoming)),
            "input object floor candidate"
        );
    }

    /// Outside the backfill window no floor candidates are produced, even
    /// for input objects. This holds past the anchor (tip indexing) and
    /// for a from-genesis build with no anchor.
    #[tokio::test]
    async fn process_skips_floor_candidates_past_the_anchor() {
        let (builder, _cp0) = builder_after_creating_object_0();
        let cp1 = Arc::new(
            builder
                .start_transaction(0)
                .transfer_object(0, 1)
                .finish_transaction()
                .build_checkpoint(),
        );

        // Anchor 0 is below checkpoint 1; `default()` has no anchor at all.
        for marker in [
            ObjectVersionByCheckpoint::with_anchor(Some(0)),
            ObjectVersionByCheckpoint::default(),
        ] {
            let rows = marker.process(&cp1).await.unwrap();
            assert!(!any_floor(&rows));
        }
    }

    /// Restore writes one `from_restore` floor row at the anchor. Reads
    /// at and above the anchor resolve to it, and so do reads below it
    /// (via the fallback).
    #[test]
    fn restore_writes_one_row_at_the_anchor() {
        let (_dir, db, schema) = fresh_db();
        let object =
            Object::with_id_owner_for_testing(ObjectID::from_single_byte(1), SuiAddress::ZERO);

        let mut batch = db.batch();
        ObjectVersionByCheckpoint::for_restore(123)
            .restore(&schema, &object, &mut batch)
            .unwrap();
        batch.commit().unwrap();

        for cp in [122, 123, 200] {
            assert_eq!(
                schema
                    .get_object_version_at_checkpoint(object.id(), cp)
                    .unwrap(),
                Some(object.version()),
                "read pinned at checkpoint {cp}",
            );
        }
    }

    #[test]
    fn restored_anchor_reads_restore_state() {
        let (_dir, db, _schema) = fresh_db();

        // Before any restore row exists there is no anchor.
        assert_eq!(restored_anchor(&db).unwrap(), None);

        // Once the restore completes, its anchor is exposed.
        seed_restored_at(&db, 42);
        assert_eq!(restored_anchor(&db).unwrap(), Some(42));
    }

    #[test]
    fn is_first_appearance_true_with_no_prior_row() {
        let (_dir, _db, schema) = fresh_db();
        let id = ObjectID::random();

        assert!(is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap());
    }

    #[test]
    fn is_first_appearance_false_when_already_seen() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();

        // A row strictly below the queried checkpoint.
        put(&schema, &db, id, 30, 5);

        assert!(!is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap());
    }

    #[test]
    fn is_first_appearance_ignores_rows_at_or_above_the_checkpoint() {
        let (_dir, db, schema) = fresh_db();
        let id = ObjectID::random();

        // A restore floor at the anchor (100) lies at or above the queried
        // checkpoint, so it must not count as a prior row.
        put(&schema, &db, id, 100, 9);

        assert!(is_first_appearance(&schema.object_version_by_checkpoint, id, 50).unwrap());
    }

    /// End-to-end shape for a pre-window object that first changes inside
    /// the window: the rows restore and backfill would write, then the
    /// reads they enable. Below the first change, the synthetic floor at
    /// `(id, 0)` answers with the pre-window version rather than the newer
    /// restore floor.
    #[test]
    fn synthetic_floor_serves_reads_below_first_change() {
        let (_dir, db, schema) = fresh_db();
        let cf = &schema.object_version_by_checkpoint;
        let id = ObjectID::random();
        let anchor = 100; // restore anchor T
        let first_change = 50; // the object's first in-window change
        let window_entry = SequenceNumber::from_u64(5); // version on entering the window
        let v1 = SequenceNumber::from_u64(6); // version after the first change

        // What restore and backfill leave behind: the restore floor at `T`
        // and the change row at the first in-window change.
        let (restore_key, restore_value) =
            object_version_by_checkpoint::store_restored(id, anchor, v1);
        let (change_key, change_value) = object_version_by_checkpoint::store(id, first_change, v1);
        let mut batch = db.batch();
        batch.put(cf, &restore_key, &restore_value).unwrap();
        batch.put(cf, &change_key, &change_value).unwrap();
        batch.commit().unwrap();

        // Nothing precedes the first change, so backfill writes the
        // synthetic floor at `(id, 0)`.
        assert!(is_first_appearance(cf, id, first_change).unwrap());
        let (floor_key, floor_value) = object_version_by_checkpoint::store(id, 0, window_entry);
        let mut batch = db.batch();
        batch.put(cf, &floor_key, &floor_value).unwrap();
        batch.commit().unwrap();

        // Below the first change, the synthetic floor answers with the
        // window-entry version, not the restore floor's newer one. From
        // the first change onward, the change row answers.
        for (cp, expected) in [
            (30, window_entry),
            (first_change, v1),
            (75, v1),
            (anchor, v1),
        ] {
            assert_eq!(
                schema.get_object_version_at_checkpoint(id, cp).unwrap(),
                Some(expected),
                "read pinned at checkpoint {cp}",
            );
        }
    }
}