1use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use serde::Deserialize;
15use serde::Serialize;
16use std::collections::{BTreeMap, HashMap};
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::Mutex;
21use std::time::Duration;
22use std::time::Instant;
23use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
24use sui_types::base_types::MoveObjectType;
25use sui_types::base_types::ObjectID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::base_types::SuiAddress;
28use sui_types::coin::Coin;
29use sui_types::committee::EpochId;
30use sui_types::digests::TransactionDigest;
31use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
32use sui_types::full_checkpoint_content::CheckpointData;
33use sui_types::layout_resolver::LayoutResolver;
34use sui_types::messages_checkpoint::CheckpointContents;
35use sui_types::messages_checkpoint::CheckpointSequenceNumber;
36use sui_types::object::Data;
37use sui_types::object::Object;
38use sui_types::object::Owner;
39use sui_types::storage::BackingPackageStore;
40use sui_types::storage::DynamicFieldKey;
41use sui_types::storage::EpochInfo;
42use sui_types::storage::TransactionInfo;
43use sui_types::storage::error::Error as StorageError;
44use sui_types::sui_system_state::SuiSystemStateTrait;
45use sui_types::transaction::{TransactionDataAPI, TransactionKind};
46use sysinfo::{MemoryRefreshKind, RefreshKind, System};
47use tracing::{debug, info, warn};
48use typed_store::DBMapUtils;
49use typed_store::TypedStoreError;
50use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
51use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
52use typed_store::traits::Map;
53
// Schema version of the index tables; any mismatch on open forces the DB to be
// dropped and fully re-indexed (see `needs_to_do_initialization`).
const CURRENT_DB_VERSION: u64 = 3;
// NOTE(review): not referenced in this chunk — presumably bounds in-memory
// balance batching elsewhere in the file; confirm against the rest of the file.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;
57
58fn bulk_ingestion_write_options() -> WriteOptions {
59 let mut opts = WriteOptions::default();
60 opts.disable_wal(true);
61 opts
62}
63
64fn get_available_memory() -> u64 {
66 let mut sys = System::new_with_specifics(
68 RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
69 );
70 sys.refresh_memory();
71
72 if let Some(cgroup_limits) = sys.cgroup_limits() {
74 let memory_limit = cgroup_limits.total_memory;
75 if memory_limit > 0 {
77 debug!("Using cgroup memory limit: {} bytes", memory_limit);
78 return memory_limit;
79 }
80 }
81
82 let total_memory_bytes = sys.total_memory();
85 debug!("Using system memory: {} bytes", total_memory_bytes);
86 total_memory_bytes
87}
88
/// Singleton row of the `meta` table recording the on-disk schema version.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    // Must equal `CURRENT_DB_VERSION`, otherwise the store is rebuilt on open.
    version: u64,
}
94
/// Keys of the `watermark` table tracking indexing progress.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Highest checkpoint whose data has been indexed.
    Indexed,
    /// Highest checkpoint that has been pruned.
    Pruned,
}
101
/// Key of the `owner` table. Field order matters: derived `Ord` (and the
/// intended on-disk order) sorts by owner, then object type, then inverted
/// balance, then object id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    pub object_type: StructTag,

    // Bitwise-NOT of the coin balance (`None` for non-coin objects) so that
    // higher balances sort earlier within an (owner, type) range.
    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}
114
115impl OwnerIndexKey {
116 fn from_object(object: &Object) -> Self {
119 let owner = match object.owner() {
120 Owner::AddressOwner(owner) => owner,
121 Owner::ConsensusAddressOwner { owner, .. } => owner,
122 _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
123 };
124 let object_type = object.struct_tag().expect("packages cannot be owned");
125
126 let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
127
128 Self {
129 owner: *owner,
130 object_type,
131 inverted_balance,
132 object_id: object.id(),
133 }
134 }
135}
136
/// Value of the `owner` table: per-object metadata for an owned object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    // Version of the object at the time it was indexed.
    pub version: SequenceNumber,
}
142
143impl OwnerIndexInfo {
144 pub fn new(object: &Object) -> Self {
145 Self {
146 version: object.version(),
147 }
148 }
149}
150
/// Key of the `coin` table: one entry per coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}
155
/// Key of the `balance` table: aggregated balance per (owner, coin type).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}
161
/// Value of the `coin` table: the well-known objects associated with a coin
/// type, each populated independently as it is discovered (see `merge`).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
168
/// Value of the `balance` table: a signed delta combined associatively by the
/// RocksDB merge operator (`balance_delta_merge_operator`); the materialized
/// balance is the sum of all merged deltas.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    // i128 so sums of u64 coin values cannot overflow.
    pub balance_delta: i128,
}
173
174impl From<u64> for BalanceIndexInfo {
175 fn from(coin_value: u64) -> Self {
176 Self {
177 balance_delta: coin_value as i128,
178 }
179 }
180}
181
182impl BalanceIndexInfo {
183 fn merge_delta(&mut self, other: &Self) {
184 self.balance_delta += other.balance_delta;
185 }
186}
187
188impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
189 fn from(index_info: BalanceIndexInfo) -> Self {
190 let balance = index_info.balance_delta.clamp(0, u64::MAX as i128) as u64;
197 sui_types::storage::BalanceInfo { balance }
198 }
199}
200
/// Key of the `package_version` table: a package identified by its original
/// (first-published) id plus a specific version number.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}
206
/// Value of the `package_version` table: the storage id under which that
/// package version actually lives on-chain.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}
211
/// Optional knobs applied when opening the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    // When set, installed as the compaction filter of the `events_by_stream`
    // table so pruned events are dropped lazily during compaction.
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}
216
217fn default_table_options() -> typed_store::rocks::DBOptions {
218 typed_store::rocks::default_db_options().disable_write_throttling()
219}
220
221fn events_table_options(
222 compaction_filter: Option<EventsCompactionFilter>,
223) -> typed_store::rocks::DBOptions {
224 let mut options = default_table_options();
225 if let Some(filter) = compaction_filter {
226 options.options.set_compaction_filter(
227 "events_by_stream",
228 move |_, key, value| match filter.filter(key, value) {
229 Ok(decision) => decision,
230 Err(e) => {
231 warn!(
232 "Failed to parse event key during compaction: {}, key: {:?}",
233 e, key
234 );
235 Decision::Remove
236 }
237 },
238 );
239 }
240 options
241}
242
243fn balance_delta_merge_operator(
244 _key: &[u8],
245 existing_val: Option<&[u8]>,
246 operands: &MergeOperands,
247) -> Option<Vec<u8>> {
248 let mut result = existing_val
249 .map(|v| {
250 bcs::from_bytes::<BalanceIndexInfo>(v)
251 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.")
252 })
253 .unwrap_or_default();
254
255 for operand in operands.iter() {
256 let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
257 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.");
258 result.merge_delta(&delta);
259 }
260 Some(
261 bcs::to_bytes(&result)
262 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
263 )
264}
265
266fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
267 let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
268 Ok(info) => info,
269 Err(_) => return Decision::Keep,
270 };
271
272 if balance_info.balance_delta == 0 {
273 Decision::Remove
274 } else {
275 Decision::Keep
276 }
277}
278
279fn balance_table_options() -> typed_store::rocks::DBOptions {
280 default_table_options()
281 .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
282 .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
283}
284
285impl CoinIndexInfo {
286 fn merge(&mut self, other: Self) {
287 self.coin_metadata_object_id = self
288 .coin_metadata_object_id
289 .or(other.coin_metadata_object_id);
290 self.regulated_coin_metadata_object_id = self
291 .regulated_coin_metadata_object_id
292 .or(other.regulated_coin_metadata_object_id);
293 self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
294 }
295}
296
/// Column families backing the RPC index. Per-table options come either from
/// the `default_options_override_fn` attributes or from the table-config map
/// built in `open_with_index_options`.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Singleton schema-version row (see `CURRENT_DB_VERSION`).
    meta: DBMap<(), MetadataInfo>,

    /// Indexed/Pruned checkpoint watermarks.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// Per-epoch metadata.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// Deprecated table, retained only so its column family can still be
    /// opened; not written by this code.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Address-owned objects, keyed for owner/type/balance iteration.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Parent object -> dynamic-field child mapping.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// Per-coin-type metadata / treasury object ids.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// Aggregated (owner, coin type) balances; see `balance_table_options`
    /// for the merge operator and zero-delta compaction filter.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// (original package id, version) -> storage id of that package version.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Authenticated event-stream index; its options (including the optional
    /// compaction filter) are injected via the table config map at open time.
    events_by_stream: DBMap<EventIndexKey, ()>,
}
378
/// Key of the `events_by_stream` table: orders a stream's events by
/// checkpoint, then the accumulator settlement version covering them, then
/// their position (transaction index, event index) within the checkpoint.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}
388
/// Compaction filter state for `events_by_stream`: entries at or below the
/// shared pruning watermark are removed during compaction.
#[derive(Clone)]
pub struct EventsCompactionFilter {
    // Shared with the pruner, which advances it as checkpoints are pruned.
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}
394
395impl EventsCompactionFilter {
396 pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
397 Self { pruning_watermark }
398 }
399
400 pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
401 let event_key: EventIndexKey = bcs::from_bytes(key)?;
402 let watermark = self
403 .pruning_watermark
404 .load(std::sync::atomic::Ordering::Relaxed);
405
406 if event_key.checkpoint_seq <= watermark {
407 Ok(Decision::Remove)
408 } else {
409 Ok(Decision::Keep)
410 }
411 }
412}
413
414impl IndexStoreTables {
415 fn extract_version_if_package(
416 object: &Object,
417 ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
418 if let Data::Package(package) = &object.data {
419 let original_id = package.original_package_id();
420 let version = package.version().value();
421 let storage_id = object.id();
422
423 let key = PackageVersionKey {
424 original_package_id: original_id,
425 version,
426 };
427 let info = PackageVersionInfo { storage_id };
428 return Some((key, info));
429 }
430 None
431 }
432
433 fn open_with_index_options<P: Into<PathBuf>>(
434 path: P,
435 index_options: IndexStoreOptions,
436 ) -> Self {
437 let mut table_options = std::collections::BTreeMap::new();
438 table_options.insert("balance".to_string(), balance_table_options());
439 table_options.insert(
440 "events_by_stream".to_string(),
441 events_table_options(index_options.events_compaction_filter),
442 );
443
444 IndexStoreTables::open_tables_read_write_with_deprecation_option(
445 path.into(),
446 MetricConf::new("rpc-index"),
447 None,
448 Some(DBMapTableConfigMap::new(table_options)),
449 true, )
451 }
452
453 fn open_with_options<P: Into<PathBuf>>(
454 path: P,
455 options: typed_store::rocksdb::Options,
456 table_options: Option<DBMapTableConfigMap>,
457 ) -> Self {
458 IndexStoreTables::open_tables_read_write_with_deprecation_option(
459 path.into(),
460 MetricConf::new("rpc-index"),
461 Some(options),
462 table_options,
463 true, )
465 }
466
467 fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
468 (match self.meta.get(&()) {
469 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
470 Ok(None) => true,
471 Err(_) => true,
472 }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
473 }
474
475 fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
477 let highest_executed_checkpint = checkpoint_store
478 .get_highest_executed_checkpoint_seq_number()
479 .ok()
480 .flatten();
481 let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
482 watermark < highest_executed_checkpint
483 }
484
    /// Rebuilds all indexes from scratch: backfills epoch info from historical
    /// checkpoints, seeds the current epoch, re-indexes the live object set in
    /// parallel, then records the indexed watermark and schema version.
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        let highest_executed_checkpint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        // First checkpoint for which both checkpoint data and objects are
        // still available: one past the higher of the two pruning watermarks.
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
            lowest_available_checkpoint..=highest_executed_checkpint
        });

        // Backfill epoch info from historical checkpoints, if any exist.
        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_checkpoints(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        // Object-derived indexes (owner, dynamic_field, coin, ...) are rebuilt
        // from the live object set in parallel; discovered coin metadata is
        // accumulated in memory and written once at the end.
        if highest_executed_checkpint.is_some() {
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        // Record progress, then stamp the schema version last so a crash
        // mid-initialization still triggers a rebuild on the next open.
        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpint.unwrap_or(0),
        )?;

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }
564
    /// Backfills epoch information from already-executed checkpoints, in
    /// parallel via rayon. Each checkpoint writes its own batch with the WAL
    /// disabled for bulk-ingestion speed.
    #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
    fn index_existing_checkpoints(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        checkpoint_range: std::ops::RangeInclusive<u64>,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!(
            "Indexing {} checkpoints in range {checkpoint_range:?}",
            checkpoint_range.size_hint().0
        );
        let start_time = Instant::now();

        checkpoint_range.into_par_iter().try_for_each(|seq| {
            let load_events = rpc_config.authenticated_events_indexing();
            // Checkpoints that cannot be loaded anymore are skipped silently.
            let Some(checkpoint_data) = sparse_checkpoint_data_for_epoch_backfill(
                authority_store,
                checkpoint_store,
                seq,
                load_events,
            )?
            else {
                return Ok(());
            };

            let mut batch = self.epochs.batch();

            // Only epoch info is backfilled here; object-derived indexes are
            // rebuilt separately from the live object set.
            self.index_epoch(&checkpoint_data, &mut batch)?;

            batch
                .write_opt(bulk_ingestion_write_options())
                .map_err(StorageError::from)
        })?;

        info!(
            "Indexing checkpoints took {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }
606
607 fn prune(
609 &self,
610 pruned_checkpoint_watermark: u64,
611 _checkpoint_contents_to_prune: &[CheckpointContents],
612 ) -> Result<(), TypedStoreError> {
613 let mut batch = self.watermark.batch();
614
615 batch.insert_batch(
616 &self.watermark,
617 [(Watermark::Pruned, pruned_checkpoint_watermark)],
618 )?;
619
620 batch.write()
621 }
622
    /// Stages all index updates for one checkpoint (epochs, balances/events,
    /// objects, and the advanced `Indexed` watermark) into a single batch that
    /// the caller is responsible for writing atomically.
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        _resolver: &mut dyn LayoutResolver,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.owner.batch();

        self.index_epoch(checkpoint, &mut batch)?;
        self.index_transactions(
            checkpoint,
            &mut batch,
            rpc_config.authenticated_events_indexing(),
        )?;
        self.index_objects(checkpoint, &mut batch)?;

        // Advance the indexed watermark as part of the same atomic batch.
        batch.insert_batch(
            &self.watermark,
            [(
                Watermark::Indexed,
                checkpoint.checkpoint_summary.sequence_number,
            )],
        )?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }
660
661 fn extract_accumulator_version(
662 &self,
663 tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
664 ) -> Option<u64> {
665 let TransactionKind::ProgrammableSystemTransaction(pt) =
666 tx.transaction.transaction_data().kind()
667 else {
668 return None;
669 };
670
671 if pt.shared_input_objects().any(|obj| {
672 obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
673 && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
674 }) {
675 return tx.output_objects.iter().find_map(|obj| {
676 if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
677 Some(obj.version().value())
678 } else {
679 None
680 }
681 });
682 }
683
684 None
685 }
686
    /// Indexes one transaction's accumulator events into `events_by_stream`,
    /// one entry per emitted event digest, tagged with the version of the
    /// settlement that covers them.
    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            // Only event-digest accumulator writes are indexed here.
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                // Events are expected to be covered by a settlement; if none
                // has been seen, flag the invariant violation via debug_fatal
                // and skip the event rather than indexing it with a bad key.
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                // Events without a resolvable stream id are not indexed.
                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }
734
735 fn index_epoch(
736 &self,
737 checkpoint: &CheckpointData,
738 batch: &mut typed_store::rocks::DBBatch,
739 ) -> Result<(), StorageError> {
740 let Some(epoch_info) = checkpoint.epoch_info()? else {
741 return Ok(());
742 };
743 if epoch_info.epoch > 0 {
744 let prev_epoch = epoch_info.epoch - 1;
745 let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
746 current_epoch.epoch = prev_epoch; current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
748 current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
749 batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
750 }
751 batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
752 Ok(())
753 }
754
    /// Seeds the current epoch's record from the on-chain system state,
    /// filling in only the fields that checkpoint backfill did not provide —
    /// existing values are never overwritten.
    fn initialize_current_epoch(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
    ) -> Result<(), StorageError> {
        // Nothing to do before any checkpoint has been executed.
        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
            return Ok(());
        };

        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;

        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
        epoch.epoch = checkpoint.epoch;

        if epoch.protocol_version.is_none() {
            epoch.protocol_version = Some(system_state.protocol_version());
        }

        if epoch.start_timestamp_ms.is_none() {
            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
        }

        if epoch.reference_gas_price.is_none() {
            epoch.reference_gas_price = Some(system_state.reference_gas_price());
        }

        // Stored last because `system_state` is moved into the record.
        if epoch.system_state.is_none() {
            epoch.system_state = Some(system_state);
        }

        self.epochs.insert(&epoch.epoch, &epoch)?;

        Ok(())
    }
792
    /// Indexes per-transaction data for a checkpoint: balance deltas (always)
    /// and authenticated events (when `index_events` is set).
    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        // Accumulator-root version of the most recently encountered settlement
        // transaction while walking backwards through the checkpoint.
        let mut current_accumulator_version: Option<u64> = None;

        // Iterate in reverse so each transaction's events are tagged with the
        // version of the settlement transaction that *follows* them in
        // checkpoint order (a settlement covers the events emitted before it).
        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            // Stage per-(owner, coin type) balance deltas as merge operands;
            // only struct type tags can form a `BalanceKey`, so any other
            // coin-type tags are skipped.
            let balance_changes = sui_types::balance_change::derive_balance_changes(
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
            )
            .into_iter()
            .filter_map(|change| {
                if let TypeTag::Struct(coin_type) = change.coin_type {
                    Some((
                        BalanceKey {
                            owner: change.address,
                            coin_type: *coin_type,
                        },
                        BalanceIndexInfo {
                            balance_delta: change.amount,
                        },
                    ))
                } else {
                    None
                }
            });
            batch.partial_merge_batch(&self.balance, balance_changes)?;

            if index_events {
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }
844
    /// Maintains the object-derived indexes (owner, dynamic_field, coin,
    /// package_version) for one checkpoint's removed and changed objects.
    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        // Coin metadata discovered this checkpoint, merged per coin type in
        // memory and written out once at the end.
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            // Removed objects: delete their owner or dynamic-field entries,
            // using the object state as it was before this transaction.
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            // Changed objects: first remove stale entries keyed by the old
            // state, then (re)insert entries for the new state.
            for (object, old_object) in tx.changed_objects() {
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            // The owner key embeds type and balance, so a
                            // mutation generally changes the key; always
                            // delete the old entry.
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            // Only delete when the owner changed; otherwise
                            // the identical key is re-inserted below.
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // Collect coin metadata / treasury info from newly created
            // objects, merging duplicates for the same coin type.
            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
936
937 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
938 self.epochs.get(&epoch)
939 }
940
941 fn event_iter(
942 &self,
943 stream_id: SuiAddress,
944 start_checkpoint: u64,
945 start_accumulator_version: u64,
946 start_transaction_idx: u32,
947 start_event_idx: u32,
948 end_checkpoint: u64,
949 limit: u32,
950 ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
951 {
952 let lower = EventIndexKey {
953 stream_id,
954 checkpoint_seq: start_checkpoint,
955 accumulator_version: start_accumulator_version,
956 transaction_idx: start_transaction_idx,
957 event_index: start_event_idx,
958 };
959 let upper = EventIndexKey {
960 stream_id,
961 checkpoint_seq: end_checkpoint,
962 accumulator_version: u64::MAX,
963 transaction_idx: u32::MAX,
964 event_index: u32::MAX,
965 };
966
967 Ok(self
968 .events_by_stream
969 .safe_iter_with_bounds(Some(lower), Some(upper))
970 .map(|res| res.map(|(k, _)| k))
971 .take(limit as usize))
972 }
973
    /// Iterates this owner's index entries, optionally filtered by object
    /// type, resuming from `cursor` when provided.
    fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        // Without a cursor, start at the smallest key for this owner (and
        // type, when filtering); "0x0::a::a" is a minimal placeholder tag.
        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
            owner,
            object_type: object_type
                .clone()
                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
            inverted_balance: None,
            object_id: ObjectID::ZERO,
        });

        Ok(self
            .owner
            .safe_iter_with_bounds(Some(lower_bound), None)
            .take_while(move |item| {
                // Let iterator errors through so the caller sees them.
                let Ok((key, _)) = item else {
                    return true;
                };

                // Stop once the key leaves this owner's range or, when a type
                // filter is present, stops matching it. The filter requires an
                // exact address/module/name match and — only when the filter
                // spells out type params — an exact type-param match too.
                key.owner == owner
                    && object_type
                        .as_ref()
                        .map(|ty| {
                            ty.address == key.object_type.address
                                && ty.module == key.object_type.module
                                && ty.name == key.object_type.name
                                && (ty.type_params.is_empty()
                                    || ty.type_params == key.object_type.type_params)
                        })
                        .unwrap_or(true)
            }))
    }
1018
1019 fn dynamic_field_iter(
1020 &self,
1021 parent: ObjectID,
1022 cursor: Option<ObjectID>,
1023 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1024 {
1025 let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1026 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1027 let iter = self
1028 .dynamic_field
1029 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1030 .map_ok(|(key, ())| key);
1031 Ok(iter)
1032 }
1033
1034 fn get_coin_info(
1035 &self,
1036 coin_type: &StructTag,
1037 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1038 let key = CoinIndexKey {
1039 coin_type: coin_type.to_owned(),
1040 };
1041 self.coin.get(&key)
1042 }
1043
1044 fn get_balance(
1045 &self,
1046 owner: &SuiAddress,
1047 coin_type: &StructTag,
1048 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1049 let key = BalanceKey {
1050 owner: owner.to_owned(),
1051 coin_type: coin_type.to_owned(),
1052 };
1053 self.balance.get(&key)
1054 }
1055
1056 fn balance_iter(
1057 &self,
1058 owner: SuiAddress,
1059 cursor: Option<BalanceKey>,
1060 ) -> Result<
1061 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1062 TypedStoreError,
1063 > {
1064 let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1065 owner,
1066 coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1067 });
1068
1069 Ok(self
1070 .balance
1071 .safe_iter_with_bounds(Some(lower_bound), None)
1072 .scan((), move |_, item| {
1073 match item {
1074 Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1075 Ok(_) => None, Err(e) => Some(Err(e)), }
1078 }))
1079 }
1080
1081 fn package_versions_iter(
1082 &self,
1083 original_id: ObjectID,
1084 cursor: Option<u64>,
1085 ) -> Result<
1086 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1087 TypedStoreError,
1088 > {
1089 let lower_bound = PackageVersionKey {
1090 original_package_id: original_id,
1091 version: cursor.unwrap_or(0),
1092 };
1093 let upper_bound = PackageVersionKey {
1094 original_package_id: original_id,
1095 version: u64::MAX,
1096 };
1097
1098 Ok(self
1099 .package_version
1100 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1101 }
1102}
1103
/// Public handle over the RPC index tables.
pub struct RpcIndexStore {
    tables: IndexStoreTables,
    // NOTE(review): populated elsewhere in this file — presumably staged
    // per-checkpoint batches keyed by checkpoint sequence number; confirm
    // against the commit path.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}
1109
1110impl RpcIndexStore {
1111 fn db_path(dir: &Path) -> PathBuf {
1113 dir.join("rpc-index")
1114 }
1115
1116 pub async fn new(
1117 dir: &Path,
1118 authority_store: &AuthorityStore,
1119 checkpoint_store: &CheckpointStore,
1120 epoch_store: &AuthorityPerEpochStore,
1121 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1122 pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
1123 rpc_config: sui_config::RpcConfig,
1124 ) -> Self {
1125 let events_filter = EventsCompactionFilter::new(pruning_watermark);
1126 let index_options = IndexStoreOptions {
1127 events_compaction_filter: Some(events_filter),
1128 };
1129
1130 Self::new_with_options(
1131 dir,
1132 authority_store,
1133 checkpoint_store,
1134 epoch_store,
1135 package_store,
1136 index_options,
1137 rpc_config,
1138 )
1139 .await
1140 }
1141
    /// Open the RPC index database under `dir`, rebuilding it from the live
    /// object set when `IndexStoreTables::needs_to_do_initialization` reports
    /// that the existing on-disk data cannot be used as-is.
    ///
    /// The rebuild path destroys the old database, reopens it with
    /// bulk-ingestion-friendly RocksDB options (unordered writes, disabled
    /// auto-compaction, large write buffers sized from available memory unless
    /// overridden by `index_initialization_config`), populates it via
    /// `IndexStoreTables::init`, flushes, and finally reopens with the normal
    /// runtime options, asserting the stored DB version matches
    /// `CURRENT_DB_VERSION`.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            if tables.needs_to_do_initialization(checkpoint_store) {
                // Assigned inside the block below; consumed by `init` afterwards.
                let batch_size_limit;

                let mut tables = {
                    // Close the handle before destroying the on-disk data.
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    let mut options = typed_store::rocksdb::Options::default();
                    // Bulk-load mode: allow unordered memtable writes.
                    options.set_unordered_write(true);

                    // Background jobs: config override, else one per CPU.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // Never slow down or stall writes on L0 file buildup during
                    // the bulk load; compaction is dealt with separately.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // DB-wide write buffer: config override, else 80% of memory.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    let mut table_config_map = BTreeMap::new();

                    let mut cf_options = typed_store::rocks::default_db_options();
                    // Compact manually after the bulk load rather than as we go.
                    cf_options.options.set_disable_auto_compactions(true);

                    // Per-CF write buffers: the two overrides must be given
                    // together or not at all.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Budget 25% of memory across the CF write buffers.
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024;
                            let target_buffer_count = num_cpus::get().max(2);

                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            // At least 2 buffers so a flush can overlap writes.
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Per-batch size cap for `init`: config override, else
                    // min(buffer_size / 2, 1 << 27 bytes).
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27;
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply the tuned CF options to every table...
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // ...except `balance`, which additionally needs the
                    // associative merge operator for balance deltas...
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    // ...and `events_by_stream`, which carries the compaction
                    // filter used for event pruning.
                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                // Populate the fresh tables from the live object set.
                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Reopening requires the bulk-load handle to be fully released.
                // Track the DB through a weak ref and wait (up to 30s) for all
                // strong refs to drop before opening with runtime options.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity-check the rebuilt DB persisted the expected version
                // before serving from it.
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }
1386
1387 pub fn new_without_init(dir: &Path) -> Self {
1388 let path = Self::db_path(dir);
1389 let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());
1390
1391 Self {
1392 tables,
1393 pending_updates: Default::default(),
1394 rpc_config: sui_config::RpcConfig::default(),
1395 }
1396 }
1397
    /// Prune index data up to `pruned_checkpoint_watermark`, using the
    /// contents of the checkpoints being pruned. Delegates to
    /// `IndexStoreTables::prune`.
    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }
1406
1407 #[tracing::instrument(
1412 skip_all,
1413 fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
1414 )]
1415 pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
1416 let sequence_number = checkpoint.checkpoint_summary.sequence_number;
1417 let batch = self
1418 .tables
1419 .index_checkpoint(checkpoint, resolver, &self.rpc_config)
1420 .expect("db error");
1421
1422 self.pending_updates
1423 .lock()
1424 .unwrap()
1425 .insert(sequence_number, batch);
1426 }
1427
1428 #[tracing::instrument(skip(self))]
1436 pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
1437 let next_batch = self.pending_updates.lock().unwrap().pop_first();
1438
1439 let (next_sequence_number, batch) = next_batch.unwrap();
1441 assert_eq!(
1442 checkpoint, next_sequence_number,
1443 "commit_update_for_checkpoint must be called in order"
1444 );
1445
1446 Ok(batch.write()?)
1447 }
1448
    /// Look up stored epoch information for `epoch`, if indexed.
    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }
1452
    /// Iterate the owner index for `owner`, optionally filtered by
    /// `object_type`, resuming from `cursor` when provided. Delegates to
    /// `IndexStoreTables::owner_iter`.
    pub fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, object_type, cursor)
    }
1464
    /// Iterate the dynamic-field index for `parent`, resuming from `cursor`
    /// when provided. Delegates to `IndexStoreTables::dynamic_field_iter`.
    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }
1473
    /// Look up the indexed coin metadata/treasury info for `coin_type`, if any.
    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }
1480
    /// Look up the indexed balance of `owner` for `coin_type`, if any.
    pub fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        self.tables.get_balance(owner, coin_type)
    }
1488
    /// Iterate all indexed balances for `owner`, resuming from `cursor` when
    /// provided. Delegates to `IndexStoreTables::balance_iter`.
    pub fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.balance_iter(owner, cursor)
    }
1499
    /// Iterate the indexed versions of the package identified by
    /// `original_id`, resuming from the `cursor` version when provided.
    pub fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.package_versions_iter(original_id, cursor)
    }
1510
    /// Iterate indexed events for `stream_id`, bounded by the given start
    /// position (checkpoint / accumulator version / transaction / event
    /// indices), `end_checkpoint`, and `limit`. Delegates to
    /// `IndexStoreTables::event_iter`.
    pub fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.event_iter(
            stream_id,
            start_checkpoint,
            start_accumulator_version,
            start_transaction_idx,
            start_event_idx,
            end_checkpoint,
            limit,
        )
    }
1532
    /// Return the sequence number recorded under `Watermark::Indexed` in the
    /// watermark table, i.e. the highest checkpoint that has been indexed.
    pub fn get_highest_indexed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables.watermark.get(&Watermark::Indexed)
    }
1538}
1539
1540fn should_index_dynamic_field(object: &Object) -> bool {
1541 object
1549 .data
1550 .try_as_move()
1551 .is_some_and(|move_object| move_object.type_().is_dynamic_field())
1552}
1553
1554fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
1555 use sui_types::coin::CoinMetadata;
1556 use sui_types::coin::RegulatedCoinMetadata;
1557 use sui_types::coin::TreasuryCap;
1558
1559 let object_type = object.type_().and_then(MoveObjectType::other)?;
1560
1561 if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
1562 return Some((
1563 CoinIndexKey { coin_type },
1564 CoinIndexInfo {
1565 coin_metadata_object_id: Some(object.id()),
1566 treasury_object_id: None,
1567 regulated_coin_metadata_object_id: None,
1568 },
1569 ));
1570 }
1571
1572 if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
1573 return Some((
1574 CoinIndexKey { coin_type },
1575 CoinIndexInfo {
1576 coin_metadata_object_id: None,
1577 treasury_object_id: Some(object.id()),
1578 regulated_coin_metadata_object_id: None,
1579 },
1580 ));
1581 }
1582
1583 if let Some(coin_type) =
1584 RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
1585 {
1586 return Some((
1587 CoinIndexKey { coin_type },
1588 CoinIndexInfo {
1589 coin_metadata_object_id: None,
1590 treasury_object_id: None,
1591 regulated_coin_metadata_object_id: Some(object.id()),
1592 },
1593 ));
1594 }
1595
1596 None
1597}
1598
/// Factory for per-worker live-object indexers used during the parallel
/// live-object-set scan (see `make_live_object_indexer`).
struct RpcParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    // Shared across workers; merged under the mutex in `index_object`.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Byte threshold at which a worker's write batch is flushed to disk.
    batch_size_limit: usize,
}
1604
/// Per-worker state for indexing a slice of the live object set.
struct RpcLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    // Worker-private write batch, flushed when it exceeds `batch_size_limit`.
    batch: typed_store::rocks::DBBatch,
    // Shared coin index, merged under the mutex.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Buffered balance deltas, merged into `batch` in bulk.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    batch_size_limit: usize,
}
1612
impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RpcLiveObjectIndexer<'a>;

    /// Create a worker indexer with its own write batch and an empty
    /// balance-delta buffer; the coin index mutex is shared across workers.
    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RpcLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            balance_changes: HashMap::new(),
            batch_size_limit: self.batch_size_limit,
        }
    }
}
1626
1627impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
1628 fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
1629 match object.owner {
1630 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
1632 let owner_key = OwnerIndexKey::from_object(&object);
1633 let owner_info = OwnerIndexInfo::new(&object);
1634 self.batch
1635 .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
1636
1637 if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
1638 let balance_key = BalanceKey { owner, coin_type };
1639 let balance_info = BalanceIndexInfo::from(value);
1640 self.balance_changes
1641 .entry(balance_key)
1642 .or_default()
1643 .merge_delta(&balance_info);
1644
1645 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1646 self.batch.partial_merge_batch(
1647 &self.tables.balance,
1648 std::mem::take(&mut self.balance_changes),
1649 )?;
1650 }
1651 }
1652 }
1653
1654 Owner::ObjectOwner(parent) => {
1656 if should_index_dynamic_field(&object) {
1657 let field_key = DynamicFieldKey::new(parent, object.id());
1658 self.batch
1659 .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
1660 }
1661
1662 if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
1664 && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
1665 {
1666 let balance_key = BalanceKey { owner, coin_type };
1667 let balance_info = BalanceIndexInfo {
1668 balance_delta: balance,
1669 };
1670 self.balance_changes
1671 .entry(balance_key)
1672 .or_default()
1673 .merge_delta(&balance_info);
1674
1675 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1676 self.batch.partial_merge_batch(
1677 &self.tables.balance,
1678 std::mem::take(&mut self.balance_changes),
1679 )?;
1680 }
1681 }
1682 }
1683
1684 Owner::Shared { .. } | Owner::Immutable => {}
1685 }
1686
1687 if let Some((key, value)) = try_create_coin_index_info(&object) {
1689 use std::collections::hash_map::Entry;
1690
1691 match self.coin_index.lock().unwrap().entry(key) {
1692 Entry::Occupied(mut o) => {
1693 o.get_mut().merge(value);
1694 }
1695 Entry::Vacant(v) => {
1696 v.insert(value);
1697 }
1698 }
1699 }
1700
1701 if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
1702 self.batch
1703 .insert_batch(&self.tables.package_version, [(key, info)])?;
1704 }
1705
1706 if self.batch.size_in_bytes() >= self.batch_size_limit {
1709 std::mem::replace(&mut self.batch, self.tables.owner.batch())
1710 .write_opt(bulk_ingestion_write_options())?;
1711 }
1712
1713 Ok(())
1714 }
1715
1716 fn finish(mut self) -> Result<(), StorageError> {
1717 self.batch.partial_merge_batch(
1718 &self.tables.balance,
1719 std::mem::take(&mut self.balance_changes),
1720 )?;
1721 self.batch.write_opt(bulk_ingestion_write_options())?;
1722 Ok(())
1723 }
1724}
1725
1726fn sparse_checkpoint_data_for_epoch_backfill(
1729 authority_store: &AuthorityStore,
1730 checkpoint_store: &CheckpointStore,
1731 checkpoint: u64,
1732 load_events: bool,
1733) -> Result<Option<CheckpointData>, StorageError> {
1734 use sui_types::full_checkpoint_content::CheckpointTransaction;
1735
1736 let summary = checkpoint_store
1737 .get_checkpoint_by_sequence_number(checkpoint)?
1738 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1739
1740 if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
1742 return Ok(None);
1743 }
1744
1745 let contents = checkpoint_store
1746 .get_checkpoint_contents(&summary.content_digest)?
1747 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1748
1749 let transaction_digests = contents
1750 .iter()
1751 .map(|execution_digests| execution_digests.transaction)
1752 .collect::<Vec<_>>();
1753 let transactions = authority_store
1754 .multi_get_transaction_blocks(&transaction_digests)?
1755 .into_iter()
1756 .map(|maybe_transaction| {
1757 maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
1758 })
1759 .collect::<Result<Vec<_>, _>>()?;
1760
1761 let effects = authority_store
1762 .multi_get_executed_effects(&transaction_digests)?
1763 .into_iter()
1764 .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
1765 .collect::<Result<Vec<_>, _>>()?;
1766
1767 let events = if load_events {
1768 authority_store
1769 .multi_get_events(&transaction_digests)
1770 .map_err(|e| StorageError::custom(e.to_string()))?
1771 } else {
1772 vec![None; transaction_digests.len()]
1773 };
1774
1775 let mut full_transactions = Vec::with_capacity(transactions.len());
1776 for ((tx, fx), ev) in transactions
1777 .into_iter()
1778 .zip_debug_eq(effects)
1779 .zip_debug_eq(events)
1780 {
1781 let input_objects =
1782 sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
1783 let output_objects =
1784 sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
1785
1786 let full_transaction = CheckpointTransaction {
1787 transaction: tx.into(),
1788 effects: fx,
1789 events: ev,
1790 input_objects,
1791 output_objects,
1792 };
1793
1794 full_transactions.push(full_transaction);
1795 }
1796
1797 let checkpoint_data = CheckpointData {
1798 checkpoint_summary: summary.into(),
1799 checkpoint_contents: contents,
1800 transactions: full_transactions,
1801 };
1802
1803 Ok(Some(checkpoint_data))
1804}
1805
1806fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
1807 match Coin::extract_balance_if_coin(object) {
1808 Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
1809 Ok(Some(_)) => {
1810 debug!("Coin object {} has non-struct type tag", object.id());
1811 Ok(None)
1812 }
1813 Ok(None) => {
1814 Ok(None)
1816 }
1817 Err(e) => {
1818 Err(StorageError::custom(format!(
1820 "Failed to deserialize coin object {}: {}",
1821 object.id(),
1822 e
1823 )))
1824 }
1825 }
1826}
1827
1828fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
1829 let move_object = object.data.try_as_move()?;
1830
1831 let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
1832 else {
1833 return None;
1834 };
1835
1836 let (key, value): (
1837 sui_types::accumulator_root::AccumulatorKey,
1838 sui_types::accumulator_root::AccumulatorValue,
1839 ) = move_object.try_into().ok()?;
1840
1841 let balance = value.as_u128()? as i128;
1842 if balance <= 0 {
1843 return None;
1844 }
1845
1846 Some((key.owner, *coin_type, balance))
1847}
1848
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    /// End-to-end exercise of `EventsCompactionFilter` against a real
    /// `events_by_stream` table: insert events on both sides of the pruning
    /// watermark, force a compaction, and inspect what survives.
    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        // Watermark of 5: events at checkpoints 1, 3, 5 are prunable.
        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        // Flush so the entries land in files that compaction will visit.
        tables.events_by_stream.flush().unwrap();
        let mut events_before_compaction = 0;
        for result in tables.events_by_stream.safe_iter() {
            if result.is_ok() {
                events_before_compaction += 1;
            }
        }
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        // Key range intended to span the whole table.
        // NOTE(review): end_key uses a random stream_id, so keys that sort
        // above it would be excluded from the range -- confirm key ordering
        // makes this safe, or use a max-valued stream id.
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        println!("Events after compaction: {}", events_after_compaction.len());
        // NOTE(review): weak assertion -- it also passes when nothing was
        // pruned; asserting exactly the checkpoint-10 and -15 events remain
        // would be stronger.
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        // NOTE(review): the store/load pair below only verifies the atomic
        // itself, not that the filter reacts to the new watermark.
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    /// Unit test of the filter decision: keys below, above, and exactly at
    /// the watermark (the boundary value is removed).
    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }
}
1979}