sui_rpc_store/indexer/
package_versions.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::package_versions`](crate::schema::package_versions)
6//! CF: one row per `(original_package_id, version)` published in
7//! the checkpoint.
8//!
9//! Each row records the storage id and the checkpoint at which the
10//! version was published. As a history-cohort member this CF is both
11//! restored and backfilled: the live-set restore writes its rows via
12//! [`store_restored`](crate::schema::package_versions::store_restored),
13//! leaving the publish checkpoint unset (a restore floor), and the
14//! embedded backfill of `(L, T]` then re-publishes every version
15//! created in that window with its real publish checkpoint, overwriting
16//! the floor. Versions that predate the available window are never
17//! re-published, so they keep their floor and read as having always
18//! existed.
19//!
20//! Pure puts — packages are immutable once written, so a later
21//! publish at the same `(original_id, version)` (which would
22//! itself be a chain-level error) deterministically overwrites
23//! the prior storage id rather than dueling with it. The same
24//! overwrite is what lets the backfill replace a restore floor with
25//! the real publish checkpoint.
26
27use std::sync::Arc;
28
29use async_trait::async_trait;
30use sui_consistent_store::Batch;
31use sui_consistent_store::Restore;
32use sui_indexer_alt_framework::pipeline::Processor;
33use sui_indexer_alt_framework::pipeline::sequential;
34use sui_types::base_types::ObjectID;
35use sui_types::full_checkpoint_content::Checkpoint;
36use sui_types::object::Data;
37use sui_types::object::Object;
38
39use crate::RpcStoreSchema;
40use crate::indexer::Schema;
41use crate::indexer::Store;
42use crate::indexer::checkpoint_output_objects;
43use crate::schema::package_versions;
44
/// Pipeline marker for `package_versions`.
///
/// Zero-sized type that carries the pipeline's behaviour through its trait
/// impls in this module: `Processor` (checkpoint → rows), `Restore`
/// (live object → restored row) and `sequential::Handler` (rows → store).
#[derive(Debug)]
pub struct PackageVersions;
47
48pub struct Row {
49    pub original_id: ObjectID,
50    pub version: u64,
51    pub storage_id: ObjectID,
52    pub checkpoint: u64,
53}
54
55#[async_trait]
56impl Processor for PackageVersions {
57    const NAME: &'static str = "package_versions";
58    type Value = Row;
59
60    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
61        let cp = checkpoint.summary.sequence_number;
62        let mut rows = Vec::new();
63        for (_, (object, _)) in checkpoint_output_objects(checkpoint)? {
64            if let Data::Package(pkg) = &object.data {
65                rows.push(Row {
66                    original_id: pkg.original_package_id(),
67                    version: pkg.version().value(),
68                    storage_id: pkg.id(),
69                    checkpoint: cp,
70                });
71            }
72        }
73        Ok(rows)
74    }
75}
76
77impl Restore for PackageVersions {
78    type Schema = RpcStoreSchema;
79
80    fn restore(
81        &self,
82        schema: &Self::Schema,
83        object: &Object,
84        batch: &mut Batch,
85    ) -> anyhow::Result<()> {
86        // Packages are immutable, so each published version lives
87        // forever as its own live object. The live object set
88        // therefore contains every package version ever published,
89        // each emitting a single row mapping
90        // `(original_id, version) → storage_id`.
91        let Data::Package(pkg) = &object.data else {
92            return Ok(());
93        };
94        let (key, value) = package_versions::store_restored(
95            pkg.original_package_id(),
96            pkg.version().value(),
97            pkg.id(),
98        );
99        batch.put(&schema.package_versions, &key, &value)?;
100        Ok(())
101    }
102}
103
104#[async_trait]
105impl sequential::Handler for PackageVersions {
106    type Store = Store;
107    type Batch = Vec<Row>;
108
109    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
110        batch.extend(values);
111    }
112
113    async fn commit<'a>(
114        &self,
115        batch: &Self::Batch,
116        conn: &mut sui_consistent_store::Connection<'a, Schema>,
117    ) -> anyhow::Result<usize> {
118        let cf = &conn.store.schema().package_versions;
119        for row in batch {
120            let (k, v) = package_versions::store(
121                row.original_id,
122                row.version,
123                row.storage_id,
124                row.checkpoint,
125            );
126            conn.batch.put(cf, &k, &v)?;
127        }
128        Ok(batch.len())
129    }
130}
131
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::SuiAddress;
    use sui_types::object::Object;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: `process` returns `Ok` for a checkpoint built by
    /// `TestCheckpointBuilder` with no further setup. The produced rows
    /// are deliberately not inspected.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let _rows = PackageVersions.process(&checkpoint).await.unwrap();
    }

    /// Verify the non-package skip branch: ordinary Move objects
    /// have `Data::Move`, not `Data::Package`, and must not produce
    /// a `package_versions` row. The package branch, which encodes
    /// its row via [`package_versions::store_restored`], is not
    /// exercised here.
    #[test]
    fn restore_skips_non_package_objects() {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();

        let non_pkg =
            Object::with_id_owner_for_testing(ObjectID::from_single_byte(7), SuiAddress::ZERO);
        let mut batch = db.batch();
        PackageVersions
            .restore(&schema, &non_pkg, &mut batch)
            .unwrap();
        batch.commit().unwrap();

        // No row written: an `iter_package_versions` over the
        // (non-existent) original_id of a non-package returns an
        // empty iterator.
        let rows: Vec<_> = schema
            .iter_package_versions(non_pkg.id())
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(
            rows.is_empty(),
            "restoring a non-package object must not write a package_versions row"
        );
    }
}