sui_analytics_indexer/package_store/
mod.rs1use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::path::Path;
7use std::result::Result as StdResult;
8use std::sync::Arc;
9use std::sync::Mutex;
10
11use async_trait::async_trait;
12use lru::LruCache;
13use move_core_types::account_address::AccountAddress;
14use sui_package_resolver::Package;
15use sui_package_resolver::PackageStore;
16use sui_package_resolver::PackageStoreWithLruCache;
17use sui_package_resolver::Resolver;
18use sui_package_resolver::Result;
19use sui_package_resolver::error::Error as ResolverError;
20use sui_rpc_api::Client;
21use sui_types::SYSTEM_PACKAGE_ADDRESSES;
22use sui_types::base_types::ObjectID;
23use sui_types::object::Data;
24use sui_types::object::Object;
25use thiserror::Error;
26use typed_store::DBMapUtils;
27use typed_store::Map;
28use typed_store::TypedStoreError;
29use typed_store::rocks::DBMap;
30use typed_store::rocks::MetricConf;
31
32const STORE: &str = "RocksDB";
33const MAX_EPOCH_CACHES: usize = 2; #[derive(Error, Debug)]
36pub enum Error {
37 #[error("{0}")]
38 TypedStore(#[from] TypedStoreError),
39 #[error("Package not found: {0}")]
40 PackageNotFound(AccountAddress),
41}
42
43impl From<Error> for ResolverError {
44 fn from(e: Error) -> Self {
45 ResolverError::Store {
46 store: STORE,
47 error: e.to_string(),
48 }
49 }
50}
51
52#[derive(DBMapUtils)]
53pub struct PackageStoreTables {
54 pub(crate) packages: DBMap<ObjectID, Object>,
55}
56
57impl PackageStoreTables {
58 pub fn new(path: &Path) -> Arc<Self> {
59 Arc::new(Self::open_tables_read_write(
60 path.to_path_buf(),
61 MetricConf::new("package"),
62 None,
63 None,
64 ))
65 }
66
67 fn update(&self, object: &Object) -> StdResult<(), Error> {
68 self.update_batch(std::iter::once(object))
69 }
70
71 fn update_batch<'a, I>(&self, objects: I) -> StdResult<(), Error>
72 where
73 I: IntoIterator<Item = &'a Object>,
74 {
75 let mut batch = self.packages.batch();
76 batch.insert_batch(&self.packages, objects.into_iter().map(|o| (o.id(), o)))?;
77
78 batch.write()?;
79 Ok(())
80 }
81}
82
83#[derive(Clone)]
84pub struct LocalDBPackageStore {
85 tables: Arc<PackageStoreTables>,
86 client: Client,
87}
88
89impl LocalDBPackageStore {
90 pub fn new(path: &Path, rpc_url: &str) -> Self {
91 Self {
92 tables: PackageStoreTables::new(path),
93 client: Client::new(rpc_url).expect("invalid rpc url"),
94 }
95 }
96
97 fn update(&self, object: &Object) -> StdResult<(), Error> {
98 if object.data.try_as_package().is_some() {
99 self.tables.update(object)?;
100 }
101 Ok(())
102 }
103
104 async fn get(&self, id: AccountAddress) -> StdResult<Object, Error> {
105 if let Some(o) = self.tables.packages.get(&ObjectID::from(id))? {
106 return Ok(o);
107 }
108 let o = self
109 .client
110 .clone()
111 .get_object(ObjectID::from(id))
112 .await
113 .map_err(|_| Error::PackageNotFound(id))?;
114 self.update(&o)?;
115 Ok(o)
116 }
117
118 pub async fn get_original_package_id(&self, id: AccountAddress) -> StdResult<ObjectID, Error> {
119 let o = self.get(id).await?;
120 let Data::Package(p) = &o.data else {
121 return Err(Error::TypedStore(TypedStoreError::SerializationError(
122 "not a package".into(),
123 )));
124 };
125 Ok(p.original_package_id())
126 }
127}
128
129#[async_trait]
130impl PackageStore for LocalDBPackageStore {
131 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
132 let o = self.get(id).await?;
133 Ok(Arc::new(Package::read_from_object(&o)?))
134 }
135}
136
137#[derive(Clone)]
139pub struct GlobalArcStore(pub Arc<PackageStoreWithLruCache<LocalDBPackageStore>>);
140
141#[async_trait]
142impl PackageStore for GlobalArcStore {
143 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
144 self.0.fetch(id).await
145 }
146}
147
148impl Deref for GlobalArcStore {
149 type Target = PackageStoreWithLruCache<LocalDBPackageStore>;
150 fn deref(&self) -> &Self::Target {
151 &self.0
152 }
153}
154
155#[derive(Clone)]
161pub struct CompositeStore {
162 pub epoch: u64,
163 pub global: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
164 pub base: LocalDBPackageStore,
165 pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
166}
167
168impl CompositeStore {
169 fn epoch_cache(&self) -> Arc<PackageStoreWithLruCache<LocalDBPackageStore>> {
171 let mut caches = self.epochs.lock().unwrap();
172 if let Some(cache) = caches.get(&self.epoch) {
173 return cache.clone();
174 }
175 let cache = Arc::new(PackageStoreWithLruCache::new(self.base.clone()));
177 caches.put(self.epoch, cache.clone());
178 cache
179 }
180}
181
182#[async_trait]
183impl PackageStore for CompositeStore {
184 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
185 if SYSTEM_PACKAGE_ADDRESSES.contains(&id) {
186 let cache = self.epoch_cache();
187 return cache.fetch(id).await;
188 }
189 self.global.fetch(id).await
190 }
191}
192
193pub struct PackageCache {
195 pub base_store: LocalDBPackageStore,
196 pub global_cache: Arc<PackageStoreWithLruCache<LocalDBPackageStore>>,
197 pub epochs: Arc<Mutex<LruCache<u64, Arc<PackageStoreWithLruCache<LocalDBPackageStore>>>>>,
198 pub resolver: Resolver<GlobalArcStore>,
199}
200
201impl PackageCache {
202 pub fn new(path: &Path, rpc_url: &str) -> Self {
203 let base_store = LocalDBPackageStore::new(path, rpc_url);
204 let global_cache = Arc::new(PackageStoreWithLruCache::new(base_store.clone()));
205 Self {
206 resolver: Resolver::new(GlobalArcStore(global_cache.clone())),
207 base_store,
208 global_cache,
209 epochs: Arc::new(Mutex::new(LruCache::new(
210 NonZeroUsize::new(MAX_EPOCH_CACHES).unwrap(),
211 ))),
212 }
213 }
214
215 pub fn resolver_for_epoch(&self, epoch: u64) -> Resolver<CompositeStore> {
216 Resolver::new(CompositeStore {
217 epoch,
218 global: self.global_cache.clone(),
219 base: self.base_store.clone(),
220 epochs: self.epochs.clone(),
221 })
222 }
223
224 #[cfg(not(test))]
225 pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
226 Ok(self.base_store.get_original_package_id(id).await?)
227 }
228
229 #[cfg(test)]
230 pub async fn get_original_package_id(&self, id: AccountAddress) -> Result<ObjectID> {
231 Ok(id.into())
232 }
233}