sui_analytics_indexer/package_store/
mod.rs1use async_trait::async_trait;
5use cache_coordinator::CacheReadyCoordinator;
6use lru::LruCache;
7use move_core_types::account_address::AccountAddress;
8use std::num::NonZeroUsize;
9use std::ops::Deref;
10use std::path::Path;
11use std::result::Result as StdResult;
12use std::sync::Arc;
13use std::sync::Mutex;
14use sui_package_resolver::{
15 Package, PackageStore, PackageStoreWithLruCache, Resolver, Result,
16 error::Error as ResolverError,
17};
18use sui_rpc_api::Client;
19use sui_types::{
20 SYSTEM_PACKAGE_ADDRESSES,
21 base_types::ObjectID,
22 object::{Data, Object},
23};
24use thiserror::Error;
25use typed_store::rocks::{DBMap, MetricConf};
26use typed_store::{DBMapUtils, Map, TypedStoreError};
27
28pub mod cache_coordinator;
29pub mod package_cache_worker;
30
31use std::sync::OnceLock;
32
33pub struct LazyPackageCache {
35 cache: Option<OnceLock<Arc<PackageCache>>>,
36 constructor: Box<dyn Fn() -> Arc<PackageCache> + Send + Sync>,
37}
38
39impl LazyPackageCache {
40 pub fn new(path: std::path::PathBuf, rest_url: String) -> Self {
41 let constructor = Box::new(move || Arc::new(PackageCache::new(&path, &rest_url)));
42
43 Self {
44 cache: None,
45 constructor,
46 }
47 }
48
49 pub fn initialize_or_get_cache(&mut self) -> Arc<PackageCache> {
51 if self.cache.is_none() {
52 self.cache = Some(OnceLock::new());
53 }
54
55 self.cache
56 .as_ref()
57 .unwrap()
58 .get_or_init(|| (self.constructor)())
59 .clone()
60 }
61
62 pub fn get_cache_if_initialized(&self) -> Option<Arc<PackageCache>> {
64 self.cache.as_ref().and_then(|cell| cell.get().cloned())
65 }
66}
67
68const STORE: &str = "RocksDB";
69const MAX_EPOCH_CACHES: usize = 2; #[derive(Error, Debug)]
72pub enum Error {
73 #[error("{0}")]
74 TypedStore(#[from] TypedStoreError),
75 #[error("Package not found: {0}")]
76 PackageNotFound(AccountAddress),
77}
78
79impl From<Error> for ResolverError {
80 fn from(e: Error) -> Self {
81 ResolverError::Store {
82 store: STORE,
83 error: e.to_string(),
84 }
85 }
86}
87
88#[derive(DBMapUtils)]
89pub struct PackageStoreTables {
90 pub(crate) packages: DBMap<ObjectID, Object>,
91}
92
93impl PackageStoreTables {
94 pub fn new(path: &Path) -> Arc<Self> {
95 Arc::new(Self::open_tables_read_write(
96 path.to_path_buf(),
97 MetricConf::new("package"),
98 None,
99 None,
100 ))
101 }
102
103 fn update(&self, object: &Object) -> StdResult<(), Error> {
104 self.update_batch(std::iter::once(object))
105 }
106
107 fn update_batch<'a, I>(&self, objects: I) -> StdResult<(), Error>
108 where
109 I: IntoIterator<Item = &'a Object>,
110 {
111 let mut batch = self.packages.batch();
112 batch.insert_batch(&self.packages, objects.into_iter().map(|o| (o.id(), o)))?;
113
114 batch.write()?;
115 Ok(())
116 }
117}
118
119#[derive(Clone)]
120pub struct LocalDBPackageStore {
121 tables: Arc<PackageStoreTables>,
122 client: Client,
123}
124
125impl LocalDBPackageStore {
126 pub fn new(path: &Path, rest_url: &str) -> Self {
127 Self {
128 tables: PackageStoreTables::new(path),
129 client: Client::new(rest_url).expect("invalid REST URL"),
130 }
131 }
132
133 fn update(&self, object: &Object) -> StdResult<(), Error> {
134 if object.data.try_as_package().is_some() {
135 self.tables.update(object)?;
136 }
137 Ok(())
138 }
139
140 fn update_batch<'a, I>(&self, objects: I) -> StdResult<(), Error>
141 where
142 I: IntoIterator<Item = &'a Object>,
143 {
144 let filtered = objects
145 .into_iter()
146 .filter(|o| o.data.try_as_package().is_some());
147
148 self.tables.update_batch(filtered)?;
149 Ok(())
150 }
151
152 async fn get(&self, id: AccountAddress) -> StdResult<Object, Error> {
153 if let Some(o) = self.tables.packages.get(&ObjectID::from(id))? {
154 return Ok(o);
155 }
156 let o = self
157 .client
158 .clone()
159 .get_object(ObjectID::from(id))
160 .await
161 .map_err(|_| Error::PackageNotFound(id))?;
162 self.update(&o)?;
163 Ok(o)
164 }
165
166 pub async fn get_original_package_id(&self, id: AccountAddress) -> StdResult<ObjectID, Error> {
167 let o = self.get(id).await?;
168 let Data::Package(p) = &o.data else {
169 return Err(Error::TypedStore(TypedStoreError::SerializationError(
170 "not a package".into(),
171 )));
172 };
173 Ok(p.original_package_id())
174 }
175}
176
177#[async_trait]
178impl PackageStore for LocalDBPackageStore {
179 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
180 let o = self.get(id).await?;
181 Ok(Arc::new(Package::read_from_object(&o)?))
182 }
183}
184
185#[derive(Clone)]
187pub struct GlobalArcStore(pub Arc<PackageStoreWithLruCache<LocalDBPackageStore>>);
188
189#[async_trait]
190impl PackageStore for GlobalArcStore {
191 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
192 self.0.fetch(id).await
193 }
194}
195
196impl Deref for GlobalArcStore {
197 type Target = PackageStoreWithLruCache<LocalDBPackageStore>;
198 fn deref(&self) -> &Self::Target {
199 &self.0
200 }
201}
202
203#[derive(Clone)]
209pub struct CompositeStore {
210 pub epoch: u64,
211 pub global: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
212 pub base: LocalDBPackageStore,
213 pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
214}
215
216impl CompositeStore {
217 fn epoch_cache(&self) -> Arc<PackageStoreWithLruCache<LocalDBPackageStore>> {
219 let mut caches = self.epochs.lock().unwrap();
220 if let Some(cache) = caches.get(&self.epoch) {
221 return cache.clone();
222 }
223 let cache = Arc::new(PackageStoreWithLruCache::new(self.base.clone()));
225 caches.put(self.epoch, cache.clone());
226 cache
227 }
228}
229
230#[async_trait]
231impl PackageStore for CompositeStore {
232 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
233 if SYSTEM_PACKAGE_ADDRESSES.contains(&id) {
234 let cache = self.epoch_cache();
235 return cache.fetch(id).await;
236 }
237 self.global.fetch(id).await
238 }
239}
240
241pub struct PackageCache {
243 pub base_store: LocalDBPackageStore,
244 pub global_cache: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
245 pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
246 pub resolver: Resolver<GlobalArcStore>,
247 pub coordinator: CacheReadyCoordinator,
248}
249
250impl PackageCache {
251 pub fn new(path: &Path, rest_url: &str) -> Self {
252 let base_store = LocalDBPackageStore::new(path, rest_url);
253 let global_cache = Arc::new(PackageStoreWithLruCache::new(base_store.clone()));
254 Self {
255 resolver: Resolver::new(GlobalArcStore(global_cache.clone())),
256 base_store,
257 global_cache,
258 epochs: Arc::new(Mutex::new(LruCache::new(
259 NonZeroUsize::new(MAX_EPOCH_CACHES).unwrap(),
260 ))),
261 coordinator: CacheReadyCoordinator::new(),
262 }
263 }
264
265 pub fn resolver_for_epoch(&self, epoch: u64) -> Resolver<CompositeStore> {
266 Resolver::new(CompositeStore {
267 epoch,
268 global: self.global_cache.clone(),
269 base: self.base_store.clone(),
270 epochs: self.epochs.clone(),
271 })
272 }
273
274 pub fn update(&self, object: &Object) -> Result<()> {
275 self.base_store.update(object)?;
276 Ok(())
277 }
278
279 fn update_batch<'a, I>(&self, objects: I) -> Result<()>
280 where
281 I: IntoIterator<Item = &'a Object>,
282 {
283 self.base_store.update_batch(objects)?;
284 Ok(())
285 }
286
287 #[cfg(not(test))]
288 pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
289 Ok(self.base_store.get_original_package_id(id).await?)
290 }
291
292 #[cfg(test)]
293 pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
294 Ok(id.into())
295 }
296}