1use crate::authority::AuthorityState;
5use crate::checkpoints::CheckpointStore;
6use crate::epoch::committee_store::CommitteeStore;
7use crate::execution_cache::ExecutionCacheTraitPointers;
8use crate::rpc_index::CoinIndexInfo;
9use crate::rpc_index::OwnerIndexInfo;
10use crate::rpc_index::OwnerIndexKey;
11use crate::rpc_index::RpcIndexStore;
12use move_core_types::language_storage::StructTag;
13use parking_lot::Mutex;
14use std::sync::Arc;
15use sui_types::base_types::ObjectID;
16use sui_types::base_types::SuiAddress;
17use sui_types::base_types::TransactionDigest;
18use sui_types::committee::Committee;
19use sui_types::committee::EpochId;
20use sui_types::effects::{TransactionEffects, TransactionEvents};
21use sui_types::messages_checkpoint::CheckpointContentsDigest;
22use sui_types::messages_checkpoint::CheckpointDigest;
23use sui_types::messages_checkpoint::CheckpointSequenceNumber;
24use sui_types::messages_checkpoint::EndOfEpochData;
25use sui_types::messages_checkpoint::VerifiedCheckpoint;
26use sui_types::messages_checkpoint::VerifiedCheckpointContents;
27use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
28use sui_types::object::Object;
29use sui_types::storage::BalanceInfo;
30use sui_types::storage::BalanceIterator;
31use sui_types::storage::CoinInfo;
32use sui_types::storage::DynamicFieldKey;
33use sui_types::storage::ObjectStore;
34use sui_types::storage::OwnedObjectInfo;
35use sui_types::storage::RpcIndexes;
36use sui_types::storage::RpcStateReader;
37use sui_types::storage::TransactionInfo;
38use sui_types::storage::WriteStore;
39use sui_types::storage::error::Error as StorageError;
40use sui_types::storage::error::Result;
41use sui_types::storage::{ObjectKey, ReadStore};
42use sui_types::transaction::VerifiedTransaction;
43use tap::Pipe;
44use tap::TapFallible;
45use tracing::error;
46use typed_store::TypedStoreError;
47
/// Storage facade that bundles the execution cache, the committee store and the
/// checkpoint store behind a single handle.
///
/// The type is `Clone`; every field is either an `Arc` or a bundle of cache
/// pointers, so clones share the same in-memory watermarks declared below.
#[derive(Clone)]
pub struct RocksDbStore {
    /// Cache accessors used for object, transaction, effects and event lookups.
    cache_traits: ExecutionCacheTraitPointers,

    /// Store queried and updated for per-epoch committees.
    committee_store: Arc<CommitteeStore>,
    /// Store queried and updated for checkpoint summaries, contents and watermarks.
    checkpoint_store: Arc<CheckpointStore>,
    /// Sequence number last recorded through `update_highest_verified_checkpoint`;
    /// `None` until the first successful update. Used to skip store writes for
    /// checkpoints that are not newer.
    highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
    /// Sequence number last recorded through `update_highest_synced_checkpoint`;
    /// `None` until the first successful update. Used to skip store writes for
    /// checkpoints that are not newer.
    highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
}
58
59impl RocksDbStore {
60 pub fn new(
61 cache_traits: ExecutionCacheTraitPointers,
62 committee_store: Arc<CommitteeStore>,
63 checkpoint_store: Arc<CheckpointStore>,
64 ) -> Self {
65 Self {
66 cache_traits,
67 committee_store,
68 checkpoint_store,
69 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
70 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
71 }
72 }
73
74 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
75 self.cache_traits
76 .object_cache_reader
77 .multi_get_objects_by_key(object_keys)
78 }
79
80 pub fn get_last_executed_checkpoint(&self) -> Option<VerifiedCheckpoint> {
81 self.checkpoint_store
82 .get_highest_executed_checkpoint()
83 .expect("db error")
84 }
85}
86
87impl ReadStore for RocksDbStore {
88 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
89 self.checkpoint_store
90 .get_checkpoint_by_digest(digest)
91 .expect("db error")
92 }
93
94 fn get_checkpoint_by_sequence_number(
95 &self,
96 sequence_number: CheckpointSequenceNumber,
97 ) -> Option<VerifiedCheckpoint> {
98 self.checkpoint_store
99 .get_checkpoint_by_sequence_number(sequence_number)
100 .expect("db error")
101 }
102
103 fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
104 self.checkpoint_store
105 .get_highest_verified_checkpoint()
106 .map(|maybe_checkpoint| {
107 maybe_checkpoint
108 .expect("storage should have been initialized with genesis checkpoint")
109 })
110 .map_err(Into::into)
111 }
112
113 fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
114 self.checkpoint_store
115 .get_highest_synced_checkpoint()
116 .map(|maybe_checkpoint| {
117 maybe_checkpoint
118 .expect("storage should have been initialized with genesis checkpoint")
119 })
120 .map_err(Into::into)
121 }
122
123 fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
124 if let Some(highest_pruned_cp) = self
125 .checkpoint_store
126 .get_highest_pruned_checkpoint_seq_number()
127 .map_err(Into::<StorageError>::into)?
128 {
129 Ok(highest_pruned_cp + 1)
130 } else {
131 Ok(0)
132 }
133 }
134
135 fn get_full_checkpoint_contents(
136 &self,
137 sequence_number: Option<CheckpointSequenceNumber>,
138 digest: &CheckpointContentsDigest,
139 ) -> Option<VersionedFullCheckpointContents> {
140 #[cfg(debug_assertions)]
141 if let Some(sequence_number) = sequence_number {
142 if let Some(loaded_sequence_number) = self
146 .checkpoint_store
147 .get_sequence_number_by_contents_digest(digest)
148 .expect("db error")
149 {
150 assert_eq!(loaded_sequence_number, sequence_number);
151 }
152 }
153
154 let sequence_number = sequence_number.or_else(|| {
155 self.checkpoint_store
156 .get_sequence_number_by_contents_digest(digest)
157 .expect("db error")
158 });
159 if let Some(sequence_number) = sequence_number {
160 if let Ok(Some(contents)) = self
166 .checkpoint_store
167 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
168 .tap_err(|e| {
169 error!(
170 "error getting full checkpoint contents for checkpoint {:?}: {:?}",
171 sequence_number, e
172 )
173 })
174 {
175 return Some(contents);
176 }
177 }
178
179 self.checkpoint_store
185 .get_checkpoint_contents(digest)
186 .expect("db error")
187 .and_then(|contents| {
188 let mut transactions = Vec::with_capacity(contents.size());
189 for tx in contents.iter() {
190 if let (Some(t), Some(e)) = (
191 self.get_transaction(&tx.transaction),
192 self.cache_traits
193 .transaction_cache_reader
194 .get_effects(&tx.effects),
195 ) {
196 transactions.push(sui_types::base_types::ExecutionData::new(
197 (*t).clone().into_inner(),
198 e,
199 ))
200 } else {
201 return None;
202 }
203 }
204 Some(
205 VersionedFullCheckpointContents::from_contents_and_execution_data(
206 contents,
207 transactions.into_iter(),
208 ),
209 )
210 })
211 }
212
213 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
214 self.committee_store.get_committee(&epoch).unwrap()
215 }
216
217 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
218 self.cache_traits
219 .transaction_cache_reader
220 .get_transaction_block(digest)
221 }
222
223 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
224 self.cache_traits
225 .transaction_cache_reader
226 .get_executed_effects(digest)
227 }
228
229 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
230 self.cache_traits
231 .transaction_cache_reader
232 .get_events(digest)
233 }
234
235 fn get_unchanged_loaded_runtime_objects(
236 &self,
237 digest: &TransactionDigest,
238 ) -> Option<Vec<ObjectKey>> {
239 self.cache_traits
240 .transaction_cache_reader
241 .get_unchanged_loaded_runtime_objects(digest)
242 }
243
244 fn get_transaction_checkpoint(
245 &self,
246 digest: &TransactionDigest,
247 ) -> Option<CheckpointSequenceNumber> {
248 self.cache_traits
249 .checkpoint_cache
250 .deprecated_get_transaction_checkpoint(digest)
251 .map(|(_epoch, checkpoint)| checkpoint)
252 }
253
254 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
255 self.checkpoint_store
256 .get_highest_executed_checkpoint()
257 .expect("db error")
258 .ok_or_else(|| {
259 sui_types::storage::error::Error::missing("unable to get latest checkpoint")
260 })
261 }
262
263 fn get_checkpoint_contents_by_digest(
264 &self,
265 digest: &CheckpointContentsDigest,
266 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
267 self.checkpoint_store
268 .get_checkpoint_contents(digest)
269 .expect("db error")
270 }
271
272 fn get_checkpoint_contents_by_sequence_number(
273 &self,
274 sequence_number: CheckpointSequenceNumber,
275 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
276 match self.get_checkpoint_by_sequence_number(sequence_number) {
277 Some(checkpoint) => self.get_checkpoint_contents_by_digest(&checkpoint.content_digest),
278 None => None,
279 }
280 }
281}
282
283impl ObjectStore for RocksDbStore {
284 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
285 self.cache_traits.object_store.get_object(object_id)
286 }
287
288 fn get_object_by_key(
289 &self,
290 object_id: &sui_types::base_types::ObjectID,
291 version: sui_types::base_types::VersionNumber,
292 ) -> Option<Object> {
293 self.cache_traits
294 .object_store
295 .get_object_by_key(object_id, version)
296 }
297}
298
299impl WriteStore for RocksDbStore {
300 fn insert_checkpoint(
301 &self,
302 checkpoint: &VerifiedCheckpoint,
303 ) -> Result<(), sui_types::storage::error::Error> {
304 if let Some(EndOfEpochData {
305 next_epoch_committee,
306 ..
307 }) = checkpoint.end_of_epoch_data.as_ref()
308 {
309 let next_committee = next_epoch_committee.iter().cloned().collect();
310 let committee =
311 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
312 self.insert_committee(committee)?;
313 }
314
315 self.checkpoint_store
316 .insert_verified_checkpoint(checkpoint)
317 .map_err(Into::into)
318 }
319
320 fn update_highest_synced_checkpoint(
321 &self,
322 checkpoint: &VerifiedCheckpoint,
323 ) -> Result<(), sui_types::storage::error::Error> {
324 let mut locked = self.highest_synced_checkpoint.lock();
325 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
326 return Ok(());
327 }
328 self.checkpoint_store
329 .update_highest_synced_checkpoint(checkpoint)
330 .map_err(sui_types::storage::error::Error::custom)?;
331 *locked = Some(checkpoint.sequence_number);
332 Ok(())
333 }
334
335 fn update_highest_verified_checkpoint(
336 &self,
337 checkpoint: &VerifiedCheckpoint,
338 ) -> Result<(), sui_types::storage::error::Error> {
339 let mut locked = self.highest_verified_checkpoint.lock();
340 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
341 return Ok(());
342 }
343 self.checkpoint_store
344 .update_highest_verified_checkpoint(checkpoint)
345 .map_err(sui_types::storage::error::Error::custom)?;
346 *locked = Some(checkpoint.sequence_number);
347 Ok(())
348 }
349
350 fn insert_checkpoint_contents(
351 &self,
352 checkpoint: &VerifiedCheckpoint,
353 contents: VerifiedCheckpointContents,
354 ) -> Result<(), sui_types::storage::error::Error> {
355 self.cache_traits
356 .state_sync_store
357 .multi_insert_transaction_and_effects(contents.transactions());
358 self.checkpoint_store
359 .insert_verified_checkpoint_contents(checkpoint, contents)
360 .map_err(Into::into)
361 }
362
363 fn insert_committee(
364 &self,
365 new_committee: Committee,
366 ) -> Result<(), sui_types::storage::error::Error> {
367 self.committee_store
368 .insert_new_committee(&new_committee)
369 .unwrap();
370 Ok(())
371 }
372}
373
/// Read-side store that combines the authority state with a `RocksDbStore`.
///
/// Checkpoint, transaction and object reads are forwarded to `rocks`; the RPC
/// index, chain identifier and type-layout lookups go through `state`.
pub struct RestReadStore {
    /// Authority state; source of the optional RPC index store.
    state: Arc<AuthorityState>,
    /// Underlying store that serves the forwarded reads.
    rocks: RocksDbStore,
}
378
379impl RestReadStore {
380 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
381 Self { state, rocks }
382 }
383
384 fn index(&self) -> sui_types::storage::error::Result<&RpcIndexStore> {
385 self.state
386 .rpc_index
387 .as_deref()
388 .ok_or_else(|| sui_types::storage::error::Error::custom("rest index store is disabled"))
389 }
390}
391
392impl ObjectStore for RestReadStore {
393 fn get_object(&self, object_id: &sui_types::base_types::ObjectID) -> Option<Object> {
394 self.rocks.get_object(object_id)
395 }
396
397 fn get_object_by_key(
398 &self,
399 object_id: &sui_types::base_types::ObjectID,
400 version: sui_types::base_types::VersionNumber,
401 ) -> Option<Object> {
402 self.rocks.get_object_by_key(object_id, version)
403 }
404}
405
406impl ReadStore for RestReadStore {
407 fn get_committee(&self, epoch: EpochId) -> Option<Arc<Committee>> {
408 self.rocks.get_committee(epoch)
409 }
410
411 fn get_latest_checkpoint(&self) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
412 self.rocks.get_latest_checkpoint()
413 }
414
415 fn get_highest_verified_checkpoint(
416 &self,
417 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
418 self.rocks.get_highest_verified_checkpoint()
419 }
420
421 fn get_highest_synced_checkpoint(
422 &self,
423 ) -> sui_types::storage::error::Result<VerifiedCheckpoint> {
424 self.rocks.get_highest_synced_checkpoint()
425 }
426
427 fn get_lowest_available_checkpoint(
428 &self,
429 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
430 self.rocks.get_lowest_available_checkpoint()
431 }
432
433 fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<VerifiedCheckpoint> {
434 self.rocks.get_checkpoint_by_digest(digest)
435 }
436
437 fn get_checkpoint_by_sequence_number(
438 &self,
439 sequence_number: CheckpointSequenceNumber,
440 ) -> Option<VerifiedCheckpoint> {
441 self.rocks
442 .get_checkpoint_by_sequence_number(sequence_number)
443 }
444
445 fn get_checkpoint_contents_by_digest(
446 &self,
447 digest: &CheckpointContentsDigest,
448 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
449 self.rocks.get_checkpoint_contents_by_digest(digest)
450 }
451
452 fn get_checkpoint_contents_by_sequence_number(
453 &self,
454 sequence_number: CheckpointSequenceNumber,
455 ) -> Option<sui_types::messages_checkpoint::CheckpointContents> {
456 self.rocks
457 .get_checkpoint_contents_by_sequence_number(sequence_number)
458 }
459
460 fn get_transaction(&self, digest: &TransactionDigest) -> Option<Arc<VerifiedTransaction>> {
461 self.rocks.get_transaction(digest)
462 }
463
464 fn get_transaction_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
465 self.rocks.get_transaction_effects(digest)
466 }
467
468 fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
469 self.rocks.get_events(digest)
470 }
471
472 fn get_full_checkpoint_contents(
473 &self,
474 sequence_number: Option<CheckpointSequenceNumber>,
475 digest: &CheckpointContentsDigest,
476 ) -> Option<VersionedFullCheckpointContents> {
477 self.rocks
478 .get_full_checkpoint_contents(sequence_number, digest)
479 }
480
481 fn get_unchanged_loaded_runtime_objects(
482 &self,
483 digest: &TransactionDigest,
484 ) -> Option<Vec<ObjectKey>> {
485 self.rocks.get_unchanged_loaded_runtime_objects(digest)
486 }
487
488 fn get_transaction_checkpoint(
489 &self,
490 digest: &TransactionDigest,
491 ) -> Option<CheckpointSequenceNumber> {
492 self.rocks.get_transaction_checkpoint(digest)
493 }
494}
495
496impl RpcStateReader for RestReadStore {
497 fn get_lowest_available_checkpoint_objects(
498 &self,
499 ) -> sui_types::storage::error::Result<CheckpointSequenceNumber> {
500 Ok(self
501 .state
502 .get_object_cache_reader()
503 .get_highest_pruned_checkpoint()
504 .map(|cp| cp + 1)
505 .unwrap_or(0))
506 }
507
508 fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
509 Ok(self.state.get_chain_identifier())
510 }
511
512 fn indexes(&self) -> Option<&dyn RpcIndexes> {
513 Some(self)
514 }
515
516 fn get_struct_layout(
517 &self,
518 struct_tag: &move_core_types::language_storage::StructTag,
519 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
520 self.state
521 .load_epoch_store_one_call_per_task()
522 .executor()
523 .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
525 .get_annotated_layout(struct_tag)
526 .map(|layout| layout.into_layout())
527 .map(Some)
528 .map_err(StorageError::custom)
529 }
530}
531
/// Iterator adapter that turns event index keys into the events they refer to.
///
/// It keeps the most recently loaded checkpoint contents and transaction
/// events, so consecutive keys pointing at the same checkpoint or transaction
/// reuse them instead of reloading from `rocks`.
struct BatchedEventIterator<'a, I>
where
    I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
{
    /// Source of event index keys; its errors are passed through to the caller.
    key_iter: I,
    /// Store used to load checkpoint contents and transaction events.
    rocks: &'a RocksDbStore,
    /// Sequence number of the checkpoint `current_checkpoint_contents` was loaded for.
    current_checkpoint: Option<u64>,
    /// Contents of `current_checkpoint`; `None` if nothing is loaded or the lookup found nothing.
    current_checkpoint_contents: Option<sui_types::messages_checkpoint::CheckpointContents>,
    /// Events of the transaction named by `cached_tx_digest`; `None` if the lookup found nothing.
    cached_tx_events: Option<TransactionEvents>,
    /// Digest of the transaction whose events were looked up most recently.
    cached_tx_digest: Option<TransactionDigest>,
}
543
544impl<I> Iterator for BatchedEventIterator<'_, I>
545where
546 I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
547{
548 type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
549
550 fn next(&mut self) -> Option<Self::Item> {
551 let key = match self.key_iter.next()? {
552 Ok(k) => k,
553 Err(e) => return Some(Err(e)),
554 };
555
556 if self.current_checkpoint != Some(key.checkpoint_seq) {
557 self.current_checkpoint = Some(key.checkpoint_seq);
558 self.current_checkpoint_contents = self
559 .rocks
560 .get_checkpoint_contents_by_sequence_number(key.checkpoint_seq);
561 self.cached_tx_events = None;
562 self.cached_tx_digest = None;
563 }
564
565 let checkpoint_contents = self.current_checkpoint_contents.as_ref()?;
566
567 let exec_digest = checkpoint_contents
568 .iter()
569 .nth(key.transaction_idx as usize)?;
570 let tx_digest = exec_digest.transaction;
571
572 if self.cached_tx_digest != Some(tx_digest) {
573 self.cached_tx_digest = Some(tx_digest);
574 self.cached_tx_events = self.rocks.get_events(&tx_digest);
575 }
576
577 let tx_events = self.cached_tx_events.as_ref()?;
578 let event = tx_events.data.get(key.event_index as usize)?.clone();
579
580 Some(Ok((
581 key.checkpoint_seq,
582 key.accumulator_version,
583 key.transaction_idx,
584 key.event_index,
585 event,
586 )))
587 }
588}
589
590impl RpcIndexes for RestReadStore {
591 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<sui_types::storage::EpochInfo>> {
592 self.index()?
593 .get_epoch_info(epoch)
594 .map_err(StorageError::custom)
595 }
596
597 fn get_transaction_info(
598 &self,
599 digest: &TransactionDigest,
600 ) -> sui_types::storage::error::Result<Option<TransactionInfo>> {
601 self.index()?
602 .get_transaction_info(digest)
603 .map_err(StorageError::custom)
604 }
605
606 fn owned_objects_iter(
607 &self,
608 owner: SuiAddress,
609 object_type: Option<StructTag>,
610 cursor: Option<OwnedObjectInfo>,
611 ) -> Result<Box<dyn Iterator<Item = Result<OwnedObjectInfo, TypedStoreError>> + '_>> {
612 let cursor = cursor.map(|cursor| OwnerIndexKey {
613 owner: cursor.owner,
614 object_type: cursor.object_type,
615 inverted_balance: cursor.balance.map(std::ops::Not::not),
616 object_id: cursor.object_id,
617 });
618
619 let iter = self
620 .index()?
621 .owner_iter(owner, object_type, cursor)?
622 .map(|result| {
623 result.map(
624 |(
625 OwnerIndexKey {
626 owner,
627 object_id,
628 object_type,
629 inverted_balance,
630 },
631 OwnerIndexInfo { version },
632 )| {
633 OwnedObjectInfo {
634 owner,
635 object_type,
636 balance: inverted_balance.map(std::ops::Not::not),
637 object_id,
638 version,
639 }
640 },
641 )
642 });
643
644 Ok(Box::new(iter) as _)
645 }
646
647 fn dynamic_field_iter(
648 &self,
649 parent: ObjectID,
650 cursor: Option<ObjectID>,
651 ) -> sui_types::storage::error::Result<
652 Box<dyn Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_>,
653 > {
654 let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
655 Ok(Box::new(iter) as _)
656 }
657
658 fn get_coin_info(
659 &self,
660 coin_type: &StructTag,
661 ) -> sui_types::storage::error::Result<Option<CoinInfo>> {
662 self.index()?
663 .get_coin_info(coin_type)?
664 .map(
665 |CoinIndexInfo {
666 coin_metadata_object_id,
667 treasury_object_id,
668 regulated_coin_metadata_object_id,
669 }| CoinInfo {
670 coin_metadata_object_id,
671 treasury_object_id,
672 regulated_coin_metadata_object_id,
673 },
674 )
675 .pipe(Ok)
676 }
677
678 fn get_balance(
679 &self,
680 owner: &SuiAddress,
681 coin_type: &StructTag,
682 ) -> sui_types::storage::error::Result<Option<BalanceInfo>> {
683 self.index()?
684 .get_balance(owner, coin_type)?
685 .map(|info| info.into())
686 .pipe(Ok)
687 }
688
689 fn balance_iter(
690 &self,
691 owner: &SuiAddress,
692 cursor: Option<(SuiAddress, StructTag)>,
693 ) -> sui_types::storage::error::Result<BalanceIterator<'_>> {
694 let cursor_key =
695 cursor.map(|(owner, coin_type)| crate::rpc_index::BalanceKey { owner, coin_type });
696
697 Ok(Box::new(
698 self.index()?
699 .balance_iter(*owner, cursor_key)?
700 .map(|result| {
701 result
702 .map(|(key, info)| (key.coin_type, info.into()))
703 .map_err(Into::into)
704 }),
705 ))
706 }
707
708 fn package_versions_iter(
709 &self,
710 original_id: ObjectID,
711 cursor: Option<u64>,
712 ) -> sui_types::storage::error::Result<
713 Box<dyn Iterator<Item = Result<(u64, ObjectID), TypedStoreError>> + '_>,
714 > {
715 let iter = self.index()?.package_versions_iter(original_id, cursor)?;
716 Ok(
717 Box::new(iter.map(|result| result.map(|(key, info)| (key.version, info.storage_id))))
718 as _,
719 )
720 }
721
722 fn get_highest_indexed_checkpoint_seq_number(
723 &self,
724 ) -> sui_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
725 self.index()?
726 .get_highest_indexed_checkpoint_seq_number()
727 .map_err(Into::into)
728 }
729
730 fn authenticated_event_iter(
731 &self,
732 stream_id: SuiAddress,
733 start_checkpoint: u64,
734 start_accumulator_version: Option<u64>,
735 start_transaction_idx: Option<u32>,
736 start_event_idx: Option<u32>,
737 end_checkpoint: u64,
738 limit: u32,
739 ) -> sui_types::storage::error::Result<
740 Box<
741 dyn Iterator<
742 Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
743 > + '_,
744 >,
745 > {
746 let index = self.index()?;
747 let key_iter = index.event_iter(
748 stream_id,
749 start_checkpoint,
750 start_accumulator_version.unwrap_or(0),
751 start_transaction_idx.unwrap_or(0),
752 start_event_idx.unwrap_or(0),
753 end_checkpoint,
754 limit,
755 )?;
756
757 let rocks = &self.rocks;
758 let iter = BatchedEventIterator {
759 key_iter,
760 rocks,
761 current_checkpoint: None,
762 current_checkpoint_contents: None,
763 cached_tx_events: None,
764 cached_tx_digest: None,
765 };
766
767 Ok(Box::new(iter))
768 }
769}