sui_analytics_indexer/package_store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::path::Path;
7use std::result::Result as StdResult;
8use std::sync::Arc;
9use std::sync::Mutex;
10
11use async_trait::async_trait;
12use lru::LruCache;
13use move_core_types::account_address::AccountAddress;
14use sui_package_resolver::Package;
15use sui_package_resolver::PackageStore;
16use sui_package_resolver::PackageStoreWithLruCache;
17use sui_package_resolver::Resolver;
18use sui_package_resolver::Result;
19use sui_package_resolver::error::Error as ResolverError;
20use sui_rpc_api::Client;
21use sui_types::SYSTEM_PACKAGE_ADDRESSES;
22use sui_types::base_types::ObjectID;
23use sui_types::object::Data;
24use sui_types::object::Object;
25use thiserror::Error;
26use typed_store::DBMapUtils;
27use typed_store::Map;
28use typed_store::TypedStoreError;
29use typed_store::rocks::DBMap;
30use typed_store::rocks::MetricConf;
31
/// Store name placed in the `store` field of `ResolverError::Store` when an
/// error from this module is converted into a resolver error.
const STORE: &str = "RocksDB";

/// Capacity of the per-epoch cache LRU: keep at most two epochs in memory.
const MAX_EPOCH_CACHES: usize = 2;
34
35#[derive(Error, Debug)]
36pub enum Error {
37    #[error("{0}")]
38    TypedStore(#[from] TypedStoreError),
39    #[error("Package not found: {0}")]
40    PackageNotFound(AccountAddress),
41}
42
43impl From<Error> for ResolverError {
44    fn from(e: Error) -> Self {
45        ResolverError::Store {
46            store: STORE,
47            error: e.to_string(),
48        }
49    }
50}
51
52#[derive(DBMapUtils)]
53pub struct PackageStoreTables {
54    pub(crate) packages: DBMap<ObjectID, Object>,
55}
56
57impl PackageStoreTables {
58    pub fn new(path: &Path) -> Arc<Self> {
59        Arc::new(Self::open_tables_read_write(
60            path.to_path_buf(),
61            MetricConf::new("package"),
62            None,
63            None,
64        ))
65    }
66
67    fn update(&self, object: &Object) -> StdResult<(), Error> {
68        self.update_batch(std::iter::once(object))
69    }
70
71    fn update_batch<'a, I>(&self, objects: I) -> StdResult<(), Error>
72    where
73        I: IntoIterator<Item = &'a Object>,
74    {
75        let mut batch = self.packages.batch();
76        batch.insert_batch(&self.packages, objects.into_iter().map(|o| (o.id(), o)))?;
77
78        batch.write()?;
79        Ok(())
80    }
81}
82
83#[derive(Clone)]
84pub struct LocalDBPackageStore {
85    tables: Arc<PackageStoreTables>,
86    client: Client,
87}
88
89impl LocalDBPackageStore {
90    pub fn new(path: &Path, rpc_url: &str) -> Self {
91        Self {
92            tables: PackageStoreTables::new(path),
93            client: Client::new(rpc_url).expect("invalid rpc url"),
94        }
95    }
96
97    fn update(&self, object: &Object) -> StdResult<(), Error> {
98        if object.data.try_as_package().is_some() {
99            self.tables.update(object)?;
100        }
101        Ok(())
102    }
103
104    async fn get(&self, id: AccountAddress) -> StdResult<Object, Error> {
105        if let Some(o) = self.tables.packages.get(&ObjectID::from(id))? {
106            return Ok(o);
107        }
108        let o = self
109            .client
110            .clone()
111            .get_object(ObjectID::from(id))
112            .await
113            .map_err(|_| Error::PackageNotFound(id))?;
114        self.update(&o)?;
115        Ok(o)
116    }
117
118    pub async fn get_original_package_id(&self, id: AccountAddress) -> StdResult<ObjectID, Error> {
119        let o = self.get(id).await?;
120        let Data::Package(p) = &o.data else {
121            return Err(Error::TypedStore(TypedStoreError::SerializationError(
122                "not a package".into(),
123            )));
124        };
125        Ok(p.original_package_id())
126    }
127}
128
129#[async_trait]
130impl PackageStore for LocalDBPackageStore {
131    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
132        let o = self.get(id).await?;
133        Ok(Arc::new(Package::read_from_object(&o)?))
134    }
135}
136
137// A thin new‑type wrapper so we can hand an `Arc` to `Resolver`
138#[derive(Clone)]
139pub struct GlobalArcStore(pub Arc<PackageStoreWithLruCache<LocalDBPackageStore>>);
140
141#[async_trait]
142impl PackageStore for GlobalArcStore {
143    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
144        self.0.fetch(id).await
145    }
146}
147
148impl Deref for GlobalArcStore {
149    type Target = PackageStoreWithLruCache<LocalDBPackageStore>;
150    fn deref(&self) -> &Self::Target {
151        &self.0
152    }
153}
154
155// Multi-level cache. System packages can change across epochs while non-system packages are
156// immutable and can be cached across epochs. This impl assumes the system is at most working on
157// 2 epochs at a time (at the epoch boundary). When the indexer begins processing a new epoch it
158// will create a new PackageStoreWithLruCache for that epoch and the oldest epoch in the cache
159// will be dropped.
160#[derive(Clone)]
161pub struct CompositeStore {
162    pub epoch: u64,
163    pub global: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
164    pub base: LocalDBPackageStore,
165    pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
166}
167
168impl CompositeStore {
169    /// Lazily obtain (or create) the cache for the current epoch.
170    fn epoch_cache(&self) -> Arc<PackageStoreWithLruCache<LocalDBPackageStore>> {
171        let mut caches = self.epochs.lock().unwrap();
172        if let Some(cache) = caches.get(&self.epoch) {
173            return cache.clone();
174        }
175        // Not present — create a fresh cache backed by the same LocalDB store.
176        let cache = Arc::new(PackageStoreWithLruCache::new(self.base.clone()));
177        caches.put(self.epoch, cache.clone());
178        cache
179    }
180}
181
182#[async_trait]
183impl PackageStore for CompositeStore {
184    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
185        if SYSTEM_PACKAGE_ADDRESSES.contains(&id) {
186            let cache = self.epoch_cache();
187            return cache.fetch(id).await;
188        }
189        self.global.fetch(id).await
190    }
191}
192
193// Top‑level cache façade
194pub struct PackageCache {
195    pub base_store: LocalDBPackageStore,
196    pub global_cache: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
197    pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
198    pub resolver: Resolver<GlobalArcStore>,
199}
200
201impl PackageCache {
202    pub fn new(path: &Path, rpc_url: &str) -> Self {
203        let base_store = LocalDBPackageStore::new(path, rpc_url);
204        let global_cache = Arc::new(PackageStoreWithLruCache::new(base_store.clone()));
205        Self {
206            resolver: Resolver::new(GlobalArcStore(global_cache.clone())),
207            base_store,
208            global_cache,
209            epochs: Arc::new(Mutex::new(LruCache::new(
210                NonZeroUsize::new(MAX_EPOCH_CACHES).unwrap(),
211            ))),
212        }
213    }
214
215    pub fn resolver_for_epoch(&self, epoch: u64) -> Resolver<CompositeStore> {
216        Resolver::new(CompositeStore {
217            epoch,
218            global: self.global_cache.clone(),
219            base: self.base_store.clone(),
220            epochs: self.epochs.clone(),
221        })
222    }
223
224    #[cfg(not(test))]
225    pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
226        Ok(self.base_store.get_original_package_id(id).await?)
227    }
228
229    #[cfg(test)]
230    pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
231        Ok(id.into())
232    }
233}