1use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use rayon::iter::IntoParallelIterator;
12use rayon::iter::ParallelIterator;
13use serde::Deserialize;
14use serde::Serialize;
15use std::collections::{BTreeMap, HashMap};
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::time::Duration;
21use std::time::Instant;
22use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
23use sui_types::base_types::MoveObjectType;
24use sui_types::base_types::ObjectID;
25use sui_types::base_types::SequenceNumber;
26use sui_types::base_types::SuiAddress;
27use sui_types::coin::Coin;
28use sui_types::committee::EpochId;
29use sui_types::digests::TransactionDigest;
30use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
31use sui_types::full_checkpoint_content::CheckpointData;
32use sui_types::layout_resolver::LayoutResolver;
33use sui_types::messages_checkpoint::CheckpointContents;
34use sui_types::messages_checkpoint::CheckpointSequenceNumber;
35use sui_types::object::Data;
36use sui_types::object::Object;
37use sui_types::object::Owner;
38use sui_types::storage::BackingPackageStore;
39use sui_types::storage::DynamicFieldKey;
40use sui_types::storage::EpochInfo;
41use sui_types::storage::TransactionInfo;
42use sui_types::storage::error::Error as StorageError;
43use sui_types::sui_system_state::SuiSystemStateTrait;
44use sui_types::transaction::{TransactionDataAPI, TransactionKind};
45use sysinfo::{MemoryRefreshKind, RefreshKind, System};
46use tracing::{debug, info, warn};
47use typed_store::DBMapUtils;
48use typed_store::TypedStoreError;
49use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
50use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
51use typed_store::traits::Map;
52
/// On-disk schema version of the index database. Compared against the stored
/// `MetadataInfo::version`; a mismatch triggers a full drop-and-rebuild at startup.
const CURRENT_DB_VERSION: u64 = 3;
// NOTE(review): not referenced in this chunk — presumably bounds in-memory
// balance batching during live-object indexing; confirm at the use site.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;
56
57fn bulk_ingestion_write_options() -> WriteOptions {
58 let mut opts = WriteOptions::default();
59 opts.disable_wal(true);
60 opts
61}
62
63fn get_available_memory() -> u64 {
65 let mut sys = System::new_with_specifics(
67 RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
68 );
69 sys.refresh_memory();
70
71 if let Some(cgroup_limits) = sys.cgroup_limits() {
73 let memory_limit = cgroup_limits.total_memory;
74 if memory_limit > 0 {
76 debug!("Using cgroup memory limit: {} bytes", memory_limit);
77 return memory_limit;
78 }
79 }
80
81 let total_memory_bytes = sys.total_memory();
84 debug!("Using system memory: {} bytes", total_memory_bytes);
85 total_memory_bytes
86}
87
/// Singleton row stored in the `meta` table under the unit key.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Schema version of the database; checked against `CURRENT_DB_VERSION`.
    version: u64,
}
93
/// Keys of the `watermark` table.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Highest checkpoint whose data has been indexed.
    Indexed,
    /// Highest checkpoint that has been pruned from the index.
    Pruned,
}
100
/// Key for the `owner` table.
///
/// Field order matters: encoded keys sort by owner, then object type, then
/// inverted balance, then id — so within one type, coins with larger balances
/// iterate first (see `from_object`).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    pub object_type: StructTag,

    /// `!balance` for coin objects, `None` for everything else.
    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}
113
114impl OwnerIndexKey {
115 fn from_object(object: &Object) -> Self {
118 let owner = match object.owner() {
119 Owner::AddressOwner(owner) => owner,
120 Owner::ConsensusAddressOwner { owner, .. } => owner,
121 _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
122 };
123 let object_type = object.struct_tag().expect("packages cannot be owned");
124
125 let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
126
127 Self {
128 owner: *owner,
129 object_type,
130 inverted_balance,
131 object_id: object.id(),
132 }
133 }
134}
135
/// Value of the `owner` table: the object's version when it was indexed.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    pub version: SequenceNumber,
}
141
142impl OwnerIndexInfo {
143 pub fn new(object: &Object) -> Self {
144 Self {
145 version: object.version(),
146 }
147 }
148}
149
/// Key for the `coin` table: one entry per coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}
154
/// Key for the `balance` table: one row per (owner, coin type) pair.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}
160
/// Metadata object ids discovered for a coin type. Fields are populated
/// independently as the relevant objects are observed and combined via `merge`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
167
/// Signed balance delta stored in the `balance` table. Rows are combined by the
/// associative merge operator and zero-valued rows are dropped during compaction.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    /// i128 so arbitrary u64 coin amounts can be added or subtracted without
    /// overflowing intermediate sums.
    pub balance_delta: i128,
}
172
173impl From<u64> for BalanceIndexInfo {
174 fn from(coin_value: u64) -> Self {
175 Self {
176 balance_delta: coin_value as i128,
177 }
178 }
179}
180
181impl BalanceIndexInfo {
182 fn merge_delta(&mut self, other: &Self) {
183 self.balance_delta += other.balance_delta;
184 }
185}
186
187impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
188 fn from(index_info: BalanceIndexInfo) -> Self {
189 let balance = index_info.balance_delta.clamp(0, u64::MAX as i128) as u64;
196 sui_types::storage::BalanceInfo { balance }
197 }
198}
199
/// Key for the `package_version` table: a package lineage (its original id)
/// plus a specific version number.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}
205
/// Value of the `package_version` table: the storage id under which that
/// particular version of the package lives.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}
210
/// Optional knobs applied when opening the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Compaction filter that prunes `events_by_stream` in the background.
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}
215
216fn default_table_options() -> typed_store::rocks::DBOptions {
217 typed_store::rocks::default_db_options().disable_write_throttling()
218}
219
/// Table options for `events_by_stream`: when a compaction filter is supplied,
/// installs it so pruned event entries are dropped during background compaction.
fn events_table_options(
    compaction_filter: Option<EventsCompactionFilter>,
) -> typed_store::rocks::DBOptions {
    let mut options = default_table_options();
    if let Some(filter) = compaction_filter {
        options.options.set_compaction_filter(
            "events_by_stream",
            move |_, key, value| match filter.filter(key, value) {
                Ok(decision) => decision,
                Err(e) => {
                    warn!(
                        "Failed to parse event key during compaction: {}, key: {:?}",
                        e, key
                    );
                    // NOTE(review): keys that fail to decode are removed rather
                    // than kept — confirm this is the intended policy for
                    // unrecognized/corrupt keys.
                    Decision::Remove
                }
            },
        );
    }
    options
}
241
242fn balance_delta_merge_operator(
243 _key: &[u8],
244 existing_val: Option<&[u8]>,
245 operands: &MergeOperands,
246) -> Option<Vec<u8>> {
247 let mut result = existing_val
248 .map(|v| {
249 bcs::from_bytes::<BalanceIndexInfo>(v)
250 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.")
251 })
252 .unwrap_or_default();
253
254 for operand in operands.iter() {
255 let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
256 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.");
257 result.merge_delta(&delta);
258 }
259 Some(
260 bcs::to_bytes(&result)
261 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
262 )
263}
264
265fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
266 let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
267 Ok(info) => info,
268 Err(_) => return Decision::Keep,
269 };
270
271 if balance_info.balance_delta == 0 {
272 Decision::Remove
273 } else {
274 Decision::Keep
275 }
276}
277
278fn balance_table_options() -> typed_store::rocks::DBOptions {
279 default_table_options()
280 .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
281 .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
282}
283
284impl CoinIndexInfo {
285 fn merge(&mut self, other: Self) {
286 self.coin_metadata_object_id = self
287 .coin_metadata_object_id
288 .or(other.coin_metadata_object_id);
289 self.regulated_coin_metadata_object_id = self
290 .regulated_coin_metadata_object_id
291 .or(other.regulated_coin_metadata_object_id);
292 self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
293 }
294}
295
/// RocksDB tables backing the RPC index. The on-disk layout is versioned via
/// `CURRENT_DB_VERSION`; a mismatch forces a full rebuild.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Singleton metadata row (DB schema version).
    meta: DBMap<(), MetadataInfo>,

    /// Indexed / Pruned checkpoint watermarks.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// Per-epoch summary information.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// Legacy transaction index, retained only so the column family can be
    /// dropped by the deprecation machinery.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Address-owned objects, keyed by (owner, type, inverted balance, id).
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Dynamic-field membership: (parent, child) pairs.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// Per-coin-type metadata / treasury / regulated-metadata object ids.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// Per-(owner, coin type) balance; maintained via merge deltas.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// (original package id, version) -> storage id of that package version.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Authenticated events keyed by stream; options (compaction filter) are
    /// supplied at open time rather than via an override fn.
    events_by_stream: DBMap<EventIndexKey, ()>,
}
377
/// Key for `events_by_stream`.
///
/// Field order defines the sort order relied on by `event_iter`'s range bounds:
/// stream, then checkpoint, accumulator version, transaction index, event index.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    /// Accumulator-root version of the settlement covering this event.
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}
387
/// Shared state for the `events_by_stream` compaction filter: entries at or
/// below the pruning watermark are removed during compaction (see `filter`).
#[derive(Clone)]
pub struct EventsCompactionFilter {
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}
393
394impl EventsCompactionFilter {
395 pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
396 Self { pruning_watermark }
397 }
398
399 pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
400 let event_key: EventIndexKey = bcs::from_bytes(key)?;
401 let watermark = self
402 .pruning_watermark
403 .load(std::sync::atomic::Ordering::Relaxed);
404
405 if event_key.checkpoint_seq <= watermark {
406 Ok(Decision::Remove)
407 } else {
408 Ok(Decision::Keep)
409 }
410 }
411}
412
413impl IndexStoreTables {
414 fn extract_version_if_package(
415 object: &Object,
416 ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
417 if let Data::Package(package) = &object.data {
418 let original_id = package.original_package_id();
419 let version = package.version().value();
420 let storage_id = object.id();
421
422 let key = PackageVersionKey {
423 original_package_id: original_id,
424 version,
425 };
426 let info = PackageVersionInfo { storage_id };
427 return Some((key, info));
428 }
429 None
430 }
431
432 fn open_with_index_options<P: Into<PathBuf>>(
433 path: P,
434 index_options: IndexStoreOptions,
435 ) -> Self {
436 let mut table_options = std::collections::BTreeMap::new();
437 table_options.insert("balance".to_string(), balance_table_options());
438 table_options.insert(
439 "events_by_stream".to_string(),
440 events_table_options(index_options.events_compaction_filter),
441 );
442
443 IndexStoreTables::open_tables_read_write_with_deprecation_option(
444 path.into(),
445 MetricConf::new("rpc-index"),
446 None,
447 Some(DBMapTableConfigMap::new(table_options)),
448 true, )
450 }
451
    /// Opens the index tables with caller-supplied DB-wide `options` and
    /// optional per-table overrides (used for the bulk-initialization open).
    fn open_with_options<P: Into<PathBuf>>(
        path: P,
        options: typed_store::rocksdb::Options,
        table_options: Option<DBMapTableConfigMap>,
    ) -> Self {
        IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path.into(),
            MetricConf::new("rpc-index"),
            Some(options),
            table_options,
            true, // drop deprecated column families on open
        )
    }
465
466 fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
467 (match self.meta.get(&()) {
468 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
469 Ok(None) => true,
470 Err(_) => true,
471 }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
472 }
473
474 fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
476 let highest_executed_checkpint = checkpoint_store
477 .get_highest_executed_checkpoint_seq_number()
478 .ok()
479 .flatten();
480 let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
481 watermark < highest_executed_checkpint
482 }
483
    /// One-time (re)build of the RPC indexes.
    ///
    /// Backfills epoch data for every locally available checkpoint, seeds the
    /// current epoch from on-chain system state, indexes the live object set in
    /// parallel, and finally records the `Indexed` watermark and DB version so
    /// later startups can skip this work.
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        let highest_executed_checkpint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        // Lowest checkpoint whose contents survive pruning: one past the highest
        // pruned checkpoint, or 0 if nothing has been pruned yet.
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Both checkpoint contents AND their objects must still be available.
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
            lowest_available_checkpoint..=highest_executed_checkpint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_checkpoints(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        // Only walk the live object set once at least one checkpoint executed.
        if highest_executed_checkpint.is_some() {
            // Coin metadata discovered by parallel workers accumulates here and
            // is written in a single multi_insert below.
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpint.unwrap_or(0),
        )?;

        // Writing the current version is what marks initialization as complete.
        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }
563
564 #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
565 fn index_existing_checkpoints(
566 &mut self,
567 authority_store: &AuthorityStore,
568 checkpoint_store: &CheckpointStore,
569 checkpoint_range: std::ops::RangeInclusive<u64>,
570 rpc_config: &sui_config::RpcConfig,
571 ) -> Result<(), StorageError> {
572 info!(
573 "Indexing {} checkpoints in range {checkpoint_range:?}",
574 checkpoint_range.size_hint().0
575 );
576 let start_time = Instant::now();
577
578 checkpoint_range.into_par_iter().try_for_each(|seq| {
579 let load_events = rpc_config.authenticated_events_indexing();
580 let Some(checkpoint_data) = sparse_checkpoint_data_for_epoch_backfill(
581 authority_store,
582 checkpoint_store,
583 seq,
584 load_events,
585 )?
586 else {
587 return Ok(());
588 };
589
590 let mut batch = self.epochs.batch();
591
592 self.index_epoch(&checkpoint_data, &mut batch)?;
593
594 batch
595 .write_opt(bulk_ingestion_write_options())
596 .map_err(StorageError::from)
597 })?;
598
599 info!(
600 "Indexing checkpoints took {} seconds",
601 start_time.elapsed().as_secs()
602 );
603 Ok(())
604 }
605
606 fn prune(
608 &self,
609 pruned_checkpoint_watermark: u64,
610 _checkpoint_contents_to_prune: &[CheckpointContents],
611 ) -> Result<(), TypedStoreError> {
612 let mut batch = self.watermark.batch();
613
614 batch.insert_batch(
615 &self.watermark,
616 [(Watermark::Pruned, pruned_checkpoint_watermark)],
617 )?;
618
619 batch.write()
620 }
621
    /// Builds (but does not write) a batch containing every index update for
    /// `checkpoint` — epochs, transactions (balances/events), objects — plus
    /// the advanced `Indexed` watermark. The caller commits the batch, so all
    /// updates for one checkpoint land atomically.
    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        _resolver: &mut dyn LayoutResolver,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.owner.batch();

        self.index_epoch(checkpoint, &mut batch)?;
        self.index_transactions(
            checkpoint,
            &mut batch,
            rpc_config.authenticated_events_indexing(),
        )?;
        self.index_objects(checkpoint, &mut batch)?;

        // Advance the indexed watermark in the same atomic batch.
        batch.insert_batch(
            &self.watermark,
            [(
                Watermark::Indexed,
                checkpoint.checkpoint_summary.sequence_number,
            )],
        )?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }
659
660 fn extract_accumulator_version(
661 &self,
662 tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
663 ) -> Option<u64> {
664 let TransactionKind::ProgrammableSystemTransaction(pt) =
665 tx.transaction.transaction_data().kind()
666 else {
667 return None;
668 };
669
670 if pt.shared_input_objects().any(|obj| {
671 obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
672 && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
673 }) {
674 return tx.output_objects.iter().find_map(|obj| {
675 if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
676 Some(obj.version().value())
677 } else {
678 None
679 }
680 });
681 }
682
683 None
684 }
685
    /// Inserts `events_by_stream` entries for the accumulator events of `tx`.
    ///
    /// `accumulator_version` is the latest accumulator-root version known at
    /// this point in the checkpoint walk; events seen while it is still unknown
    /// cannot form a key and are skipped (with a debug_fatal).
    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            // Only event-digest accumulator writes correspond to stream events.
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                // Events without a resolvable stream id are not indexed.
                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }
733
734 fn index_epoch(
735 &self,
736 checkpoint: &CheckpointData,
737 batch: &mut typed_store::rocks::DBBatch,
738 ) -> Result<(), StorageError> {
739 let Some(epoch_info) = checkpoint.epoch_info()? else {
740 return Ok(());
741 };
742 if epoch_info.epoch > 0 {
743 let prev_epoch = epoch_info.epoch - 1;
744 let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
745 current_epoch.epoch = prev_epoch; current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
747 current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
748 batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
749 }
750 batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
751 Ok(())
752 }
753
    /// Seeds the epochs table for the epoch of the highest executed checkpoint
    /// using the current on-chain system state.
    ///
    /// Fields are only filled when still `None`, so values already derived from
    /// checkpoint indexing are never overwritten. The accessor calls on
    /// `system_state` must precede the final move into `epoch.system_state`.
    fn initialize_current_epoch(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
    ) -> Result<(), StorageError> {
        // Nothing to do before any checkpoint has executed.
        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
            return Ok(());
        };

        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;

        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
        epoch.epoch = checkpoint.epoch;

        if epoch.protocol_version.is_none() {
            epoch.protocol_version = Some(system_state.protocol_version());
        }

        if epoch.start_timestamp_ms.is_none() {
            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
        }

        if epoch.reference_gas_price.is_none() {
            epoch.reference_gas_price = Some(system_state.reference_gas_price());
        }

        if epoch.system_state.is_none() {
            epoch.system_state = Some(system_state);
        }

        self.epochs.insert(&epoch.epoch, &epoch)?;

        Ok(())
    }
791
    /// Indexes transaction-level data for `checkpoint`: per-(owner, coin type)
    /// balance deltas, and authenticated events when `index_events` is set.
    ///
    /// NOTE(review): transactions are walked in REVERSE so that a settlement
    /// transaction's accumulator-root version is observed before the earlier
    /// transactions whose events it covers — confirm this matches the ordering
    /// of settlement transactions within a checkpoint.
    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        let mut current_accumulator_version: Option<u64> = None;

        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            // Net balance change per (address, coin type); only struct-typed
            // coin types can form a BalanceKey.
            let balance_changes = sui_types::balance_change::derive_balance_changes(
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
            )
            .into_iter()
            .filter_map(|change| {
                if let TypeTag::Struct(coin_type) = change.coin_type {
                    Some((
                        BalanceKey {
                            owner: change.address,
                            coin_type: *coin_type,
                        },
                        BalanceIndexInfo {
                            balance_delta: change.amount,
                        },
                    ))
                } else {
                    None
                }
            });
            // Deltas are combined by the table's merge operator instead of
            // read-modify-write.
            batch.partial_merge_batch(&self.balance, balance_changes)?;

            if index_events {
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }
843
    /// Indexes object-level changes for every transaction in `checkpoint`:
    /// owner index, dynamic fields, package versions, and coin metadata.
    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            // Removed objects: drop their owner / dynamic-field entries.
            // (`removed_objects_pre_version` presumably yields the pre-removal
            // state, which is what the old index keys were built from.)
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            for (object, old_object) in tx.changed_objects() {
                // First delete stale entries for the previous version, since the
                // derived key (owner/type/balance) may have changed.
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            // Only delete when ownership changed; an unchanged
                            // dynamic-field key is simply re-inserted below.
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                // Then insert entries for the new version.
                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            // Coin metadata/treasury discovery from newly created objects;
            // entries for the same coin type are merged field-by-field.
            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
935
936 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
937 self.epochs.get(&epoch)
938 }
939
940 fn event_iter(
941 &self,
942 stream_id: SuiAddress,
943 start_checkpoint: u64,
944 start_accumulator_version: u64,
945 start_transaction_idx: u32,
946 start_event_idx: u32,
947 end_checkpoint: u64,
948 limit: u32,
949 ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
950 {
951 let lower = EventIndexKey {
952 stream_id,
953 checkpoint_seq: start_checkpoint,
954 accumulator_version: start_accumulator_version,
955 transaction_idx: start_transaction_idx,
956 event_index: start_event_idx,
957 };
958 let upper = EventIndexKey {
959 stream_id,
960 checkpoint_seq: end_checkpoint,
961 accumulator_version: u64::MAX,
962 transaction_idx: u32::MAX,
963 event_index: u32::MAX,
964 };
965
966 Ok(self
967 .events_by_stream
968 .safe_iter_with_bounds(Some(lower), Some(upper))
969 .map(|res| res.map(|(k, _)| k))
970 .take(limit as usize))
971 }
972
    /// Iterates `(key, info)` entries for objects owned by `owner`, optionally
    /// filtered to `object_type`, starting at `cursor` when provided.
    ///
    /// A type filter with no type parameters matches every instantiation of
    /// that generic type; with parameters, they must match exactly. Since keys
    /// sort by owner then type, iteration can stop at the first non-matching
    /// entry.
    fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        // "0x0::a::a" is a minimal placeholder tag so that, absent a cursor or
        // type filter, iteration starts at the beginning of this owner's range.
        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
            owner,
            object_type: object_type
                .clone()
                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
            inverted_balance: None,
            object_id: ObjectID::ZERO,
        });

        Ok(self
            .owner
            .safe_iter_with_bounds(Some(lower_bound), None)
            .take_while(move |item| {
                // Errors are passed through to the caller rather than silently
                // terminating the iteration.
                let Ok((key, _)) = item else {
                    return true;
                };

                key.owner == owner
                    && object_type
                        .as_ref()
                        .map(|ty| {
                            ty.address == key.object_type.address
                                && ty.module == key.object_type.module
                                && ty.name == key.object_type.name
                                && (ty.type_params.is_empty()
                                    || ty.type_params == key.object_type.type_params)
                        })
                        .unwrap_or(true)
            }))
    }
1017
1018 fn dynamic_field_iter(
1019 &self,
1020 parent: ObjectID,
1021 cursor: Option<ObjectID>,
1022 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1023 {
1024 let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1025 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1026 let iter = self
1027 .dynamic_field
1028 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1029 .map_ok(|(key, ())| key);
1030 Ok(iter)
1031 }
1032
1033 fn get_coin_info(
1034 &self,
1035 coin_type: &StructTag,
1036 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1037 let key = CoinIndexKey {
1038 coin_type: coin_type.to_owned(),
1039 };
1040 self.coin.get(&key)
1041 }
1042
1043 fn get_balance(
1044 &self,
1045 owner: &SuiAddress,
1046 coin_type: &StructTag,
1047 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1048 let key = BalanceKey {
1049 owner: owner.to_owned(),
1050 coin_type: coin_type.to_owned(),
1051 };
1052 self.balance.get(&key)
1053 }
1054
1055 fn balance_iter(
1056 &self,
1057 owner: SuiAddress,
1058 cursor: Option<BalanceKey>,
1059 ) -> Result<
1060 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1061 TypedStoreError,
1062 > {
1063 let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1064 owner,
1065 coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1066 });
1067
1068 Ok(self
1069 .balance
1070 .safe_iter_with_bounds(Some(lower_bound), None)
1071 .scan((), move |_, item| {
1072 match item {
1073 Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1074 Ok(_) => None, Err(e) => Some(Err(e)), }
1077 }))
1078 }
1079
1080 fn package_versions_iter(
1081 &self,
1082 original_id: ObjectID,
1083 cursor: Option<u64>,
1084 ) -> Result<
1085 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1086 TypedStoreError,
1087 > {
1088 let lower_bound = PackageVersionKey {
1089 original_package_id: original_id,
1090 version: cursor.unwrap_or(0),
1091 };
1092 let upper_bound = PackageVersionKey {
1093 original_package_id: original_id,
1094 version: u64::MAX,
1095 };
1096
1097 Ok(self
1098 .package_version
1099 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1100 }
1101}
1102
/// Public handle to the RPC index database.
pub struct RpcIndexStore {
    tables: IndexStoreTables,
    // Write batches staged per checkpoint sequence number; presumably committed
    // in order by a commit path outside this chunk — TODO confirm.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}
1108
1109impl RpcIndexStore {
1110 fn db_path(dir: &Path) -> PathBuf {
1112 dir.join("rpc-index")
1113 }
1114
1115 pub async fn new(
1116 dir: &Path,
1117 authority_store: &AuthorityStore,
1118 checkpoint_store: &CheckpointStore,
1119 epoch_store: &AuthorityPerEpochStore,
1120 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1121 pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
1122 rpc_config: sui_config::RpcConfig,
1123 ) -> Self {
1124 let events_filter = EventsCompactionFilter::new(pruning_watermark);
1125 let index_options = IndexStoreOptions {
1126 events_compaction_filter: Some(events_filter),
1127 };
1128
1129 Self::new_with_options(
1130 dir,
1131 authority_store,
1132 checkpoint_store,
1133 epoch_store,
1134 package_store,
1135 index_options,
1136 rpc_config,
1137 )
1138 .await
1139 }
1140
    /// Open (or rebuild) the RPC index database under `dir`.
    ///
    /// If the on-disk database is missing or stale (`needs_to_do_initialization`),
    /// the old DB is destroyed and rebuilt from the live object set using
    /// bulk-ingestion-friendly RocksDB options, then flushed, fully closed, and
    /// reopened with the normal serving options before being returned.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        // Optional operator overrides for the bulk-initialization tuning knobs below.
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            if tables.needs_to_do_initialization(checkpoint_store) {
                let batch_size_limit;

                let mut tables = {
                    // Close the handle so the files can be deleted, then destroy the
                    // stale database on disk before rebuilding.
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // DB-wide options tuned for one-shot bulk ingestion.
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Background job parallelism: config override, else one per CPU.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // Never throttle or stall writes on L0 buildup while ingesting;
                    // auto-compaction is disabled per-CF below anyway.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // DB-wide write buffer: config override, else 80% of available memory.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    let mut table_config_map = BTreeMap::new();

                    // Per-column-family options; compaction is deferred until after
                    // the rebuild completes.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    // Per-CF memtable sizing: both overrides set, neither, or panic.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Budget 25% of memory across this CF's memtables.
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            // Floor of 64 MiB per buffer; target one buffer per CPU (min 2).
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024;
                            let target_buffer_count = num_cpus::get().max(2);

                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Ingestion write-batch size cap: override, else
                    // min(half a memtable, 128 MiB).
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27;
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Apply the tuned CF options to every table...
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // ...except `balance`, which also needs the delta merge operator...
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    // ...and `events_by_stream`, which gets the compaction filter.
                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                // Populate the freshly created tables from the live object set.
                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Drop the ingestion-tuned handle and wait (up to 30s) for the last
                // strong reference to the underlying DB to go away so it can be
                // reopened with the normal serving options.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity-check that the rebuild persisted the expected version marker.
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }
1385
1386 pub fn new_without_init(dir: &Path) -> Self {
1387 let path = Self::db_path(dir);
1388 let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());
1389
1390 Self {
1391 tables,
1392 pending_updates: Default::default(),
1393 rpc_config: sui_config::RpcConfig::default(),
1394 }
1395 }
1396
1397 pub fn prune(
1398 &self,
1399 pruned_checkpoint_watermark: u64,
1400 checkpoint_contents_to_prune: &[CheckpointContents],
1401 ) -> Result<(), TypedStoreError> {
1402 self.tables
1403 .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
1404 }
1405
1406 #[tracing::instrument(
1411 skip_all,
1412 fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
1413 )]
1414 pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
1415 let sequence_number = checkpoint.checkpoint_summary.sequence_number;
1416 let batch = self
1417 .tables
1418 .index_checkpoint(checkpoint, resolver, &self.rpc_config)
1419 .expect("db error");
1420
1421 self.pending_updates
1422 .lock()
1423 .unwrap()
1424 .insert(sequence_number, batch);
1425 }
1426
1427 #[tracing::instrument(skip(self))]
1435 pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
1436 let next_batch = self.pending_updates.lock().unwrap().pop_first();
1437
1438 let (next_sequence_number, batch) = next_batch.unwrap();
1440 assert_eq!(
1441 checkpoint, next_sequence_number,
1442 "commit_update_for_checkpoint must be called in order"
1443 );
1444
1445 Ok(batch.write()?)
1446 }
1447
1448 pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
1449 self.tables.get_epoch_info(epoch)
1450 }
1451
1452 pub fn owner_iter(
1453 &self,
1454 owner: SuiAddress,
1455 object_type: Option<StructTag>,
1456 cursor: Option<OwnerIndexKey>,
1457 ) -> Result<
1458 impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1459 TypedStoreError,
1460 > {
1461 self.tables.owner_iter(owner, object_type, cursor)
1462 }
1463
1464 pub fn dynamic_field_iter(
1465 &self,
1466 parent: ObjectID,
1467 cursor: Option<ObjectID>,
1468 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1469 {
1470 self.tables.dynamic_field_iter(parent, cursor)
1471 }
1472
1473 pub fn get_coin_info(
1474 &self,
1475 coin_type: &StructTag,
1476 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1477 self.tables.get_coin_info(coin_type)
1478 }
1479
1480 pub fn get_balance(
1481 &self,
1482 owner: &SuiAddress,
1483 coin_type: &StructTag,
1484 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1485 self.tables.get_balance(owner, coin_type)
1486 }
1487
1488 pub fn balance_iter(
1489 &self,
1490 owner: SuiAddress,
1491 cursor: Option<BalanceKey>,
1492 ) -> Result<
1493 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1494 TypedStoreError,
1495 > {
1496 self.tables.balance_iter(owner, cursor)
1497 }
1498
1499 pub fn package_versions_iter(
1500 &self,
1501 original_id: ObjectID,
1502 cursor: Option<u64>,
1503 ) -> Result<
1504 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1505 TypedStoreError,
1506 > {
1507 self.tables.package_versions_iter(original_id, cursor)
1508 }
1509
1510 pub fn event_iter(
1511 &self,
1512 stream_id: SuiAddress,
1513 start_checkpoint: u64,
1514 start_accumulator_version: u64,
1515 start_transaction_idx: u32,
1516 start_event_idx: u32,
1517 end_checkpoint: u64,
1518 limit: u32,
1519 ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
1520 {
1521 self.tables.event_iter(
1522 stream_id,
1523 start_checkpoint,
1524 start_accumulator_version,
1525 start_transaction_idx,
1526 start_event_idx,
1527 end_checkpoint,
1528 limit,
1529 )
1530 }
1531
1532 pub fn get_highest_indexed_checkpoint_seq_number(
1533 &self,
1534 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
1535 self.tables.watermark.get(&Watermark::Indexed)
1536 }
1537}
1538
1539fn should_index_dynamic_field(object: &Object) -> bool {
1540 object
1548 .data
1549 .try_as_move()
1550 .is_some_and(|move_object| move_object.type_().is_dynamic_field())
1551}
1552
1553fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
1554 use sui_types::coin::CoinMetadata;
1555 use sui_types::coin::RegulatedCoinMetadata;
1556 use sui_types::coin::TreasuryCap;
1557
1558 let object_type = object.type_().and_then(MoveObjectType::other)?;
1559
1560 if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
1561 return Some((
1562 CoinIndexKey { coin_type },
1563 CoinIndexInfo {
1564 coin_metadata_object_id: Some(object.id()),
1565 treasury_object_id: None,
1566 regulated_coin_metadata_object_id: None,
1567 },
1568 ));
1569 }
1570
1571 if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
1572 return Some((
1573 CoinIndexKey { coin_type },
1574 CoinIndexInfo {
1575 coin_metadata_object_id: None,
1576 treasury_object_id: Some(object.id()),
1577 regulated_coin_metadata_object_id: None,
1578 },
1579 ));
1580 }
1581
1582 if let Some(coin_type) =
1583 RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
1584 {
1585 return Some((
1586 CoinIndexKey { coin_type },
1587 CoinIndexInfo {
1588 coin_metadata_object_id: None,
1589 treasury_object_id: None,
1590 regulated_coin_metadata_object_id: Some(object.id()),
1591 },
1592 ));
1593 }
1594
1595 None
1596}
1597
/// Factory handed to the parallel live-object-set scanner; produces one
/// `RpcLiveObjectIndexer` per worker.
struct RpcParLiveObjectSetIndexer<'a> {
    // Destination index tables shared by all workers.
    tables: &'a IndexStoreTables,
    // Coin metadata/treasury index accumulated across workers under a lock.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Byte threshold at which a worker flushes its write batch.
    batch_size_limit: usize,
}
1603
/// Per-worker state for indexing a slice of the live object set.
struct RpcLiveObjectIndexer<'a> {
    // Destination index tables (shared, read-only handle).
    tables: &'a IndexStoreTables,
    // Worker-local write batch, flushed when it exceeds `batch_size_limit`.
    batch: typed_store::rocks::DBBatch,
    // Shared coin metadata/treasury accumulator.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Worker-local balance deltas, merged into the batch in chunks.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    // Byte threshold at which `batch` is written out.
    batch_size_limit: usize,
}
1611
1612impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
1613 type ObjectIndexer = RpcLiveObjectIndexer<'a>;
1614
1615 fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
1616 RpcLiveObjectIndexer {
1617 tables: self.tables,
1618 batch: self.tables.owner.batch(),
1619 coin_index: self.coin_index,
1620 balance_changes: HashMap::new(),
1621 batch_size_limit: self.batch_size_limit,
1622 }
1623 }
1624}
1625
1626impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
1627 fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
1628 match object.owner {
1629 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
1631 let owner_key = OwnerIndexKey::from_object(&object);
1632 let owner_info = OwnerIndexInfo::new(&object);
1633 self.batch
1634 .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
1635
1636 if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
1637 let balance_key = BalanceKey { owner, coin_type };
1638 let balance_info = BalanceIndexInfo::from(value);
1639 self.balance_changes
1640 .entry(balance_key)
1641 .or_default()
1642 .merge_delta(&balance_info);
1643
1644 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1645 self.batch.partial_merge_batch(
1646 &self.tables.balance,
1647 std::mem::take(&mut self.balance_changes),
1648 )?;
1649 }
1650 }
1651 }
1652
1653 Owner::ObjectOwner(parent) => {
1655 if should_index_dynamic_field(&object) {
1656 let field_key = DynamicFieldKey::new(parent, object.id());
1657 self.batch
1658 .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
1659 }
1660
1661 if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
1663 && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
1664 {
1665 let balance_key = BalanceKey { owner, coin_type };
1666 let balance_info = BalanceIndexInfo {
1667 balance_delta: balance,
1668 };
1669 self.balance_changes
1670 .entry(balance_key)
1671 .or_default()
1672 .merge_delta(&balance_info);
1673
1674 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1675 self.batch.partial_merge_batch(
1676 &self.tables.balance,
1677 std::mem::take(&mut self.balance_changes),
1678 )?;
1679 }
1680 }
1681 }
1682
1683 Owner::Shared { .. } | Owner::Immutable => {}
1684 }
1685
1686 if let Some((key, value)) = try_create_coin_index_info(&object) {
1688 use std::collections::hash_map::Entry;
1689
1690 match self.coin_index.lock().unwrap().entry(key) {
1691 Entry::Occupied(mut o) => {
1692 o.get_mut().merge(value);
1693 }
1694 Entry::Vacant(v) => {
1695 v.insert(value);
1696 }
1697 }
1698 }
1699
1700 if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
1701 self.batch
1702 .insert_batch(&self.tables.package_version, [(key, info)])?;
1703 }
1704
1705 if self.batch.size_in_bytes() >= self.batch_size_limit {
1708 std::mem::replace(&mut self.batch, self.tables.owner.batch())
1709 .write_opt(bulk_ingestion_write_options())?;
1710 }
1711
1712 Ok(())
1713 }
1714
1715 fn finish(mut self) -> Result<(), StorageError> {
1716 self.batch.partial_merge_batch(
1717 &self.tables.balance,
1718 std::mem::take(&mut self.balance_changes),
1719 )?;
1720 self.batch.write_opt(bulk_ingestion_write_options())?;
1721 Ok(())
1722 }
1723}
1724
1725fn sparse_checkpoint_data_for_epoch_backfill(
1728 authority_store: &AuthorityStore,
1729 checkpoint_store: &CheckpointStore,
1730 checkpoint: u64,
1731 load_events: bool,
1732) -> Result<Option<CheckpointData>, StorageError> {
1733 use sui_types::full_checkpoint_content::CheckpointTransaction;
1734
1735 let summary = checkpoint_store
1736 .get_checkpoint_by_sequence_number(checkpoint)?
1737 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1738
1739 if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
1741 return Ok(None);
1742 }
1743
1744 let contents = checkpoint_store
1745 .get_checkpoint_contents(&summary.content_digest)?
1746 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1747
1748 let transaction_digests = contents
1749 .iter()
1750 .map(|execution_digests| execution_digests.transaction)
1751 .collect::<Vec<_>>();
1752 let transactions = authority_store
1753 .multi_get_transaction_blocks(&transaction_digests)?
1754 .into_iter()
1755 .map(|maybe_transaction| {
1756 maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
1757 })
1758 .collect::<Result<Vec<_>, _>>()?;
1759
1760 let effects = authority_store
1761 .multi_get_executed_effects(&transaction_digests)?
1762 .into_iter()
1763 .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
1764 .collect::<Result<Vec<_>, _>>()?;
1765
1766 let events = if load_events {
1767 authority_store
1768 .multi_get_events(&transaction_digests)
1769 .map_err(|e| StorageError::custom(e.to_string()))?
1770 } else {
1771 vec![None; transaction_digests.len()]
1772 };
1773
1774 let mut full_transactions = Vec::with_capacity(transactions.len());
1775 for ((tx, fx), ev) in transactions.into_iter().zip(effects).zip(events) {
1776 let input_objects =
1777 sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
1778 let output_objects =
1779 sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
1780
1781 let full_transaction = CheckpointTransaction {
1782 transaction: tx.into(),
1783 effects: fx,
1784 events: ev,
1785 input_objects,
1786 output_objects,
1787 };
1788
1789 full_transactions.push(full_transaction);
1790 }
1791
1792 let checkpoint_data = CheckpointData {
1793 checkpoint_summary: summary.into(),
1794 checkpoint_contents: contents,
1795 transactions: full_transactions,
1796 };
1797
1798 Ok(Some(checkpoint_data))
1799}
1800
1801fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
1802 match Coin::extract_balance_if_coin(object) {
1803 Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
1804 Ok(Some(_)) => {
1805 debug!("Coin object {} has non-struct type tag", object.id());
1806 Ok(None)
1807 }
1808 Ok(None) => {
1809 Ok(None)
1811 }
1812 Err(e) => {
1813 Err(StorageError::custom(format!(
1815 "Failed to deserialize coin object {}: {}",
1816 object.id(),
1817 e
1818 )))
1819 }
1820 }
1821}
1822
1823fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
1824 let move_object = object.data.try_as_move()?;
1825
1826 let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
1827 else {
1828 return None;
1829 };
1830
1831 let (key, value): (
1832 sui_types::accumulator_root::AccumulatorKey,
1833 sui_types::accumulator_root::AccumulatorValue,
1834 ) = move_object.try_into().ok()?;
1835
1836 let balance = value.as_u128()? as i128;
1837 if balance <= 0 {
1838 return None;
1839 }
1840
1841 Some((key.owner, *coin_type, balance))
1842}
1843
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    /// End-to-end check that the events compaction filter, installed via
    /// `IndexStoreOptions`, drops `events_by_stream` entries at or below the
    /// pruning watermark when the table is compacted.
    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        // Watermark 5: per the filter-logic test below, events with
        // checkpoint_seq <= 5 are eligible for removal during compaction.
        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        // Events straddling the watermark: 1, 3, 5 prunable; 10, 15 must survive.
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        // Flush so the entries land in SST files that compaction can process.
        tables.events_by_stream.flush().unwrap();
        let mut events_before_compaction = 0;
        for result in tables.events_by_stream.safe_iter() {
            if result.is_ok() {
                events_before_compaction += 1;
            }
        }
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        // Compact the full key range so the filter sees every entry.
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        println!("Events after compaction: {}", events_after_compaction.len());
        // Lower bound only: the random end_key may sort below stream_id, so the
        // compacted range is not guaranteed to cover every prunable entry.
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        // The filter reads the watermark through the shared atomic; raising it
        // here demonstrates the handle stays live after the DB is opened.
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    /// Unit test of the filter decision itself: keys strictly above the watermark
    /// are kept; keys at or below it are removed.
    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        // Below the watermark: removed.
        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        // Above the watermark: kept.
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        // Exactly at the watermark: removed (boundary is inclusive).
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }
}