sui_rpc_store/indexer/
objects.rs1use std::sync::Arc;
24
25use async_trait::async_trait;
26use sui_consistent_store::Batch;
27use sui_consistent_store::Restore;
28use sui_indexer_alt_framework::pipeline::Processor;
29use sui_indexer_alt_framework::pipeline::sequential;
30use sui_types::base_types::ObjectID;
31use sui_types::base_types::SequenceNumber;
32use sui_types::effects::TransactionEffectsAPI;
33use sui_types::full_checkpoint_content::Checkpoint;
34use sui_types::object::Object;
35
36use crate::RpcStoreSchema;
37use crate::indexer::Schema;
38use crate::indexer::Store;
39use crate::schema::objects;
40use crate::schema::objects::TombstoneKind;
41
/// Marker type for the `objects` pipeline.
///
/// It carries no state; it exists so the pipeline traits implemented in this
/// file (`Processor`, `Restore` and `sequential::Handler`) have a type to hang
/// off.
pub struct Objects;
44
/// A single pending write to the objects table, produced by
/// `Objects::process` and persisted by `Objects::commit`.
///
/// `id` and `version` together form the `objects::Key` the row is written
/// under; `value` is the payload stored at that key.
pub struct Row {
    /// ID of the object this row describes.
    pub id: ObjectID,
    /// Version the row is keyed at. For live objects this is the object's own
    /// version; for tombstones it is the lamport version of the transaction
    /// that removed the object.
    pub version: SequenceNumber,
    /// Either the stored form of a live object (`objects::store`) or a
    /// tombstone marker (`objects::tombstone`).
    pub value: objects::Value,
}
50
51#[async_trait]
52impl Processor for Objects {
53 const NAME: &'static str = "objects";
54 type Value = Row;
55
56 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
57 let mut rows = Vec::new();
58 for tx in &checkpoint.transactions {
59 for object in tx.output_objects(&checkpoint.object_set) {
60 rows.push(Row {
61 id: object.id(),
62 version: object.version(),
63 value: objects::store(object),
64 });
65 }
66 let lamport_version = tx.effects.lamport_version();
67 for (id, _) in tx
68 .effects
69 .deleted()
70 .into_iter()
71 .chain(tx.effects.unwrapped_then_deleted())
72 .map(|oref| (oref.0, oref.1))
73 {
74 rows.push(Row {
75 id,
76 version: lamport_version,
77 value: objects::tombstone(TombstoneKind::Deleted),
78 });
79 }
80 for oref in tx.effects.wrapped() {
81 rows.push(Row {
82 id: oref.0,
83 version: lamport_version,
84 value: objects::tombstone(TombstoneKind::Wrapped),
85 });
86 }
87 }
88 Ok(rows)
89 }
90}
91
92impl Restore for Objects {
93 type Schema = RpcStoreSchema;
94
95 fn restore(
96 &self,
97 schema: &Self::Schema,
98 object: &Object,
99 batch: &mut Batch,
100 ) -> anyhow::Result<()> {
101 batch.put(
106 &schema.objects,
107 &objects::Key {
108 id: object.id(),
109 version: object.version(),
110 },
111 &objects::store(object),
112 )?;
113 Ok(())
114 }
115}
116
117#[async_trait]
118impl sequential::Handler for Objects {
119 type Store = Store;
120 type Batch = Vec<Row>;
121
122 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
123 batch.extend(values);
124 }
125
126 async fn commit<'a>(
127 &self,
128 batch: &Self::Batch,
129 conn: &mut sui_consistent_store::Connection<'a, Schema>,
130 ) -> anyhow::Result<usize> {
131 let cf = &conn.store.schema().objects;
132 for row in batch {
133 conn.batch.put(
134 cf,
135 &objects::Key {
136 id: row.id,
137 version: row.version,
138 },
139 &row.value,
140 )?;
141 }
142 Ok(batch.len())
143 }
144}
145
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::SuiAddress;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: processing a checkpoint built with no transactions added
    /// must succeed.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let _produced = Objects.process(&checkpoint).await.unwrap();
    }

    /// Creates two objects, deletes one and wraps the other, then checks that
    /// the rows emitted by `process` read back as the matching tombstones at
    /// each removing transaction's lamport version.
    #[tokio::test]
    async fn process_emits_tombstones_for_deleted_and_wrapped_objects() {
        let checkpoint = Arc::new(
            TestCheckpointBuilder::new(1)
                .start_transaction(0)
                .create_owned_object(0)
                .create_owned_object(1)
                .finish_transaction()
                .start_transaction(0)
                .delete_object(0)
                .finish_transaction()
                .start_transaction(0)
                .wrap_object(1)
                .finish_transaction()
                .build_checkpoint(),
        );

        // Transaction 1 performs the delete, transaction 2 the wrap.
        let deleted_id = TestCheckpointBuilder::derive_object_id(0);
        let wrapped_id = TestCheckpointBuilder::derive_object_id(1);
        let delete_lamport = checkpoint.transactions[1].effects.lamport_version();
        let wrap_lamport = checkpoint.transactions[2].effects.lamport_version();

        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        // Persist the processed rows the same way the handler's commit does.
        let rows = Objects.process(&checkpoint).await.unwrap();
        let mut batch = db.batch();
        for Row { id, version, value } in &rows {
            let key = objects::Key {
                id: *id,
                version: *version,
            };
            batch.put(&schema.objects, &key, value).unwrap();
        }
        batch.commit().unwrap();

        // Each removed object reports the tombstone kind matching how it was
        // removed.
        let expected_statuses = [
            (deleted_id, delete_lamport, TombstoneKind::Deleted),
            (wrapped_id, wrap_lamport, TombstoneKind::Wrapped),
        ];
        for (id, version, kind) in expected_statuses {
            assert_eq!(
                schema.get_object_status_by_key(id, version).unwrap(),
                Some(objects::Status::Tombstone(kind)),
            );
        }

        // A tombstone must not be readable as an object.
        for (id, version) in [(deleted_id, delete_lamport), (wrapped_id, wrap_lamport)] {
            assert!(schema.get_object_by_key(id, version).unwrap().is_none());
        }
    }

    /// Restoring two distinct objects makes each readable at its own
    /// `(id, version)` key.
    #[test]
    fn restore_writes_one_row_per_object_version() {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let fixtures = [1u8, 2].map(|byte| {
            Object::with_id_owner_for_testing(ObjectID::from_single_byte(byte), SuiAddress::ZERO)
        });

        let mut batch = db.batch();
        for object in &fixtures {
            Objects.restore(&schema, object, &mut batch).unwrap();
        }
        batch.commit().unwrap();

        for object in fixtures {
            let stored = schema
                .get_object_by_key(object.id(), object.version())
                .unwrap();
            assert_eq!(stored, Some(object));
        }
    }
}