sui_rpc_store/indexer/
object_by_owner.rs1use std::sync::Arc;
18
19use async_trait::async_trait;
20use sui_consistent_store::Batch;
21use sui_consistent_store::Restore;
22use sui_indexer_alt_framework::pipeline::Processor;
23use sui_indexer_alt_framework::pipeline::sequential;
24use sui_types::full_checkpoint_content::Checkpoint;
25use sui_types::object::Object;
26
27use crate::RpcStoreSchema;
28use crate::indexer::Schema;
29use crate::indexer::Store;
30use crate::indexer::checkpoint_input_objects;
31use crate::indexer::checkpoint_output_objects;
32use crate::schema::object_by_owner;
33use crate::schema::primitives::U64Varint;
34
/// Marker type for the `object_by_owner` indexing pipeline.
///
/// The type carries no state; all behaviour lives in the trait
/// implementations in this file (`Processor`, `Restore` and
/// `sequential::Handler`).
pub struct ObjectByOwner;
37
38pub enum Row {
39 Delete(object_by_owner::Key),
40 Put(object_by_owner::Key, U64Varint),
41}
42
43#[async_trait]
44impl Processor for ObjectByOwner {
45 const NAME: &'static str = "object_by_owner";
46 type Value = Row;
47
48 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
49 let mut rows = Vec::new();
50 for (_, (input, _)) in checkpoint_input_objects(checkpoint)? {
51 if let Some((key, _)) = object_by_owner::store(input) {
52 rows.push(Row::Delete(key));
53 }
54 }
55 for (_, (output, _)) in checkpoint_output_objects(checkpoint)? {
56 if let Some((key, version)) = object_by_owner::store(output) {
57 rows.push(Row::Put(key, version));
58 }
59 }
60 Ok(rows)
61 }
62}
63
64impl Restore for ObjectByOwner {
65 type Schema = RpcStoreSchema;
66
67 fn restore(
68 &self,
69 schema: &Self::Schema,
70 object: &Object,
71 batch: &mut Batch,
72 ) -> anyhow::Result<()> {
73 if let Some((key, value)) = object_by_owner::store(object) {
74 batch.put(&schema.object_by_owner, &key, &value)?;
75 }
76 Ok(())
77 }
78}
79
80#[async_trait]
81impl sequential::Handler for ObjectByOwner {
82 type Store = Store;
83 type Batch = Vec<Row>;
84
85 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
86 batch.extend(values);
87 }
88
89 async fn commit<'a>(
90 &self,
91 batch: &Self::Batch,
92 conn: &mut sui_consistent_store::Connection<'a, Schema>,
93 ) -> anyhow::Result<usize> {
94 let cf = &conn.store.schema().object_by_owner;
95 for row in batch {
96 match row {
97 Row::Delete(key) => {
98 conn.batch.delete(cf, key)?;
99 }
100 Row::Put(key, value) => {
101 conn.batch.put(cf, key, value)?;
102 }
103 }
104 }
105 Ok(batch.len())
106 }
107}
108
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::ObjectID;
    use sui_types::base_types::SuiAddress;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: `process` succeeds on a checkpoint produced by the test
    /// builder. The resulting rows are not inspected.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let built = TestCheckpointBuilder::new(1).build_checkpoint();
        let checkpoint = Arc::new(built);
        let _rows = ObjectByOwner.process(&checkpoint).await.unwrap();
    }

    /// Restoring a single address-owned object makes it visible through the
    /// schema's owner iteration.
    #[test]
    fn restore_indexes_address_owned_object() {
        // Keep the temporary directory alive for the duration of the test.
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let owner = SuiAddress::ZERO;
        let object_id = ObjectID::from_single_byte(3);
        let object = Object::with_id_owner_for_testing(object_id, owner);

        let mut batch = db.batch();
        ObjectByOwner.restore(&schema, &object, &mut batch).unwrap();
        batch.commit().unwrap();

        let owned = schema.iter_objects_owned_by_address(owner).unwrap();
        let rows: Vec<_> = owned.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].0.object_id, object.id());
    }
}