1use bytes::Buf;
19use bytes::BufMut;
20use sui_consistent_store::Decode;
21use sui_consistent_store::Encode;
22use sui_consistent_store::Protobuf;
23use sui_consistent_store::error::DecodeError;
24use sui_consistent_store::error::EncodeError;
25use sui_consistent_store::error::Error;
26use sui_consistent_store::reader::Reader;
27use sui_types::base_types::ObjectID;
28use sui_types::base_types::SequenceNumber;
29use sui_types::object::Object;
30
31use crate::proto::StoredObject;
32use crate::proto::StoredObjectTombstone;
33use crate::proto::StoredObjectTombstoneKind;
34use crate::proto::stored_object;
35
36pub const NAME: &str = "objects";
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
42pub struct Key {
43 pub id: ObjectID,
44 pub version: SequenceNumber,
45}
46
47pub type Value = Protobuf<StoredObject>;
48
49impl Encode for Key {
50 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
51 buf.put_slice(self.id.as_ref());
52 buf.put_slice(&self.version.value().to_be_bytes());
53 Ok(())
54 }
55}
56
57impl Decode for Key {
58 fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
59 if buf.remaining() != ObjectID::LENGTH + 8 {
60 return Err(DecodeError::msg(format!(
61 "expected {} bytes for {NAME} Key, got {}",
62 ObjectID::LENGTH + 8,
63 buf.remaining(),
64 )));
65 }
66 let mut id_bytes = [0u8; ObjectID::LENGTH];
67 buf.copy_to_slice(&mut id_bytes);
68 let version = SequenceNumber::from_u64(buf.get_u64());
69 Ok(Key {
70 id: ObjectID::new(id_bytes),
71 version,
72 })
73 }
74}
75
76pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
77 resolver.options(NAME)
78}
79
80pub struct ObjectIdPrefix(pub ObjectID);
84
85impl Encode for ObjectIdPrefix {
86 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
87 buf.put_slice(self.0.as_ref());
88 Ok(())
89 }
90}
91
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
97pub enum TombstoneKind {
98 Deleted,
99 Wrapped,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
104pub enum Status {
105 Live(Object),
107 Tombstone(TombstoneKind),
110}
111
112impl Status {
113 pub fn into_live(self) -> Option<Object> {
115 match self {
116 Status::Live(object) => Some(object),
117 Status::Tombstone(_) => None,
118 }
119 }
120}
121
122pub fn store(object: &Object) -> Value {
128 let bcs = bcs::to_bytes(object).expect("bcs encode Object");
129 Protobuf(StoredObject {
130 kind: Some(stored_object::Kind::Bcs(bcs.into())),
131 })
132}
133
134pub fn tombstone(kind: TombstoneKind) -> Value {
137 let proto_kind = match kind {
138 TombstoneKind::Deleted => StoredObjectTombstoneKind::Deleted,
139 TombstoneKind::Wrapped => StoredObjectTombstoneKind::Wrapped,
140 };
141 Protobuf(StoredObject {
142 kind: Some(stored_object::Kind::Tombstone(StoredObjectTombstone {
143 kind: proto_kind as i32,
144 })),
145 })
146}
147
148fn decode(stored: StoredObject) -> Result<Status, Error> {
150 match stored.kind {
151 Some(stored_object::Kind::Bcs(bcs)) => {
152 let object: Object = bcs::from_bytes(&bcs)
153 .map_err(|e| DecodeError::with_source("bcs decode Object", e))?;
154 Ok(Status::Live(object))
155 }
156 Some(stored_object::Kind::Tombstone(t)) => {
157 let kind = match StoredObjectTombstoneKind::try_from(t.kind) {
158 Ok(StoredObjectTombstoneKind::Deleted) => TombstoneKind::Deleted,
159 Ok(StoredObjectTombstoneKind::Wrapped) => TombstoneKind::Wrapped,
160 Ok(StoredObjectTombstoneKind::Unspecified) | Err(_) => {
161 return Err(DecodeError::msg(format!(
162 "unrecognised tombstone kind: {}",
163 t.kind,
164 ))
165 .into());
166 }
167 };
168 Ok(Status::Tombstone(kind))
169 }
170 None => Err(DecodeError::msg("StoredObject row missing kind").into()),
171 }
172}
173
174impl<R: Reader> super::RpcStoreSchema<R> {
175 pub fn get_object_by_key(
185 &self,
186 id: ObjectID,
187 version: SequenceNumber,
188 ) -> Result<Option<Object>, Error> {
189 Ok(self
190 .get_object_status_by_key(id, version)?
191 .and_then(Status::into_live))
192 }
193
194 pub fn get_object_status_by_key(
199 &self,
200 id: ObjectID,
201 version: SequenceNumber,
202 ) -> Result<Option<Status>, Error> {
203 let Some(stored) = self.objects.get(&Key { id, version })? else {
204 return Ok(None);
205 };
206 Ok(Some(decode(stored.into_inner())?))
207 }
208
209 pub fn get_object(&self, id: ObjectID) -> Result<Option<Object>, Error> {
218 let Some(row) = self.objects.iter_rev_prefix(&ObjectIdPrefix(id))?.next() else {
219 return Ok(None);
220 };
221 let (_key, value) = row?;
222 Ok(decode(value.into_inner())?.into_live())
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use sui_consistent_store::Db;
229 use sui_consistent_store::DbOptions;
230 use sui_types::base_types::SuiAddress;
231
232 use super::*;
233 use crate::RpcStoreSchema;
234
235 fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
236 let dir = tempfile::tempdir().unwrap();
237 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
238 (dir, db, schema)
239 }
240
241 fn dummy_object(id: ObjectID) -> Object {
242 Object::with_id_owner_for_testing(id, SuiAddress::ZERO)
243 }
244
245 #[test]
246 fn get_returns_none_for_unknown_key() {
247 let (_dir, _db, schema) = fresh_db();
248 let id = ObjectID::random();
249 assert!(
250 schema
251 .get_object_by_key(id, SequenceNumber::from_u64(1))
252 .unwrap()
253 .is_none()
254 );
255 }
256
257 #[test]
258 fn store_then_get_round_trips() {
259 let (_dir, db, schema) = fresh_db();
260 let id = ObjectID::random();
261 let object = dummy_object(id);
262 let version = object.version();
263
264 let mut batch = db.batch();
265 batch
266 .put(&schema.objects, &Key { id, version }, &store(&object))
267 .unwrap();
268 batch.commit().unwrap();
269
270 let read = schema
271 .get_object_by_key(id, version)
272 .unwrap()
273 .expect("object present");
274 assert_eq!(read, object);
275 }
276
277 #[test]
278 fn tombstone_round_trips_with_kind() {
279 let (_dir, db, schema) = fresh_db();
280 let id = ObjectID::random();
281 let v_del = SequenceNumber::from_u64(5);
282 let v_wrap = SequenceNumber::from_u64(9);
283
284 let mut batch = db.batch();
285 batch
286 .put(
287 &schema.objects,
288 &Key { id, version: v_del },
289 &tombstone(TombstoneKind::Deleted),
290 )
291 .unwrap();
292 batch
293 .put(
294 &schema.objects,
295 &Key {
296 id,
297 version: v_wrap,
298 },
299 &tombstone(TombstoneKind::Wrapped),
300 )
301 .unwrap();
302 batch.commit().unwrap();
303
304 assert_eq!(
305 schema.get_object_status_by_key(id, v_del).unwrap(),
306 Some(Status::Tombstone(TombstoneKind::Deleted)),
307 );
308 assert_eq!(
309 schema.get_object_status_by_key(id, v_wrap).unwrap(),
310 Some(Status::Tombstone(TombstoneKind::Wrapped)),
311 );
312 assert!(schema.get_object_by_key(id, v_del).unwrap().is_none());
314 assert!(schema.get_object_by_key(id, v_wrap).unwrap().is_none());
315 }
316
317 #[test]
318 fn get_object_status_distinguishes_missing_from_tombstone() {
319 let (_dir, db, schema) = fresh_db();
320 let id = ObjectID::random();
321 let version = SequenceNumber::from_u64(3);
322
323 assert!(
325 schema
326 .get_object_status_by_key(id, version)
327 .unwrap()
328 .is_none()
329 );
330
331 let mut batch = db.batch();
334 batch
335 .put(
336 &schema.objects,
337 &Key { id, version },
338 &tombstone(TombstoneKind::Deleted),
339 )
340 .unwrap();
341 batch.commit().unwrap();
342 assert_eq!(
343 schema.get_object_status_by_key(id, version).unwrap(),
344 Some(Status::Tombstone(TombstoneKind::Deleted)),
345 );
346 }
347
348 #[test]
349 fn distinct_versions_of_same_id_are_isolated() {
350 let (_dir, db, schema) = fresh_db();
351 let id = ObjectID::random();
352 let v1 = SequenceNumber::from_u64(1);
353 let v2 = SequenceNumber::from_u64(2);
354 let o1 = dummy_object(id);
355 let o2 = dummy_object(id);
356
357 let mut batch = db.batch();
358 batch
359 .put(&schema.objects, &Key { id, version: v1 }, &store(&o1))
360 .unwrap();
361 batch
362 .put(&schema.objects, &Key { id, version: v2 }, &store(&o2))
363 .unwrap();
364 batch.commit().unwrap();
365
366 assert_eq!(schema.get_object_by_key(id, v1).unwrap().unwrap(), o1,);
367 assert_eq!(schema.get_object_by_key(id, v2).unwrap().unwrap(), o2,);
368 }
369
370 #[test]
371 fn get_object_returns_the_latest_live_version() {
372 let (_dir, db, schema) = fresh_db();
373 let id = ObjectID::random();
374 let object = dummy_object(id);
375 let version = object.version();
376
377 let mut batch = db.batch();
378 batch
379 .put(&schema.objects, &Key { id, version }, &store(&object))
380 .unwrap();
381 batch.commit().unwrap();
382
383 assert_eq!(schema.get_object(id).unwrap(), Some(object));
384 }
385
386 #[test]
387 fn get_object_returns_none_when_latest_is_tombstone() {
388 let (_dir, db, schema) = fresh_db();
389 let id = ObjectID::random();
390 let live = dummy_object(id);
391 let live_version = SequenceNumber::from_u64(1);
392 let tombstone_version = SequenceNumber::from_u64(5);
393
394 let mut batch = db.batch();
395 batch
398 .put(
399 &schema.objects,
400 &Key {
401 id,
402 version: live_version,
403 },
404 &store(&live),
405 )
406 .unwrap();
407 batch
408 .put(
409 &schema.objects,
410 &Key {
411 id,
412 version: tombstone_version,
413 },
414 &tombstone(TombstoneKind::Deleted),
415 )
416 .unwrap();
417 batch.commit().unwrap();
418
419 assert!(schema.get_object(id).unwrap().is_none());
423 }
424
425 #[test]
426 fn get_object_isolates_objects_by_id() {
427 let (_dir, db, schema) = fresh_db();
428 let a = ObjectID::from_single_byte(1);
429 let b = ObjectID::from_single_byte(2);
430 let oa = dummy_object(a);
431 let ob = dummy_object(b);
432
433 let mut batch = db.batch();
434 batch
435 .put(
436 &schema.objects,
437 &Key {
438 id: a,
439 version: SequenceNumber::from_u64(9),
440 },
441 &store(&oa),
442 )
443 .unwrap();
444 batch
445 .put(
446 &schema.objects,
447 &Key {
448 id: b,
449 version: SequenceNumber::from_u64(4),
450 },
451 &store(&ob),
452 )
453 .unwrap();
454 batch.commit().unwrap();
455
456 assert_eq!(schema.get_object(a).unwrap(), Some(oa));
458 assert_eq!(schema.get_object(b).unwrap(), Some(ob));
459 }
460}