1use std::ops::Bound;
13
14use move_core_types::language_storage::StructTag;
15use sui_consistent_store::reader::Reader;
16use sui_types::base_types::ObjectID;
17use sui_types::base_types::SuiAddress;
18use sui_types::messages_checkpoint::CheckpointSequenceNumber;
19use sui_types::storage::BalanceInfo;
20use sui_types::storage::BalanceIterator;
21use sui_types::storage::CoinInfo;
22use sui_types::storage::DynamicFieldIteratorItem;
23use sui_types::storage::DynamicFieldKey;
24use sui_types::storage::EpochInfo;
25use sui_types::storage::LedgerBitmapBucketIterator;
26use sui_types::storage::LedgerTxSeqDigest;
27use sui_types::storage::LedgerTxSeqDigestIterator;
28use sui_types::storage::OwnedObjectInfo;
29use sui_types::storage::RpcIndexes;
30
31type PackageVersionsIterator<'a> =
34 Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + 'a>;
35use sui_types::storage::error::Result as StorageResult;
36use typed_store_error::TypedStoreError;
37
38use crate::reader::RpcStoreReader;
39use crate::schema::type_filter::TypeFilter;
40
41fn to_typed_store_err(e: sui_consistent_store::error::Error) -> TypedStoreError {
42 TypedStoreError::RocksDBError(format!("{e:#}"))
43}
44
45fn first_object_of_type<R: Reader + Send + Sync>(
52 reader: &RpcStoreReader<R>,
53 struct_tag: move_core_types::language_storage::StructTag,
54) -> StorageResult<Option<ObjectID>> {
55 let filter = TypeFilter::Type(struct_tag);
56 let mut iter = reader
57 .schema()
58 .iter_objects_of_type(&filter)
59 .map_err(sui_types::storage::error::Error::custom)?;
60 match iter.next() {
61 Some(Ok((key, _value))) => Ok(Some(key.object_id)),
62 Some(Err(e)) => Err(sui_types::storage::error::Error::custom(e)),
63 None => Ok(None),
64 }
65}
66
67impl<R: Reader + Send + Sync> RpcIndexes for RpcStoreReader<R> {
68 fn get_epoch_info(
69 &self,
70 epoch: sui_types::committee::EpochId,
71 ) -> StorageResult<Option<EpochInfo>> {
72 self.schema()
73 .get_epoch(epoch)
74 .map_err(sui_types::storage::error::Error::custom)
75 }
76
77 fn owned_objects_iter(
78 &self,
79 owner: SuiAddress,
80 object_type: Option<StructTag>,
81 cursor: Option<OwnedObjectInfo>,
82 ) -> StorageResult<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>>
83 {
84 let cursor_object_id = cursor.as_ref().map(|c| c.object_id);
85 let iter = match object_type.as_ref() {
86 Some(struct_tag) => {
87 let filter = TypeFilter::Type(struct_tag.clone());
88 self.schema()
89 .iter_objects_owned_by_address_of_type(owner, filter)
90 .map_err(sui_types::storage::error::Error::custom)?
91 }
92 None => self
93 .schema()
94 .iter_objects_owned_by_address(owner)
95 .map_err(sui_types::storage::error::Error::custom)?,
96 };
97
98 let mapped = iter
99 .map(move |row| {
100 let (key, value) = row.map_err(to_typed_store_err)?;
101 Ok(OwnedObjectInfo {
102 owner,
103 object_type: key.type_,
104 balance: key.inverted_balance.map(|b| !b),
105 object_id: key.object_id,
106 version: sui_types::base_types::SequenceNumber::from_u64(value.0),
107 })
108 })
109 .skip_while(
115 move |entry: &Result<OwnedObjectInfo, TypedStoreError>| match entry {
116 Ok(info) => cursor_object_id
117 .map(|c| info.object_id == c)
118 .unwrap_or(false),
119 Err(_) => false,
120 },
121 );
122
123 Ok(Box::new(mapped))
124 }
125
126 fn dynamic_field_iter(
127 &self,
128 parent: ObjectID,
129 cursor: Option<ObjectID>,
130 ) -> StorageResult<Box<dyn Iterator<Item = DynamicFieldIteratorItem> + '_>> {
131 let iter = self
136 .schema()
137 .iter_objects_owned_by_object(parent.into())
138 .map_err(sui_types::storage::error::Error::custom)?;
139
140 let mapped = iter
141 .map(move |row| {
142 let (key, _value) = row.map_err(to_typed_store_err)?;
143 Ok(DynamicFieldKey {
144 parent,
145 field_id: key.object_id,
146 })
147 })
148 .skip_while(
149 move |entry: &Result<DynamicFieldKey, TypedStoreError>| match entry {
150 Ok(info) => cursor.map(|c| info.field_id == c).unwrap_or(false),
151 Err(_) => false,
152 },
153 );
154
155 Ok(Box::new(mapped))
156 }
157
158 fn get_coin_info(&self, coin_type: &StructTag) -> StorageResult<Option<CoinInfo>> {
159 let coin_metadata_object_id = first_object_of_type(
165 self,
166 sui_types::coin::CoinMetadata::type_(coin_type.clone()),
167 )?;
168 let treasury_object_id =
169 first_object_of_type(self, sui_types::coin::TreasuryCap::type_(coin_type.clone()))?;
170 let regulated_coin_metadata_object_id = first_object_of_type(
171 self,
172 sui_types::coin::RegulatedCoinMetadata::type_(coin_type.clone()),
173 )?;
174
175 if coin_metadata_object_id.is_none()
176 && treasury_object_id.is_none()
177 && regulated_coin_metadata_object_id.is_none()
178 {
179 return Ok(None);
180 }
181
182 Ok(Some(CoinInfo {
183 coin_metadata_object_id,
184 treasury_object_id,
185 regulated_coin_metadata_object_id,
186 }))
187 }
188
189 fn get_balance(
190 &self,
191 owner: &SuiAddress,
192 coin_type: &StructTag,
193 ) -> StorageResult<Option<BalanceInfo>> {
194 let balance = self
195 .schema()
196 .get_balance(*owner, coin_type.clone().into())
197 .map_err(sui_types::storage::error::Error::custom)?;
198 Ok(balance.map(|b| BalanceInfo {
203 coin_balance: b.coin.clamp(0, u64::MAX as i128) as u64,
204 address_balance: b.address.clamp(0, u64::MAX as i128) as u64,
205 }))
206 }
207
208 fn balance_iter(
209 &self,
210 owner: &SuiAddress,
211 cursor: Option<(SuiAddress, StructTag)>,
212 ) -> StorageResult<BalanceIterator<'_>> {
213 let cursor_coin_type = cursor
214 .map(|(_, tag)| move_core_types::language_storage::TypeTag::Struct(Box::new(tag)));
215 let iter = self
216 .schema()
217 .iter_balances_owned_by(*owner)
218 .map_err(sui_types::storage::error::Error::custom)?;
219
220 let mapped = iter.filter_map(move |row| {
221 let (key, value) = match row {
222 Ok(pair) => pair,
223 Err(e) => return Some(Err(sui_types::storage::error::Error::custom(e))),
224 };
225 let balance = match crate::schema::balance::Balance::from_delta(&value.into_inner()) {
230 Ok(b) => b,
231 Err(e) => return Some(Err(sui_types::storage::error::Error::custom(e))),
232 };
233 let info = BalanceInfo {
237 coin_balance: balance.coin.clamp(0, u64::MAX as i128) as u64,
238 address_balance: balance.address.clamp(0, u64::MAX as i128) as u64,
239 };
240 if let Some(c) = cursor_coin_type.as_ref()
242 && key.coin_type == *c
243 {
244 return None;
245 }
246 let struct_tag = match key.coin_type {
247 move_core_types::language_storage::TypeTag::Struct(b) => *b,
248 _ => return None,
249 };
250 Some(Ok((struct_tag, info)))
251 });
252
253 Ok(Box::new(mapped))
254 }
255
256 fn package_versions_iter(
257 &self,
258 original_id: ObjectID,
259 cursor: Option<u64>,
260 ) -> StorageResult<PackageVersionsIterator<'_>> {
261 let iter = self
262 .schema()
263 .iter_package_versions(original_id)
264 .map_err(sui_types::storage::error::Error::custom)?;
265 let mapped = iter
266 .map(move |row| {
267 let (key, value) = row.map_err(to_typed_store_err)?;
268 let storage_id_bytes: [u8; 32] = (&value.into_inner().storage_id[..])
270 .try_into()
271 .map_err(|_| {
272 TypedStoreError::SerializationError(
273 "package_versions storage_id length".into(),
274 )
275 })?;
276 Ok((key.version, ObjectID::new(storage_id_bytes)))
277 })
278 .filter(
289 move |entry: &Result<(u64, ObjectID), TypedStoreError>| match entry {
290 Ok((v, _)) => cursor.map(|c| *v >= c).unwrap_or(true),
291 Err(_) => true,
292 },
293 );
294 Ok(Box::new(mapped))
295 }
296
297 fn get_highest_indexed_checkpoint_seq_number(
298 &self,
299 ) -> StorageResult<Option<CheckpointSequenceNumber>> {
300 let framework = sui_consistent_store::FrameworkSchema::new(self.db().clone());
304 let mut min: Option<u64> = None;
305 for entry in framework
306 .watermarks
307 .iter(..)
308 .map_err(sui_types::storage::error::Error::custom)?
309 {
310 let (_, watermark) = entry.map_err(sui_types::storage::error::Error::custom)?;
311 let hi = watermark.checkpoint_hi_inclusive;
312 min = Some(min.map_or(hi, |m| m.min(hi)));
313 }
314 Ok(min)
315 }
316
317 fn ledger_tx_seq_digest(&self, tx_seq: u64) -> StorageResult<Option<LedgerTxSeqDigest>> {
318 let meta = self
319 .schema()
320 .get_tx_metadata_by_seq(tx_seq)
321 .map_err(sui_types::storage::error::Error::custom)?;
322 Ok(meta.map(|m| LedgerTxSeqDigest {
323 tx_sequence_number: tx_seq,
324 digest: m.digest,
325 event_count: m.event_count,
326 tx_offset: m.ckpt_position,
327 checkpoint_number: m.checkpoint_seq,
328 }))
329 }
330
331 fn ledger_tx_seq_digest_iter(
332 &self,
333 start: u64,
334 end_exclusive: u64,
335 descending: bool,
336 ) -> StorageResult<LedgerTxSeqDigestIterator<'_>> {
337 use crate::schema::primitives::U64Be;
338 let range = (
339 Bound::Included(U64Be(start)),
340 Bound::Excluded(U64Be(end_exclusive)),
341 );
342 let map = &self.schema().tx_metadata_by_seq;
343 let project = move |row: Result<
344 (U64Be, crate::schema::tx_metadata_by_seq::Value),
345 sui_consistent_store::error::Error,
346 >| {
347 let (U64Be(seq), value) = row.map_err(to_typed_store_err)?;
348 let stored = value.into_inner();
349 let digest_bytes: [u8; 32] = (&stored.digest[..]).try_into().map_err(|_| {
350 TypedStoreError::SerializationError("tx_metadata digest length".into())
351 })?;
352 Ok(LedgerTxSeqDigest {
353 tx_sequence_number: seq,
354 digest: sui_types::digests::TransactionDigest::new(digest_bytes),
355 event_count: stored.event_count,
356 tx_offset: stored.ckpt_position,
357 checkpoint_number: stored.checkpoint_seq,
358 })
359 };
360 if descending {
361 let iter = map
362 .iter_rev(range)
363 .map_err(sui_types::storage::error::Error::custom)?;
364 Ok(Box::new(iter.map(project)))
365 } else {
366 let iter = map
367 .iter(range)
368 .map_err(sui_types::storage::error::Error::custom)?;
369 Ok(Box::new(iter.map(project)))
370 }
371 }
372
373 fn transaction_bitmap_bucket_iter(
374 &self,
375 dimension_key: Vec<u8>,
376 start_bucket: u64,
377 end_bucket_exclusive: u64,
378 descending: bool,
379 ) -> StorageResult<LedgerBitmapBucketIterator<'_>> {
380 let map = &self.schema().transaction_bitmap;
381 let lower = crate::schema::transaction_bitmap::Key {
382 dimension_key: dimension_key.clone(),
383 bucket: start_bucket,
384 };
385 let upper = crate::schema::transaction_bitmap::Key {
386 dimension_key,
387 bucket: end_bucket_exclusive,
388 };
389 if descending {
390 let iter = map
391 .iter_rev(lower..upper)
392 .map_err(sui_types::storage::error::Error::custom)?;
393 Ok(Box::new(iter.map(project_bitmap_row)))
394 } else {
395 let iter = map
396 .iter(lower..upper)
397 .map_err(sui_types::storage::error::Error::custom)?;
398 Ok(Box::new(iter.map(project_bitmap_row)))
399 }
400 }
401
402 fn event_bitmap_bucket_iter(
403 &self,
404 dimension_key: Vec<u8>,
405 start_bucket: u64,
406 end_bucket_exclusive: u64,
407 descending: bool,
408 ) -> StorageResult<LedgerBitmapBucketIterator<'_>> {
409 let map = &self.schema().event_bitmap;
410 let lower = crate::schema::event_bitmap::Key {
411 dimension_key: dimension_key.clone(),
412 bucket: start_bucket,
413 };
414 let upper = crate::schema::event_bitmap::Key {
415 dimension_key,
416 bucket: end_bucket_exclusive,
417 };
418 if descending {
419 let iter = map
420 .iter_rev(lower..upper)
421 .map_err(sui_types::storage::error::Error::custom)?;
422 Ok(Box::new(iter.map(project_event_bitmap_row)))
423 } else {
424 let iter = map
425 .iter(lower..upper)
426 .map_err(sui_types::storage::error::Error::custom)?;
427 Ok(Box::new(iter.map(project_event_bitmap_row)))
428 }
429 }
430}
431
432fn project_bitmap_row(
436 row: Result<
437 (
438 crate::schema::transaction_bitmap::Key,
439 crate::schema::transaction_bitmap::Value,
440 ),
441 sui_consistent_store::error::Error,
442 >,
443) -> Result<sui_types::storage::LedgerBitmapBucket, TypedStoreError> {
444 let (key, value) = row.map_err(to_typed_store_err)?;
445 let bitmap = roaring::RoaringBitmap::deserialize_from(value.into_inner().data.as_ref())
446 .map_err(|e| TypedStoreError::SerializationError(format!("RoaringBitmap: {e}")))?;
447 Ok(sui_types::storage::LedgerBitmapBucket {
448 bucket_id: key.bucket,
449 bitmap,
450 })
451}
452
453fn project_event_bitmap_row(
458 row: Result<
459 (
460 crate::schema::event_bitmap::Key,
461 crate::schema::event_bitmap::Value,
462 ),
463 sui_consistent_store::error::Error,
464 >,
465) -> Result<sui_types::storage::LedgerBitmapBucket, TypedStoreError> {
466 let (key, value) = row.map_err(to_typed_store_err)?;
467 let bitmap = roaring::RoaringBitmap::deserialize_from(value.into_inner().data.as_ref())
468 .map_err(|e| TypedStoreError::SerializationError(format!("RoaringBitmap: {e}")))?;
469 Ok(sui_types::storage::LedgerBitmapBucket {
470 bucket_id: key.bucket,
471 bitmap,
472 })
473}
474
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::ObjectID;
    use sui_types::storage::RpcIndexes;

    use crate::RpcStoreSchema;
    use crate::reader::RpcStoreReader;
    use crate::schema::transaction_bitmap;

    /// Opens a fresh store in a temporary directory and wraps it in a reader.
    ///
    /// The `TempDir` is returned so the directory outlives the test body.
    fn setup() -> (tempfile::TempDir, Db, RpcStoreReader) {
        let dir = tempfile::tempdir().unwrap();
        let (db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        let reader = RpcStoreReader::new(db.clone(), Arc::new(schema));
        (dir, db, reader)
    }

    /// Transaction sequence numbers that land in buckets 0, 1 and 3.
    fn spread_tx_seqs() -> [u64; 3] {
        [
            1u64,
            transaction_bitmap::TX_BUCKET_SIZE + 5,
            3 * transaction_bitmap::TX_BUCKET_SIZE + 9,
        ]
    }

    /// Merges one `store_match` entry per `(dimension, tx_seq)` pair into the
    /// transaction bitmap table and commits them as a single batch.
    fn seed_matches(
        db: &Db,
        reader: &RpcStoreReader,
        matches: impl IntoIterator<Item = (Vec<u8>, u64)>,
    ) {
        let mut batch = db.batch();
        for (dim, tx_seq) in matches {
            let (k, v) = transaction_bitmap::store_match(dim, tx_seq);
            batch
                .merge(&reader.schema().transaction_bitmap, &k, &v)
                .unwrap();
        }
        batch.commit().unwrap();
    }

    /// Runs `transaction_bitmap_bucket_iter` and collects the bucket IDs it yields.
    fn bucket_ids(
        reader: &RpcStoreReader,
        dim: &[u8],
        start_bucket: u64,
        end_bucket_exclusive: u64,
        descending: bool,
    ) -> Vec<u64> {
        reader
            .transaction_bitmap_bucket_iter(
                dim.to_vec(),
                start_bucket,
                end_bucket_exclusive,
                descending,
            )
            .unwrap()
            .map(|res| res.unwrap().bucket_id)
            .collect()
    }

    #[test]
    fn transaction_bitmap_bucket_iter_walks_range_ascending() {
        let (_dir, db, reader) = setup();
        let dim = b"sender:alice".to_vec();
        seed_matches(
            &db,
            &reader,
            spread_tx_seqs().map(|tx_seq| (dim.clone(), tx_seq)),
        );

        assert_eq!(bucket_ids(&reader, &dim, 0, 5, false), vec![0, 1, 3]);
    }

    #[test]
    fn transaction_bitmap_bucket_iter_respects_bucket_range_bounds() {
        let (_dir, db, reader) = setup();
        let dim = b"sender:alice".to_vec();
        seed_matches(
            &db,
            &reader,
            spread_tx_seqs().map(|tx_seq| (dim.clone(), tx_seq)),
        );

        // Bucket 0 is below the range and bucket 3 sits on the exclusive end.
        assert_eq!(bucket_ids(&reader, &dim, 1, 3, false), vec![1]);
    }

    #[test]
    fn transaction_bitmap_bucket_iter_descending_reverses_order() {
        let (_dir, db, reader) = setup();
        let dim = b"sender:alice".to_vec();
        seed_matches(
            &db,
            &reader,
            spread_tx_seqs().map(|tx_seq| (dim.clone(), tx_seq)),
        );

        assert_eq!(bucket_ids(&reader, &dim, 0, 5, true), vec![3, 1, 0]);
    }

    #[test]
    fn transaction_bitmap_bucket_iter_isolates_dimension() {
        let (_dir, db, reader) = setup();
        let alice = b"sender:alice".to_vec();
        let bob = b"sender:bob".to_vec();
        seed_matches(&db, &reader, [(alice.clone(), 1), (bob, 1)]);

        // Only alice's single bucket is visible when querying her dimension.
        assert_eq!(bucket_ids(&reader, &alice, 0, 5, false), vec![0]);
    }

    #[test]
    fn get_coin_info_finds_metadata_and_treasury_objects() {
        use move_core_types::language_storage::StructTag;

        use crate::schema::object_by_type;
        use crate::schema::primitives::U64Varint;

        let (_dir, db, reader) = setup();

        let coin_type = StructTag {
            address: move_core_types::account_address::AccountAddress::new([2u8; 32]),
            module: move_core_types::identifier::Identifier::new("sui").unwrap(),
            name: move_core_types::identifier::Identifier::new("SUI").unwrap(),
            type_params: vec![],
        };
        let metadata_object_id = ObjectID::from_single_byte(0xA1);
        let treasury_object_id = ObjectID::from_single_byte(0xA2);

        // Index a metadata object and a treasury cap, but no regulated metadata.
        let indexed = [
            (
                sui_types::coin::CoinMetadata::type_(coin_type.clone()),
                metadata_object_id,
            ),
            (
                sui_types::coin::TreasuryCap::type_(coin_type.clone()),
                treasury_object_id,
            ),
        ];
        let mut batch = db.batch();
        for (type_, object_id) in indexed {
            batch
                .put(
                    &reader.schema().object_by_type,
                    &object_by_type::Key { type_, object_id },
                    &U64Varint(1),
                )
                .unwrap();
        }
        batch.commit().unwrap();

        let info = reader
            .get_coin_info(&coin_type)
            .unwrap()
            .expect("coin info present");
        assert_eq!(info.coin_metadata_object_id, Some(metadata_object_id));
        assert_eq!(info.treasury_object_id, Some(treasury_object_id));
        assert_eq!(info.regulated_coin_metadata_object_id, None);
    }

    #[test]
    fn get_coin_info_returns_none_when_no_wrappers_indexed() {
        use move_core_types::language_storage::StructTag;

        let (_dir, _db, reader) = setup();

        let coin_type = StructTag {
            address: move_core_types::account_address::AccountAddress::new([3u8; 32]),
            module: move_core_types::identifier::Identifier::new("custom").unwrap(),
            name: move_core_types::identifier::Identifier::new("COIN").unwrap(),
            type_params: vec![],
        };

        let info = reader.get_coin_info(&coin_type).unwrap();
        assert!(info.is_none());
    }

    #[test]
    fn transaction_bitmap_bucket_iter_returns_decoded_bitmap() {
        let (_dir, db, reader) = setup();
        let dim = b"sender:alice".to_vec();
        seed_matches(
            &db,
            &reader,
            [1u64, 17, 256].map(|tx_seq| (dim.clone(), tx_seq)),
        );

        let first = reader
            .transaction_bitmap_bucket_iter(dim, 0, 1, false)
            .unwrap()
            .next()
            .unwrap()
            .unwrap();
        let bits: Vec<u32> = first.bitmap.iter().collect();
        assert_eq!(bits, vec![1, 17, 256]);
    }
}