sui_rpc_store/indexer/
object_by_owner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::object_by_owner`](crate::schema::object_by_owner)
6//! index.
7//!
8//! Like the other live-set-bounded indexes, the pipeline reads the
9//! checkpoint as a diff: inputs emit a
10//! `Delete` keyed by the *prior* `(kind, owner, type, balance,
11//! id)`, outputs emit a `Put` keyed by the *posterior*
12//! `(kind, owner, type, balance, id)`. For an object that was
13//! merely modified the two rows land at the same key only if the
14//! key components didn't change; otherwise the index correctly
15//! moves the row from the old key to the new one.
16
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use sui_consistent_store::Batch;
21use sui_consistent_store::Restore;
22use sui_indexer_alt_framework::pipeline::Processor;
23use sui_indexer_alt_framework::pipeline::sequential;
24use sui_types::full_checkpoint_content::Checkpoint;
25use sui_types::object::Object;
26
27use crate::RpcStoreSchema;
28use crate::indexer::Schema;
29use crate::indexer::Store;
30use crate::indexer::checkpoint_input_objects;
31use crate::indexer::checkpoint_output_objects;
32use crate::schema::object_by_owner;
33use crate::schema::primitives::U64Varint;
34
35/// Pipeline marker for `object_by_owner`.
36pub struct ObjectByOwner;
37
38pub enum Row {
39    Delete(object_by_owner::Key),
40    Put(object_by_owner::Key, U64Varint),
41}
42
43#[async_trait]
44impl Processor for ObjectByOwner {
45    const NAME: &'static str = "object_by_owner";
46    type Value = Row;
47
48    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
49        let mut rows = Vec::new();
50        for (_, (input, _)) in checkpoint_input_objects(checkpoint)? {
51            if let Some((key, _)) = object_by_owner::store(input) {
52                rows.push(Row::Delete(key));
53            }
54        }
55        for (_, (output, _)) in checkpoint_output_objects(checkpoint)? {
56            if let Some((key, version)) = object_by_owner::store(output) {
57                rows.push(Row::Put(key, version));
58            }
59        }
60        Ok(rows)
61    }
62}
63
64impl Restore for ObjectByOwner {
65    type Schema = RpcStoreSchema;
66
67    fn restore(
68        &self,
69        schema: &Self::Schema,
70        object: &Object,
71        batch: &mut Batch,
72    ) -> anyhow::Result<()> {
73        if let Some((key, value)) = object_by_owner::store(object) {
74            batch.put(&schema.object_by_owner, &key, &value)?;
75        }
76        Ok(())
77    }
78}
79
80#[async_trait]
81impl sequential::Handler for ObjectByOwner {
82    type Store = Store;
83    type Batch = Vec<Row>;
84
85    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
86        batch.extend(values);
87    }
88
89    async fn commit<'a>(
90        &self,
91        batch: &Self::Batch,
92        conn: &mut sui_consistent_store::Connection<'a, Schema>,
93    ) -> anyhow::Result<usize> {
94        let cf = &conn.store.schema().object_by_owner;
95        for row in batch {
96            match row {
97                Row::Delete(key) => {
98                    conn.batch.delete(cf, key)?;
99                }
100                Row::Put(key, value) => {
101                    conn.batch.put(cf, key, value)?;
102                }
103            }
104        }
105        Ok(batch.len())
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use std::sync::Arc;
112
113    use sui_consistent_store::Db;
114    use sui_consistent_store::DbOptions;
115    use sui_types::base_types::ObjectID;
116    use sui_types::base_types::SuiAddress;
117    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
118
119    use super::*;
120
121    #[tokio::test]
122    async fn process_runs_against_synthetic_checkpoint() {
123        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
124        let _rows = ObjectByOwner.process(&checkpoint).await.unwrap();
125    }
126
127    #[test]
128    fn restore_indexes_address_owned_object() {
129        let dir = tempfile::tempdir().unwrap();
130        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
131
132        let owner = SuiAddress::ZERO;
133        let object = Object::with_id_owner_for_testing(ObjectID::from_single_byte(3), owner);
134
135        let mut batch = db.batch();
136        ObjectByOwner.restore(&schema, &object, &mut batch).unwrap();
137        batch.commit().unwrap();
138
139        let rows: Vec<_> = schema
140            .iter_objects_owned_by_address(owner)
141            .unwrap()
142            .collect::<Result<Vec<_>, _>>()
143            .unwrap();
144        assert_eq!(rows.len(), 1);
145        assert_eq!(rows[0].0.object_id, object.id());
146    }
147}