sui_rpc_store/indexer/
objects.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::objects`](crate::schema::objects) CF: one row per
6//! `(ObjectID, version)` written or removed by any transaction in
7//! the checkpoint.
8//!
9//! Every output version is preserved — historical versions accrue
10//! so callers can read an object at any version it has ever
11//! existed at. Intra-checkpoint intermediate versions (created
12//! when a shared object is touched by multiple transactions in
13//! the same checkpoint) are all retained.
14//!
15//! In addition to live versions, every object that was deleted or
16//! wrapped by a transaction gets a tombstone row at the
17//! transaction's lamport version. Tombstones let version-bounded
18//! reads tell "no row at this version" (object did not exist)
19//! apart from "tombstone row at this version" (object was
20//! removed). `unwrapped_then_deleted` objects also get a `Deleted`
21//! tombstone, matching the validator's perpetual-store semantics.
22
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use sui_consistent_store::Batch;
27use sui_consistent_store::Restore;
28use sui_indexer_alt_framework::pipeline::Processor;
29use sui_indexer_alt_framework::pipeline::sequential;
30use sui_types::base_types::ObjectID;
31use sui_types::base_types::SequenceNumber;
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::full_checkpoint_content::Checkpoint;
34use sui_types::object::Object;
35
36use crate::RpcStoreSchema;
37use crate::indexer::Schema;
38use crate::indexer::Store;
39use crate::schema::objects;
40use crate::schema::objects::TombstoneKind;
41
/// Pipeline marker for `objects`.
///
/// A field-less unit struct: it carries no state of its own and
/// exists so the [`Processor`], [`Restore`] and
/// [`sequential::Handler`] implementations in this module have a
/// type to attach to.
pub struct Objects;
44
/// One `objects` column-family entry, produced by the `process`
/// step of [`Objects`] and written out by its `commit` step.
///
/// `id` and `version` together form the [`objects::Key`]; `value`
/// is what gets stored under that key.
pub struct Row {
    /// Object the row belongs to.
    pub id: ObjectID,
    /// Version the row is keyed at: the object's own version for a
    /// live row, or the transaction's lamport version for a
    /// tombstone row.
    pub version: SequenceNumber,
    /// Payload stored under the key, built by either
    /// `objects::store` (live object) or `objects::tombstone`.
    pub value: objects::Value,
}
50
51#[async_trait]
52impl Processor for Objects {
53    const NAME: &'static str = "objects";
54    type Value = Row;
55
56    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
57        let mut rows = Vec::new();
58        for tx in &checkpoint.transactions {
59            for object in tx.output_objects(&checkpoint.object_set) {
60                rows.push(Row {
61                    id: object.id(),
62                    version: object.version(),
63                    value: objects::store(object),
64                });
65            }
66            let lamport_version = tx.effects.lamport_version();
67            for (id, _) in tx
68                .effects
69                .deleted()
70                .into_iter()
71                .chain(tx.effects.unwrapped_then_deleted())
72                .map(|oref| (oref.0, oref.1))
73            {
74                rows.push(Row {
75                    id,
76                    version: lamport_version,
77                    value: objects::tombstone(TombstoneKind::Deleted),
78                });
79            }
80            for oref in tx.effects.wrapped() {
81                rows.push(Row {
82                    id: oref.0,
83                    version: lamport_version,
84                    value: objects::tombstone(TombstoneKind::Wrapped),
85                });
86            }
87        }
88        Ok(rows)
89    }
90}
91
92impl Restore for Objects {
93    type Schema = RpcStoreSchema;
94
95    fn restore(
96        &self,
97        schema: &Self::Schema,
98        object: &Object,
99        batch: &mut Batch,
100    ) -> anyhow::Result<()> {
101        // Restoration runs against a live-object snapshot, so each
102        // object contributes exactly one `(id, version)` row at its
103        // current version. Historical versions are not present in
104        // the source and accrue only from tip indexing onward.
105        batch.put(
106            &schema.objects,
107            &objects::Key {
108                id: object.id(),
109                version: object.version(),
110            },
111            &objects::store(object),
112        )?;
113        Ok(())
114    }
115}
116
117#[async_trait]
118impl sequential::Handler for Objects {
119    type Store = Store;
120    type Batch = Vec<Row>;
121
122    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
123        batch.extend(values);
124    }
125
126    async fn commit<'a>(
127        &self,
128        batch: &Self::Batch,
129        conn: &mut sui_consistent_store::Connection<'a, Schema>,
130    ) -> anyhow::Result<usize> {
131        let cf = &conn.store.schema().objects;
132        for row in batch {
133            conn.batch.put(
134                cf,
135                &objects::Key {
136                    id: row.id,
137                    version: row.version,
138                },
139                &row.value,
140            )?;
141        }
142        Ok(batch.len())
143    }
144}
145
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::SuiAddress;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: `process` succeeds on a builder checkpoint to
    /// which the test has added no transactions. The produced rows
    /// are not inspected.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let built = TestCheckpointBuilder::new(1).build_checkpoint();
        let checkpoint = Arc::new(built);
        let _rows = Objects.process(&checkpoint).await.unwrap();
    }

    /// Builds one checkpoint with three transactions: the first
    /// creates two owned objects, the second deletes one of them and
    /// the third wraps the other. After writing the processed rows
    /// to a scratch database, checks that each removed id has a
    /// tombstone of the right kind at the removing transaction's
    /// lamport version, and that `get_object_by_key` reports no live
    /// object at that key.
    #[tokio::test]
    async fn process_emits_tombstones_for_deleted_and_wrapped_objects() {
        let built = TestCheckpointBuilder::new(1)
            .start_transaction(0)
            .create_owned_object(0)
            .create_owned_object(1)
            .finish_transaction()
            .start_transaction(0)
            .delete_object(0)
            .finish_transaction()
            .start_transaction(0)
            .wrap_object(1)
            .finish_transaction()
            .build_checkpoint();
        let checkpoint = Arc::new(built);

        let deleted_id = TestCheckpointBuilder::derive_object_id(0);
        let wrapped_id = TestCheckpointBuilder::derive_object_id(1);
        let delete_lamport = checkpoint.transactions[1].effects.lamport_version();
        let wrap_lamport = checkpoint.transactions[2].effects.lamport_version();

        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        // Persist exactly what the pipeline produced.
        let rows = Objects.process(&checkpoint).await.unwrap();
        let mut batch = db.batch();
        for Row { id, version, value } in &rows {
            let key = objects::Key {
                id: *id,
                version: *version,
            };
            batch.put(&schema.objects, &key, value).unwrap();
        }
        batch.commit().unwrap();

        // The status lookup sees a tombstone of the matching kind at
        // the lamport version of the deleting / wrapping transaction.
        let deleted_status = schema
            .get_object_status_by_key(deleted_id, delete_lamport)
            .unwrap();
        assert_eq!(
            deleted_status,
            Some(objects::Status::Tombstone(TombstoneKind::Deleted)),
        );
        let wrapped_status = schema
            .get_object_status_by_key(wrapped_id, wrap_lamport)
            .unwrap();
        assert_eq!(
            wrapped_status,
            Some(objects::Status::Tombstone(TombstoneKind::Wrapped)),
        );

        // The live-object lookup collapses a tombstone to `None`.
        let deleted_live = schema
            .get_object_by_key(deleted_id, delete_lamport)
            .unwrap();
        assert!(deleted_live.is_none());
        let wrapped_live = schema.get_object_by_key(wrapped_id, wrap_lamport).unwrap();
        assert!(wrapped_live.is_none());
    }

    /// Restores two distinct objects into a scratch database and
    /// checks that each can be read back, unchanged, at its own
    /// `(id, version)` key.
    #[test]
    fn restore_writes_one_row_per_object_version() {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let o1 = Object::with_id_owner_for_testing(ObjectID::from_single_byte(1), SuiAddress::ZERO);
        let o2 = Object::with_id_owner_for_testing(ObjectID::from_single_byte(2), SuiAddress::ZERO);

        let mut batch = db.batch();
        for object in [&o1, &o2] {
            Objects.restore(&schema, object, &mut batch).unwrap();
        }
        batch.commit().unwrap();

        for object in [o1, o2] {
            let stored = schema
                .get_object_by_key(object.id(), object.version())
                .unwrap();
            assert_eq!(stored, Some(object));
        }
    }
}