sui_analytics_indexer/package_store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use cache_coordinator::CacheReadyCoordinator;
6use lru::LruCache;
7use move_core_types::account_address::AccountAddress;
8use std::num::NonZeroUsize;
9use std::ops::Deref;
10use std::path::Path;
11use std::result::Result as StdResult;
12use std::sync::Arc;
13use std::sync::Mutex;
14use sui_package_resolver::{
15    Package, PackageStore, PackageStoreWithLruCache, Resolver, Result,
16    error::Error as ResolverError,
17};
18use sui_rpc_api::Client;
19use sui_types::{
20    SYSTEM_PACKAGE_ADDRESSES,
21    base_types::ObjectID,
22    object::{Data, Object},
23};
24use thiserror::Error;
25use typed_store::rocks::{DBMap, MetricConf};
26use typed_store::{DBMapUtils, Map, TypedStoreError};
27
28pub mod cache_coordinator;
29pub mod package_cache_worker;
30
31use std::sync::OnceLock;
32
/// A lazy-initialized package cache that tracks whether it was ever accessed.
///
/// The wrapped [`PackageCache`] is only built by `initialize_or_get_cache`;
/// until that has been called, `get_cache_if_initialized` returns `None`.
pub struct LazyPackageCache {
    /// `None` until `initialize_or_get_cache` is first called; afterwards holds the
    /// cell containing the shared cache.
    cache: Option<OnceLock<Arc<PackageCache>>>,
    /// Factory that builds the shared cache when it is first initialized.
    constructor: Box<dyn Fn() -> Arc<PackageCache> + Send + Sync>,
}
38
39impl LazyPackageCache {
40    pub fn new(path: std::path::PathBuf, rest_url: String) -> Self {
41        let constructor = Box::new(move || Arc::new(PackageCache::new(&path, &rest_url)));
42
43        Self {
44            cache: None,
45            constructor,
46        }
47    }
48
49    /// Initialize the cache if needed and return the package cache.
50    pub fn initialize_or_get_cache(&mut self) -> Arc<PackageCache> {
51        if self.cache.is_none() {
52            self.cache = Some(OnceLock::new());
53        }
54
55        self.cache
56            .as_ref()
57            .unwrap()
58            .get_or_init(|| (self.constructor)())
59            .clone()
60    }
61
62    /// Get the package cache if it was initialized, None otherwise.
63    pub fn get_cache_if_initialized(&self) -> Option<Arc<PackageCache>> {
64        self.cache.as_ref().and_then(|cell| cell.get().cloned())
65    }
66}
67
/// Store name reported in `ResolverError::Store` when a local [`Error`] is converted.
const STORE: &str = "RocksDB";
/// Capacity of the LRU that holds the per-epoch package caches.
const MAX_EPOCH_CACHES: usize = 2; // keep at most two epochs in memory
70
/// Errors produced by the local package store.
#[derive(Error, Debug)]
pub enum Error {
    /// A failure reported by the underlying typed store (converted via `From`).
    #[error("{0}")]
    TypedStore(#[from] TypedStoreError),
    /// The package at the given address could not be obtained.
    ///
    /// `LocalDBPackageStore::get` maps any failure of the remote lookup to this
    /// variant, so it does not necessarily mean the package does not exist.
    #[error("Package not found: {0}")]
    PackageNotFound(AccountAddress),
}
78
79impl From<Error> for ResolverError {
80    fn from(e: Error) -> Self {
81        ResolverError::Store {
82            store: STORE,
83            error: e.to_string(),
84        }
85    }
86}
87
/// Typed-store tables backing the local package store.
#[derive(DBMapUtils)]
pub struct PackageStoreTables {
    /// Stored objects keyed by their object ID.
    pub(crate) packages: DBMap<ObjectID, Object>,
}
92
93impl PackageStoreTables {
94    pub fn new(path: &Path) -> Arc<Self> {
95        Arc::new(Self::open_tables_read_write(
96            path.to_path_buf(),
97            MetricConf::new("package"),
98            None,
99            None,
100        ))
101    }
102
103    fn update(&self, object: &Object) -> StdResult<(), Error> {
104        self.update_batch(std::iter::once(object))
105    }
106
107    fn update_batch<'a, I>(&self, objects: I) -> StdResult<(), Error>
108    where
109        I: IntoIterator<Item = &'a Object>,
110    {
111        let mut batch = self.packages.batch();
112        batch.insert_batch(&self.packages, objects.into_iter().map(|o| (o.id(), o)))?;
113
114        batch.write()?;
115        Ok(())
116    }
117}
118
/// Package store that reads from local tables and falls back to a remote client.
#[derive(Clone)]
pub struct LocalDBPackageStore {
    /// Shared handle to the local package tables.
    tables: Arc<PackageStoreTables>,
    /// Client used by `get` when an object is missing from `tables`.
    client: Client,
}
124
125impl LocalDBPackageStore {
126    pub fn new(path: &Path, rest_url: &str) -> Self {
127        Self {
128            tables: PackageStoreTables::new(path),
129            client: Client::new(rest_url).expect("invalid REST URL"),
130        }
131    }
132
133    fn update(&self, object: &Object) -> StdResult<(), Error> {
134        if object.data.try_as_package().is_some() {
135            self.tables.update(object)?;
136        }
137        Ok(())
138    }
139
140    fn update_batch<'a, I>(&self, objects: I) -> StdResult<(), Error>
141    where
142        I: IntoIterator<Item = &'a Object>,
143    {
144        let filtered = objects
145            .into_iter()
146            .filter(|o| o.data.try_as_package().is_some());
147
148        self.tables.update_batch(filtered)?;
149        Ok(())
150    }
151
152    async fn get(&self, id: AccountAddress) -> StdResult<Object, Error> {
153        if let Some(o) = self.tables.packages.get(&ObjectID::from(id))? {
154            return Ok(o);
155        }
156        let o = self
157            .client
158            .clone()
159            .get_object(ObjectID::from(id))
160            .await
161            .map_err(|_| Error::PackageNotFound(id))?;
162        self.update(&o)?;
163        Ok(o)
164    }
165
166    pub async fn get_original_package_id(&self, id: AccountAddress) -> StdResult<ObjectID, Error> {
167        let o = self.get(id).await?;
168        let Data::Package(p) = &o.data else {
169            return Err(Error::TypedStore(TypedStoreError::SerializationError(
170                "not a package".into(),
171            )));
172        };
173        Ok(p.original_package_id())
174    }
175}
176
177#[async_trait]
178impl PackageStore for LocalDBPackageStore {
179    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
180        let o = self.get(id).await?;
181        Ok(Arc::new(Package::read_from_object(&o)?))
182    }
183}
184
/// A thin newtype wrapper so we can hand an `Arc` to `Resolver`.
///
/// The wrapped value is the shared LRU-cached store over [`LocalDBPackageStore`].
#[derive(Clone)]
pub struct GlobalArcStore(pub Arc<PackageStoreWithLruCache<LocalDBPackageStore>>);
188
189#[async_trait]
190impl PackageStore for GlobalArcStore {
191    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
192        self.0.fetch(id).await
193    }
194}
195
196impl Deref for GlobalArcStore {
197    type Target = PackageStoreWithLruCache<LocalDBPackageStore>;
198    fn deref(&self) -> &Self::Target {
199        &self.0
200    }
201}
202
/// Multi-level cache. System packages can change across epochs while non-system packages are
/// immutable and can be cached across epochs. This impl assumes the system is working on at most
/// 2 epochs at a time (at the epoch boundary). When the indexer begins processing a new epoch it
/// will create a new `PackageStoreWithLruCache` for that epoch and the oldest epoch in the cache
/// will be dropped.
#[derive(Clone)]
pub struct CompositeStore {
    /// Epoch whose cache serves system-package lookups made through this store.
    pub epoch: u64,
    /// Epoch-independent cache used for every non-system package.
    pub global: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
    /// Backing store used to seed a newly created per-epoch cache.
    pub base: LocalDBPackageStore,
    /// Shared LRU of per-epoch caches, keyed by epoch.
    pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
}
215
216impl CompositeStore {
217    /// Lazily obtain (or create) the cache for the current epoch.
218    fn epoch_cache(&self) -> Arc<PackageStoreWithLruCache<LocalDBPackageStore>> {
219        let mut caches = self.epochs.lock().unwrap();
220        if let Some(cache) = caches.get(&self.epoch) {
221            return cache.clone();
222        }
223        // Not present — create a fresh cache backed by the same LocalDB store.
224        let cache = Arc::new(PackageStoreWithLruCache::new(self.base.clone()));
225        caches.put(self.epoch, cache.clone());
226        cache
227    }
228}
229
230#[async_trait]
231impl PackageStore for CompositeStore {
232    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
233        if SYSTEM_PACKAGE_ADDRESSES.contains(&id) {
234            let cache = self.epoch_cache();
235            return cache.fetch(id).await;
236        }
237        self.global.fetch(id).await
238    }
239}
240
/// Top-level cache facade: bundles the backing store, the caches built on top of it, and a
/// resolver over the global cache.
pub struct PackageCache {
    /// Local store (with remote fallback) that all caches read through.
    pub base_store: LocalDBPackageStore,
    /// Epoch-independent LRU cache over `base_store`.
    pub global_cache: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
    /// Per-epoch caches, shared with every `CompositeStore` handed out by `resolver_for_epoch`.
    pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
    /// Resolver backed directly by `global_cache`.
    pub resolver: Resolver<GlobalArcStore>,
    /// Coordinator created alongside the cache; its behavior is defined in `cache_coordinator`.
    pub coordinator: CacheReadyCoordinator,
}
249
250impl PackageCache {
251    pub fn new(path: &Path, rest_url: &str) -> Self {
252        let base_store = LocalDBPackageStore::new(path, rest_url);
253        let global_cache = Arc::new(PackageStoreWithLruCache::new(base_store.clone()));
254        Self {
255            resolver: Resolver::new(GlobalArcStore(global_cache.clone())),
256            base_store,
257            global_cache,
258            epochs: Arc::new(Mutex::new(LruCache::new(
259                NonZeroUsize::new(MAX_EPOCH_CACHES).unwrap(),
260            ))),
261            coordinator: CacheReadyCoordinator::new(),
262        }
263    }
264
265    pub fn resolver_for_epoch(&self, epoch: u64) -> Resolver<CompositeStore> {
266        Resolver::new(CompositeStore {
267            epoch,
268            global: self.global_cache.clone(),
269            base: self.base_store.clone(),
270            epochs: self.epochs.clone(),
271        })
272    }
273
274    pub fn update(&self, object: &Object) -> Result<()> {
275        self.base_store.update(object)?;
276        Ok(())
277    }
278
279    fn update_batch<'a, I>(&self, objects: I) -> Result<()>
280    where
281        I: IntoIterator<Item = &'a Object>,
282    {
283        self.base_store.update_batch(objects)?;
284        Ok(())
285    }
286
287    #[cfg(not(test))]
288    pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
289        Ok(self.base_store.get_original_package_id(id).await?)
290    }
291
292    #[cfg(test)]
293    pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
294        Ok(id.into())
295    }
296}