sui_rpc_store/indexer/
package_versions.rs1use std::sync::Arc;
28
29use async_trait::async_trait;
30use sui_consistent_store::Batch;
31use sui_consistent_store::Restore;
32use sui_indexer_alt_framework::pipeline::Processor;
33use sui_indexer_alt_framework::pipeline::sequential;
34use sui_types::base_types::ObjectID;
35use sui_types::full_checkpoint_content::Checkpoint;
36use sui_types::object::Data;
37use sui_types::object::Object;
38
39use crate::RpcStoreSchema;
40use crate::indexer::Schema;
41use crate::indexer::Store;
42use crate::indexer::checkpoint_output_objects;
43use crate::schema::package_versions;
44
45pub struct PackageVersions;
47
48pub struct Row {
49 pub original_id: ObjectID,
50 pub version: u64,
51 pub storage_id: ObjectID,
52 pub checkpoint: u64,
53}
54
55#[async_trait]
56impl Processor for PackageVersions {
57 const NAME: &'static str = "package_versions";
58 type Value = Row;
59
60 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
61 let cp = checkpoint.summary.sequence_number;
62 let mut rows = Vec::new();
63 for (_, (object, _)) in checkpoint_output_objects(checkpoint)? {
64 if let Data::Package(pkg) = &object.data {
65 rows.push(Row {
66 original_id: pkg.original_package_id(),
67 version: pkg.version().value(),
68 storage_id: pkg.id(),
69 checkpoint: cp,
70 });
71 }
72 }
73 Ok(rows)
74 }
75}
76
77impl Restore for PackageVersions {
78 type Schema = RpcStoreSchema;
79
80 fn restore(
81 &self,
82 schema: &Self::Schema,
83 object: &Object,
84 batch: &mut Batch,
85 ) -> anyhow::Result<()> {
86 let Data::Package(pkg) = &object.data else {
92 return Ok(());
93 };
94 let (key, value) = package_versions::store_restored(
95 pkg.original_package_id(),
96 pkg.version().value(),
97 pkg.id(),
98 );
99 batch.put(&schema.package_versions, &key, &value)?;
100 Ok(())
101 }
102}
103
104#[async_trait]
105impl sequential::Handler for PackageVersions {
106 type Store = Store;
107 type Batch = Vec<Row>;
108
109 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
110 batch.extend(values);
111 }
112
113 async fn commit<'a>(
114 &self,
115 batch: &Self::Batch,
116 conn: &mut sui_consistent_store::Connection<'a, Schema>,
117 ) -> anyhow::Result<usize> {
118 let cf = &conn.store.schema().package_versions;
119 for row in batch {
120 let (k, v) = package_versions::store(
121 row.original_id,
122 row.version,
123 row.storage_id,
124 row.checkpoint,
125 );
126 conn.batch.put(cf, &k, &v)?;
127 }
128 Ok(batch.len())
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use std::sync::Arc;
135
136 use sui_consistent_store::Db;
137 use sui_consistent_store::DbOptions;
138 use sui_types::base_types::SuiAddress;
139 use sui_types::object::Object;
140 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
141
142 use super::*;
143
144 #[tokio::test]
145 async fn process_runs_against_synthetic_checkpoint() {
146 let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
147 let _rows = PackageVersions.process(&checkpoint).await.unwrap();
148 }
149
150 #[test]
156 fn restore_skips_non_package_objects() {
157 let dir = tempfile::tempdir().unwrap();
158 let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
159
160 let non_pkg =
161 Object::with_id_owner_for_testing(ObjectID::from_single_byte(7), SuiAddress::ZERO);
162 let mut batch = db.batch();
163 PackageVersions
164 .restore(&schema, &non_pkg, &mut batch)
165 .unwrap();
166 batch.commit().unwrap();
167
168 let rows: Vec<_> = schema
172 .iter_package_versions(non_pkg.id())
173 .unwrap()
174 .collect::<Result<Vec<_>, _>>()
175 .unwrap();
176 assert!(rows.is_empty());
177 }
178}