1use bytes::Buf;
19use bytes::BufMut;
20use sui_consistent_store::Decode;
21use sui_consistent_store::Encode;
22use sui_consistent_store::Iter;
23use sui_consistent_store::Protobuf;
24use sui_consistent_store::error::DecodeError;
25use sui_consistent_store::error::EncodeError;
26use sui_consistent_store::error::Error;
27use sui_consistent_store::reader::Reader;
28use sui_types::base_types::ObjectID;
29
30use crate::proto::PackageVersionInfo;
31
32pub const NAME: &str = "package_versions";
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
35pub struct Key {
36 pub original_id: ObjectID,
37 pub version: u64,
38}
39
40pub type Value = Protobuf<PackageVersionInfo>;
41
42impl Encode for Key {
43 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
44 buf.put_slice(self.original_id.as_ref());
45 buf.put_slice(&self.version.to_be_bytes());
46 Ok(())
47 }
48}
49
50impl Decode for Key {
51 fn decode<B: Buf>(buf: &mut B) -> Result<Self, DecodeError> {
52 if buf.remaining() != ObjectID::LENGTH + 8 {
53 return Err(DecodeError::msg(format!(
54 "expected {} bytes for {NAME} Key, got {}",
55 ObjectID::LENGTH + 8,
56 buf.remaining(),
57 )));
58 }
59 let mut id = [0u8; ObjectID::LENGTH];
60 buf.copy_to_slice(&mut id);
61 let version = buf.get_u64();
62 Ok(Key {
63 original_id: ObjectID::new(id),
64 version,
65 })
66 }
67}
68
69pub fn options(resolver: &sui_consistent_store::CfOptionsResolver) -> rocksdb::Options {
70 resolver.options(NAME)
71}
72
73pub fn store(
78 original_id: ObjectID,
79 version: u64,
80 storage_id: ObjectID,
81 checkpoint: u64,
82) -> (Key, Value) {
83 (
84 Key {
85 original_id,
86 version,
87 },
88 Protobuf(PackageVersionInfo {
89 storage_id: storage_id.to_vec().into(),
90 checkpoint: Some(checkpoint),
91 }),
92 )
93}
94
95pub fn store_restored(original_id: ObjectID, version: u64, storage_id: ObjectID) -> (Key, Value) {
101 (
102 Key {
103 original_id,
104 version,
105 },
106 Protobuf(PackageVersionInfo {
107 storage_id: storage_id.to_vec().into(),
108 checkpoint: None,
109 }),
110 )
111}
112
113pub struct OriginalIdPrefix(pub ObjectID);
118
119impl Encode for OriginalIdPrefix {
120 fn encode_into<B: BufMut>(&self, buf: &mut B) -> Result<(), EncodeError> {
121 buf.put_slice(self.0.as_ref());
122 Ok(())
123 }
124}
125
126fn decode_storage_id(bytes: &[u8]) -> Result<ObjectID, DecodeError> {
129 let array: [u8; ObjectID::LENGTH] = bytes.try_into().map_err(|_| {
130 DecodeError::msg(format!(
131 "expected {} bytes for storage_id, got {}",
132 ObjectID::LENGTH,
133 bytes.len(),
134 ))
135 })?;
136 Ok(ObjectID::new(array))
137}
138
139impl<R: Reader> super::RpcStoreSchema<R> {
140 pub fn get_package_storage_id(
143 &self,
144 original_id: ObjectID,
145 version: u64,
146 ) -> Result<Option<ObjectID>, Error> {
147 let Some(stored) = self.package_versions.get(&Key {
148 original_id,
149 version,
150 })?
151 else {
152 return Ok(None);
153 };
154 Ok(Some(decode_storage_id(&stored.into_inner().storage_id)?))
155 }
156
157 pub fn iter_package_versions(
160 &self,
161 original_id: ObjectID,
162 ) -> Result<Iter<'_, Key, Value>, Error> {
163 self.package_versions
164 .iter_prefix(&OriginalIdPrefix(original_id))
165 }
166
167 pub fn get_package_at_checkpoint(
182 &self,
183 original_id: ObjectID,
184 checkpoint: u64,
185 ) -> Result<Option<(u64, ObjectID)>, Error> {
186 let mut latest: Option<(u64, ObjectID)> = None;
187 for row in self.iter_package_versions(original_id)? {
188 let (key, value) = row?;
189 let info = value.into_inner();
190 let existed = match info.checkpoint {
191 None => true,
194 Some(published) => published <= checkpoint,
195 };
196 if !existed {
197 continue;
198 }
199 latest = Some((key.version, decode_storage_id(&info.storage_id)?));
202 }
203 Ok(latest)
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use sui_consistent_store::Db;
210 use sui_consistent_store::DbOptions;
211
212 use super::*;
213 use crate::RpcStoreSchema;
214
215 fn fresh_db() -> (tempfile::TempDir, sui_consistent_store::Db, RpcStoreSchema) {
216 let dir = tempfile::tempdir().unwrap();
217 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
218 (dir, db, schema)
219 }
220
221 #[test]
222 fn get_returns_none_for_unknown_version() {
223 let (_dir, _db, schema) = fresh_db();
224 let original = ObjectID::random();
225 assert!(
226 schema
227 .get_package_storage_id(original, 1)
228 .unwrap()
229 .is_none()
230 );
231 }
232
233 #[test]
234 fn store_then_get_round_trips() {
235 let (_dir, db, schema) = fresh_db();
236 let original = ObjectID::random();
237 let storage = ObjectID::random();
238
239 let (k, v) = store(original, 3, storage, 100);
240 let mut batch = db.batch();
241 batch.put(&schema.package_versions, &k, &v).unwrap();
242 batch.commit().unwrap();
243
244 assert_eq!(
245 schema.get_package_storage_id(original, 3).unwrap(),
246 Some(storage),
247 );
248 }
249
250 #[test]
251 fn iter_walks_versions_for_one_package() {
252 let (_dir, db, schema) = fresh_db();
253 let original = ObjectID::random();
254 let other = ObjectID::random();
255
256 let mut batch = db.batch();
257 for version in [1u64, 2, 5] {
259 let (k, v) = store(original, version, ObjectID::random(), version);
260 batch.put(&schema.package_versions, &k, &v).unwrap();
261 }
262 let (k, v) = store(other, 1, ObjectID::random(), 1);
264 batch.put(&schema.package_versions, &k, &v).unwrap();
265 batch.commit().unwrap();
266
267 let versions: Vec<u64> = schema
268 .iter_package_versions(original)
269 .unwrap()
270 .map(|res| res.unwrap().0.version)
271 .collect();
272 assert_eq!(versions, vec![1, 2, 5]);
273 }
274
275 #[test]
276 fn get_package_at_checkpoint_resolves_latest_in_window() {
277 let (_dir, db, schema) = fresh_db();
278 let original = ObjectID::random();
279 let (s1, s2, s3) = (ObjectID::random(), ObjectID::random(), ObjectID::random());
280
281 let mut batch = db.batch();
282 let (k, v) = store_restored(original, 1, s1);
285 batch.put(&schema.package_versions, &k, &v).unwrap();
286 let (k, v) = store(original, 2, s2, 10);
287 batch.put(&schema.package_versions, &k, &v).unwrap();
288 let (k, v) = store(original, 3, s3, 20);
289 batch.put(&schema.package_versions, &k, &v).unwrap();
290 batch.commit().unwrap();
291
292 assert_eq!(
294 schema.get_package_at_checkpoint(original, 5).unwrap(),
295 Some((1, s1)),
296 );
297 assert_eq!(
299 schema.get_package_at_checkpoint(original, 10).unwrap(),
300 Some((2, s2)),
301 );
302 assert_eq!(
303 schema.get_package_at_checkpoint(original, 15).unwrap(),
304 Some((2, s2)),
305 );
306 assert_eq!(
308 schema.get_package_at_checkpoint(original, 20).unwrap(),
309 Some((3, s3)),
310 );
311 assert_eq!(
312 schema.get_package_at_checkpoint(original, 9_999).unwrap(),
313 Some((3, s3)),
314 );
315 }
316
317 #[test]
318 fn get_package_at_checkpoint_returns_none_before_first_publish() {
319 let (_dir, db, schema) = fresh_db();
320 let original = ObjectID::random();
321
322 let mut batch = db.batch();
323 let (k, v) = store(original, 1, ObjectID::random(), 50);
324 batch.put(&schema.package_versions, &k, &v).unwrap();
325 batch.commit().unwrap();
326
327 assert_eq!(
330 schema.get_package_at_checkpoint(original, 49).unwrap(),
331 None,
332 );
333 assert!(
334 schema
335 .get_package_at_checkpoint(original, 50)
336 .unwrap()
337 .is_some()
338 );
339 assert_eq!(
341 schema
342 .get_package_at_checkpoint(ObjectID::random(), 100)
343 .unwrap(),
344 None,
345 );
346 }
347}