1use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_types::base_types::ObjectID;
16use sui_types::base_types::SuiAddress;
17use sui_types::base_types::TransactionDigest;
18use sui_types::committee::Committee;
19use sui_types::committee::EpochId;
20use sui_types::effects::{TransactionEffects, TransactionEvents};
21use sui_types::messages_checkpoint::CheckpointContentsDigest;
22use sui_types::messages_checkpoint::CheckpointDigest;
23use sui_types::messages_checkpoint::CheckpointSequenceNumber;
24use sui_types::messages_checkpoint::EndOfEpochData;
25use sui_types::messages_checkpoint::VerifiedCheckpoint;
26use sui_types::messages_checkpoint::VerifiedCheckpointContents;
27use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
28use sui_types::object::Object;
29use sui_types::storage::BalanceInfo;
30use sui_types::storage::BalanceIterator;
31use sui_types::storage::CoinInfo;
32use sui_types::storage::DynamicFieldKey;
33use sui_types::storage::ObjectStore;
34use sui_types::storage::OwnedObjectInfo;
35use sui_types::storage::RpcIndexes;
36use sui_types::storage::RpcStateReader;
37use sui_types::storage::WriteStore;
38use sui_types::storage::error::Error as StorageError;
39use sui_types::storage::error::Result;
40use sui_types::storage::{ObjectKey, ReadStore};
41use sui_types::transaction::VerifiedTransaction;
42use tap::Pipe;
43use tap::TapFallible;
44use tracing::error;
45use typed_store::TypedStoreError;
46
47#[derive(Clone)]
48pub struct RocksDbStore {
49 cache_traits: ExecutionCacheTraitPointers,
50
51 committee_store: Arc<CommitteeStore>,
52 checkpoint_store: Arc<CheckpointStore>,
53 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
55 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
56}
57
58impl RocksDbStore {
59 pub fn new(
60 cache_traits: ExecutionCacheTraitPointers,
61 committee_store: Arc<CommitteeStore>,
62 checkpoint_store: Arc<CheckpointStore>,
63 ) -> Self {
64 Self {
65 cache_traits,
66 committee_store,
67 checkpoint_store,
68 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
69 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
70 }
71 }
72
73 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
74 self.cache_traits
75 .object_cache_reader
76 .multi_get_objects_by_key(object_keys)
77 }
78
79 pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
80 self.checkpoint_store
81 .get_highest_executed_checkpoint()
82 .expect("db error")
83 }
84}
85
86impl ReadStore for RocksDbStore {
87 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
88 self.checkpoint_store
89 .get_checkpoint_by_digest(digest)
90 .expect("db error")
91 }
92
93 fn get_checkpoint_by_sequence_number(
94 &self,
95 sequence_number: CheckpointSequenceNumber,
96 ) -> Option<VerifiedCheckpoint> {
97 self.checkpoint_store
98 .get_checkpoint_by_sequence_number(sequence_number)
99 .expect("db error")
100 }
101
102 fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
103 self.checkpoint_store
104 .get_highest_verified_checkpoint()
105 .map(|maybe_checkpoint| {
106 maybe_checkpoint
107 .expect("storage should have been initialized with genesis checkpoint")
108 })
109 .map_err(Into::into)
110 }
111
112 fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
113 self.checkpoint_store
114 .get_highest_synced_checkpoint()
115 .map(|maybe_checkpoint| {
116 maybe_checkpoint
117 .expect("storage should have been initialized with genesis checkpoint")
118 })
119 .map_err(Into::into)
120 }
121
122 fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
123 if let Some(highest_pruned_cp) = self
124 .checkpoint_store
125 .get_highest_pruned_checkpoint_seq_number()
126 .map_err(Into::<StorageError>::into)?
127 {
128 Ok(highest_pruned_cp + 1)
129 } else {
130 Ok(0)
131 }
132 }
133
134 fn get_full_checkpoint_contents(
135 &self,
136 sequence_number: Option<CheckpointSequenceNumber>,
137 digest: &CheckpointContentsDigest,
138 ) -> Option<VersionedFullCheckpointContents> {
139 #[cfg(debug_assertions)]
140 if let Some(sequence_number) = sequence_number {
141 if let Some(loaded_sequence_number) = self
145 .checkpoint_store
146 .get_sequence_number_by_contents_digest(digest)
147 .expect("db error")
148 {
149 assert_eq!(loaded_sequence_number, sequence_number);
150 }
151 }
152
153 let sequence_number = sequence_number.or_else(|| {
154 self.checkpoint_store
155 .get_sequence_number_by_contents_digest(digest)
156 .expect("db error")
157 });
158 if let Some(sequence_number) = sequence_number {
159 if let Ok(Some(contents)) = self
165 .checkpoint_store
166 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
167 .tap_err(|e| {
168 error!(
169 "error getting full checkpoint contents for checkpoint {:?}: {:?}",
170 sequence_number, e
171 )
172 })
173 {
174 return Some(contents);
175 }
176 }
177
178 self.checkpoint_store
184 .get_checkpoint_contents(digest)
185 .expect("db error")
186 .and_then(|contents| {
187 let mut transactions = Vec::with_capacity(contents.size());
188 for tx in contents.iter() {
189 if let (Some(t), Some(e)) = (
190 self.get_transaction(&tx.transaction),
191 self.cache_traits
192 .transaction_cache_reader
193 .get_effects(&tx.effects),
194 ) {
195 transactions.push(sui_types::base_types::ExecutionData::new(
196 (*t).clone().into_inner(),
197 e,
198 ))
199 } else {
200 return None;
201 }
202 }
203 Some(
204 VersionedFullCheckpointContents::from_contents_and_execution_data(
205 contents,
206 transactions.into_iter(),
207 ),
208 )
209 })
210 }
211
212 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
213 self.committee_store.get_committee(&epoch).unwrap()
214 }
215
216 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
217 self.cache_traits
218 .transaction_cache_reader
219 .get_transaction_block(digest)
220 }
221
222 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
223 self.cache_traits
224 .transaction_cache_reader
225 .get_executed_effects(digest)
226 }
227
228 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
229 self.cache_traits
230 .transaction_cache_reader
231 .get_events(digest)
232 }
233
234 fn get_unchanged_loaded_runtime_objects(
235 &self,
236 digest: &TransactionDigest,
237 ) -> Option<Vec<ObjectKey>> {
238 self.cache_traits
239 .transaction_cache_reader
240 .get_unchanged_loaded_runtime_objects(digest)
241 }
242
243 fn get_transaction_checkpoint(
244 &self,
245 digest: &TransactionDigest,
246 ) -> Option<CheckpointSequenceNumber> {
247 self.cache_traits
248 .checkpoint_cache
249 .deprecated_get_transaction_checkpoint(digest)
250 .map(|(_epoch, checkpoint)| checkpoint)
251 }
252
253 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
254 self.checkpoint_store
255 .get_highest_executed_checkpoint()
256 .expect("db error")
257 .ok_or_else(|| {
258 sui_types::storage::error::Error::missing("unable to get latest checkpoint")
259 })
260 }
261
262 fn get_checkpoint_contents_by_digest(
263 &self,
264 digest: &CheckpointContentsDigest,
265 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
266 self.checkpoint_store
267 .get_checkpoint_contents(digest)
268 .expect("db error")
269 }
270
271 fn get_checkpoint_contents_by_sequence_number(
272 &self,
273 sequence_number: CheckpointSequenceNumber,
274 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
275 match self.get_checkpoint_by_sequence_number(sequence_number) {
276 Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
277 None => None,
278 }
279 }
280}
281
282impl ObjectStore for RocksDbStore {
283 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
284 self.cache_traits.object_store.get_object(object_id)
285 }
286
287 fn get_object_by_key(
288 &self,
289 object_id: &sui_types::base_types::ObjectID,
290 version: sui_types::base_types::VersionNumber,
291 ) -> Option<Object> {
292 self.cache_traits
293 .object_store
294 .get_object_by_key(object_id, version)
295 }
296}
297
298impl WriteStore for RocksDbStore {
299 fn insert_checkpoint(
300 &self,
301 checkpoint: &VerifiedCheckpoint,
302 ) -> Result<(), sui_types::storage::error::Error> {
303 if let Some(EndOfEpochData {
304 next_epoch_committee,
305 ..
306 }) = checkpoint.end_of_epoch_data.as_ref()
307 {
308 let next_committee = next_epoch_committee.iter().cloned().collect();
309 let committee =
310 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
311 self.insert_committee(committee)?;
312 }
313
314 self.checkpoint_store
315 .insert_verified_checkpoint(checkpoint)
316 .map_err(Into::into)
317 }
318
319 fn update_highest_synced_checkpoint(
320 &self,
321 checkpoint: &VerifiedCheckpoint,
322 ) -> Result<(), sui_types::storage::error::Error> {
323 let mut locked = self.highest_synced_checkpoint.lock();
324 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
325 return Ok(());
326 }
327 self.checkpoint_store
328 .update_highest_synced_checkpoint(checkpoint)
329 .map_err(sui_types::storage::error::Error::custom)?;
330 *locked = Some(checkpoint.sequence_number);
331 Ok(())
332 }
333
334 fn update_highest_verified_checkpoint(
335 &self,
336 checkpoint: &VerifiedCheckpoint,
337 ) -> Result<(), sui_types::storage::error::Error> {
338 let mut locked = self.highest_verified_checkpoint.lock();
339 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
340 return Ok(());
341 }
342 self.checkpoint_store
343 .update_highest_verified_checkpoint(checkpoint)
344 .map_err(sui_types::storage::error::Error::custom)?;
345 *locked = Some(checkpoint.sequence_number);
346 Ok(())
347 }
348
349 fn insert_checkpoint_contents(
350 &self,
351 checkpoint: &VerifiedCheckpoint,
352 contents: VerifiedCheckpointContents,
353 ) -> Result<(), sui_types::storage::error::Error> {
354 self.cache_traits
355 .state_sync_store
356 .multi_insert_transaction_and_effects(contents.transactions());
357 self.checkpoint_store
358 .insert_verified_checkpoint_contents(checkpoint, contents)
359 .map_err(Into::into)
360 }
361
362 fn insert_committee(
363 &self,
364 new_committee: Committee,
365 ) -> Result<(), sui_types::storage::error::Error> {
366 self.committee_store
367 .insert_new_committee(&new_committee)
368 .unwrap();
369 Ok(())
370 }
371}
372
373pub struct RestReadStore {
374 state: Arc<AuthorityState>,
375 rocks: RocksDbStore,
376}
377
378impl RestReadStore {
379 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
380 Self { state, rocks }
381 }
382
383 fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
384 self.state
385 .rpc_index
386 .as_deref()
387 .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
388 }
389}
390
391impl ObjectStore for RestReadStore {
392 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
393 self.rocks.get_object(object_id)
394 }
395
396 fn get_object_by_key(
397 &self,
398 object_id: &sui_types::base_types::ObjectID,
399 version: sui_types::base_types::VersionNumber,
400 ) -> Option<Object> {
401 self.rocks.get_object_by_key(object_id, version)
402 }
403}
404
405impl ReadStore for RestReadStore {
406 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
407 self.rocks.get_committee(epoch)
408 }
409
410 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
411 self.rocks.get_latest_checkpoint()
412 }
413
414 fn get_highest_verified_checkpoint(
415 &self,
416 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
417 self.rocks.get_highest_verified_checkpoint()
418 }
419
420 fn get_highest_synced_checkpoint(
421 &self,
422 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
423 self.rocks.get_highest_synced_checkpoint()
424 }
425
426 fn get_lowest_available_checkpoint(
427 &self,
428 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
429 self.rocks.get_lowest_available_checkpoint()
430 }
431
432 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
433 self.rocks.get_checkpoint_by_digest(digest)
434 }
435
436 fn get_checkpoint_by_sequence_number(
437 &self,
438 sequence_number: CheckpointSequenceNumber,
439 ) -> Option<VerifiedCheckpoint> {
440 self.rocks
441 .get_checkpoint_by_sequence_number(sequence_number)
442 }
443
444 fn get_checkpoint_contents_by_digest(
445 &self,
446 digest: &CheckpointContentsDigest,
447 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
448 self.rocks.get_checkpoint_contents_by_digest(digest)
449 }
450
451 fn get_checkpoint_contents_by_sequence_number(
452 &self,
453 sequence_number: CheckpointSequenceNumber,
454 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
455 self.rocks
456 .get_checkpoint_contents_by_sequence_number(sequence_number)
457 }
458
459 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
460 self.rocks.get_transaction(digest)
461 }
462
463 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
464 self.rocks.get_transaction_effects(digest)
465 }
466
467 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
468 self.rocks.get_events(digest)
469 }
470
471 fn get_full_checkpoint_contents(
472 &self,
473 sequence_number: Option<CheckpointSequenceNumber>,
474 digest: &CheckpointContentsDigest,
475 ) -> Option<VersionedFullCheckpointContents> {
476 self.rocks
477 .get_full_checkpoint_contents(sequence_number, digest)
478 }
479
480 fn get_unchanged_loaded_runtime_objects(
481 &self,
482 digest: &TransactionDigest,
483 ) -> Option<Vec<ObjectKey>> {
484 self.rocks.get_unchanged_loaded_runtime_objects(digest)
485 }
486
487 fn get_transaction_checkpoint(
488 &self,
489 digest: &TransactionDigest,
490 ) -> Option<CheckpointSequenceNumber> {
491 self.rocks.get_transaction_checkpoint(digest)
492 }
493}
494
495impl RpcStateReader for RestReadStore {
496 fn get_lowest_available_checkpoint_objects(
497 &self,
498 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
499 Ok(self
500 .state
501 .get_object_cache_reader()
502 .get_highest_pruned_checkpoint()
503 .map(|cp| cp + 1)
504 .unwrap_or(0))
505 }
506
507 fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
508 Ok(self.state.get_chain_identifier())
509 }
510
511 fn indexes(&self) -> Option<&dyn RpcIndexes> {
512 Some(self)
513 }
514
515 fn get_struct_layout(
516 &self,
517 struct_tag: &move_core_types::language_storage::StructTag,
518 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
519 self.state
520 .load_epoch_store_one_call_per_task()
521 .executor()
522 .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
524 .get_annotated_layout(struct_tag)
525 .map(|layout| layout.into_layout())
526 .map(Some)
527 .map_err(StorageError::custom)
528 }
529}
530
531struct BatchedEventIterator<'a, I>
532where
533 I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
534{
535 key_iter: I,
536 rocks: &'a RocksDbStore,
537 current_checkpoint: Option<u64>,
538 current_checkpoint_contents: Option<sui_types::messages_checkpoint::CheckpointContents>,
539 cached_tx_events: Option<TransactionEvents>,
540 cached_tx_digest: Option<TransactionDigest>,
541}
542
543impl<I> Iterator for BatchedEventIterator<'_, I>
544where
545 I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
546{
547 type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
548
549 fn next(&mut self) -> Option<Self::Item> {
550 let key = match self.key_iter.next()? {
551 Ok(k) => k,
552 Err(e) => return Some(Err(e)),
553 };
554
555 if self.current_checkpoint != Some(key.checkpoint_seq) {
556 self.current_checkpoint = Some(key.checkpoint_seq);
557 self.current_checkpoint_contents = self
558 .rocks
559 .get_checkpoint_contents_by_sequence_number(key.checkpoint_seq);
560 self.cached_tx_events = None;
561 self.cached_tx_digest = None;
562 }
563
564 let checkpoint_contents = self.current_checkpoint_contents.as_ref()?;
565
566 let exec_digest = checkpoint_contents
567 .iter()
568 .nth(key.transaction_idx as usize)?;
569 let tx_digest = exec_digest.transaction;
570
571 if self.cached_tx_digest != Some(tx_digest) {
572 self.cached_tx_digest = Some(tx_digest);
573 self.cached_tx_events = self.rocks.get_events(&tx_digest);
574 }
575
576 let tx_events = self.cached_tx_events.as_ref()?;
577 let event = tx_events.data.get(key.event_index as usize)?.clone();
578
579 Some(Ok((
580 key.checkpoint_seq,
581 key.accumulator_version,
582 key.transaction_idx,
583 key.event_index,
584 event,
585 )))
586 }
587}
588
589impl RpcIndexes for RestReadStore {
590 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
591 self.index()?
592 .get_epoch_info(epoch)
593 .map_err(StorageError::custom)
594 }
595
596 fn owned_objects_iter(
597 &self,
598 owner: SuiAddress,
599 object_type: Option<StructTag>,
600 cursor: Option<OwnedObjectInfo>,
601 ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
602 let cursor = cursor.map(|cursor| OwnerIndexKey {
603 owner: cursor.owner,
604 object_type: cursor.object_type,
605 inverted_balance: cursor.balance.map(std::ops::Not::not),
606 object_id: cursor.object_id,
607 });
608
609 let iter = self
610 .index()?
611 .owner_iter(owner, object_type, cursor)?
612 .map(|result| {
613 result.map(
614 |(
615 OwnerIndexKey {
616 owner,
617 object_id,
618 object_type,
619 inverted_balance,
620 },
621 OwnerIndexInfo { version },
622 )| {
623 OwnedObjectInfo {
624 owner,
625 object_type,
626 balance: inverted_balance.map(std::ops::Not::not),
627 object_id,
628 version,
629 }
630 },
631 )
632 });
633
634 Ok(Box::new(iter) as _)
635 }
636
637 fn dynamic_field_iter(
638 &self,
639 parent: ObjectID,
640 cursor: Option<ObjectID>,
641 ) -> sui_types::storage::error::Result<
642 Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
643 > {
644 let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
645 Ok(Box::new(iter) as _)
646 }
647
648 fn get_coin_info(
649 &self,
650 coin_type: &StructTag,
651 ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
652 self.index()?
653 .get_coin_info(coin_type)?
654 .map(
655 |CoinIndexInfo {
656 coin_metadata_object_id,
657 treasury_object_id,
658 regulated_coin_metadata_object_id,
659 }| CoinInfo {
660 coin_metadata_object_id,
661 treasury_object_id,
662 regulated_coin_metadata_object_id,
663 },
664 )
665 .pipe(Ok)
666 }
667
668 fn get_balance(
669 &self,
670 owner: &SuiAddress,
671 coin_type: &StructTag,
672 ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
673 self.index()?
674 .get_balance(owner, coin_type)?
675 .map(|info| info.into())
676 .pipe(Ok)
677 }
678
679 fn balance_iter(
680 &self,
681 owner: &SuiAddress,
682 cursor: Option<(SuiAddress, StructTag)>,
683 ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
684 let cursor_key =
685 cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
686
687 Ok(Box::new(
688 self.index()?
689 .balance_iter(*owner, cursor_key)?
690 .map(|result| {
691 result
692 .map(|(key, info)| (key.coin_type, info.into()))
693 .map_err(Into::into)
694 }),
695 ))
696 }
697
698 fn package_versions_iter(
699 &self,
700 original_id: ObjectID,
701 cursor: Option<u64>,
702 ) -> sui_types::storage::error::Result<
703 Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
704 > {
705 let iter = self.index()?.package_versions_iter(original_id, cursor)?;
706 Ok(
707 Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
708 as _,
709 )
710 }
711
712 fn get_highest_indexed_checkpoint_seq_number(
713 &self,
714 ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
715 self.index()?
716 .get_highest_indexed_checkpoint_seq_number()
717 .map_err(Into::into)
718 }
719
720 fn authenticated_event_iter(
721 &self,
722 stream_id: SuiAddress,
723 start_checkpoint: u64,
724 start_accumulator_version: Option<u64>,
725 start_transaction_idx: Option<u32>,
726 start_event_idx: Option<u32>,
727 end_checkpoint: u64,
728 limit: u32,
729 ) -> sui_types::storage::error::Result<
730 Box<
731 dyn Iterator<
732 Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
733 > + '_,
734 >,
735 > {
736 let index = self.index()?;
737 let key_iter = index.event_iter(
738 stream_id,
739 start_checkpoint,
740 start_accumulator_version.unwrap_or(0),
741 start_transaction_idx.unwrap_or(0),
742 start_event_idx.unwrap_or(0),
743 end_checkpoint,
744 limit,
745 )?;
746
747 let rocks = &self.rocks;
748 let iter = BatchedEventIterator {
749 key_iter,
750 rocks,
751 current_checkpoint: None,
752 current_checkpoint_contents: None,
753 cached_tx_events: None,
754 cached_tx_digest: None,
755 };
756
757 Ok(Box::new(iter))
758 }
759}