1use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use rayon::iter::IntoParallelIterator;
12use rayon::iter::ParallelIterator;
13use serde::Deserialize;
14use serde::Serialize;
15use std::collections::{BTreeMap, HashMap};
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::time::Duration;
21use std::time::Instant;
22use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
23use sui_types::base_types::MoveObjectType;
24use sui_types::base_types::ObjectID;
25use sui_types::base_types::SequenceNumber;
26use sui_types::base_types::SuiAddress;
27use sui_types::coin::Coin;
28use sui_types::committee::EpochId;
29use sui_types::digests::TransactionDigest;
30use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
31use sui_types::full_checkpoint_content::CheckpointData;
32use sui_types::layout_resolver::LayoutResolver;
33use sui_types::messages_checkpoint::CheckpointContents;
34use sui_types::messages_checkpoint::CheckpointSequenceNumber;
35use sui_types::object::Data;
36use sui_types::object::Object;
37use sui_types::object::Owner;
38use sui_types::storage::BackingPackageStore;
39use sui_types::storage::DynamicFieldKey;
40use sui_types::storage::EpochInfo;
41use sui_types::storage::TransactionInfo;
42use sui_types::storage::error::Error as StorageError;
43use sui_types::sui_system_state::SuiSystemStateTrait;
44use sui_types::transaction::{TransactionDataAPI, TransactionKind};
45use sysinfo::{MemoryRefreshKind, RefreshKind, System};
46use tracing::{debug, info, warn};
47use typed_store::DBMapUtils;
48use typed_store::TypedStoreError;
49use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
50use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
51use typed_store::traits::Map;
52
/// Schema version of the on-disk index. It is written to the `meta` table at the end of
/// `IndexStoreTables::init`, and `needs_to_do_initialization` reports `true` whenever the stored
/// version differs from this value.
const CURRENT_DB_VERSION: u64 = 3;
// NOTE(review): not referenced in the visible part of this file; presumably the number of
// buffered balance entries that triggers a flush during bulk indexing — confirm at the use site.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;
56
57fn bulk_ingestion_write_options() -> WriteOptions {
58 let mut opts = WriteOptions::default();
59 opts.disable_wal(true);
60 opts
61}
62
63fn get_available_memory() -> u64 {
65 let mut sys = System::new_with_specifics(
67 RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
68 );
69 sys.refresh_memory();
70
71 if let Some(cgroup_limits) = sys.cgroup_limits() {
73 let memory_limit = cgroup_limits.total_memory;
74 if memory_limit > 0 {
76 debug!("Using cgroup memory limit: {} bytes", memory_limit);
77 return memory_limit;
78 }
79 }
80
81 let total_memory_bytes = sys.total_memory();
84 debug!("Using system memory: {} bytes", total_memory_bytes);
85 total_memory_bytes
86}
87
/// Metadata record stored in the `meta` table under the unit key.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Schema version the database was built with; compared against `CURRENT_DB_VERSION`.
    version: u64,
}
93
/// Keys of the `watermark` table; each maps to a checkpoint sequence number.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Updated by `init` and `index_checkpoint` with the sequence number that has been indexed.
    Indexed,
    /// Updated by `prune` with the pruned checkpoint watermark.
    Pruned,
}
100
101#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
102pub struct OwnerIndexKey {
103 pub owner: SuiAddress,
104
105 pub object_type: StructTag,
106
107 pub inverted_balance: Option<u64>,
110
111 pub object_id: ObjectID,
112}
113
114impl OwnerIndexKey {
115 fn from_object(object: &Object) -> Self {
118 let owner = match object.owner() {
119 Owner::AddressOwner(owner) => owner,
120 Owner::ConsensusAddressOwner { owner, .. } => owner,
121 _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
122 };
123 let object_type = object.struct_tag().expect("packages cannot be owned");
124
125 let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
126
127 Self {
128 owner: *owner,
129 object_type,
130 inverted_balance,
131 object_id: object.id(),
132 }
133 }
134}
135
136#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
137pub struct OwnerIndexInfo {
138 pub version: SequenceNumber,
140}
141
142impl OwnerIndexInfo {
143 pub fn new(object: &Object) -> Self {
144 Self {
145 version: object.version(),
146 }
147 }
148}
149
/// Key of the `coin` table: the struct tag identifying a coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}
154
/// Key of the `balance` table: one entry per (owner, coin type) pair.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    /// Address whose balance is tracked.
    pub owner: SuiAddress,
    /// Struct tag of the coin type.
    pub coin_type: StructTag,
}
160
/// Value of the `coin` table: ids of well-known objects associated with a coin type.
/// Each id is optional; `CoinIndexInfo::merge` fills in missing ids from another record.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    // NOTE(review): presumably the `CoinMetadata<T>` object id — confirm.
    pub coin_metadata_object_id: Option<ObjectID>,
    // NOTE(review): presumably the `TreasuryCap<T>` object id — confirm.
    pub treasury_object_id: Option<ObjectID>,
    // NOTE(review): presumably the `RegulatedCoinMetadata<T>` object id — confirm.
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
167
168#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
169pub struct BalanceIndexInfo {
170 pub balance_delta: i128,
171}
172
173impl From<u64> for BalanceIndexInfo {
174 fn from(coin_value: u64) -> Self {
175 Self {
176 balance_delta: coin_value as i128,
177 }
178 }
179}
180
181impl BalanceIndexInfo {
182 fn invert(self) -> Self {
183 assert!(
185 self.balance_delta != i128::MIN,
186 "Cannot invert balance_delta: would overflow i128"
187 );
188
189 Self {
190 balance_delta: -self.balance_delta,
191 }
192 }
193
194 fn merge_delta(&mut self, other: &Self) {
195 self.balance_delta += other.balance_delta;
196 }
197}
198
199impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
200 fn from(index_info: BalanceIndexInfo) -> Self {
201 let balance = index_info.balance_delta.clamp(0, u64::MAX as i128) as u64;
208 sui_types::storage::BalanceInfo { balance }
209 }
210}
211
/// Key of the `package_version` table: a package's original id plus a version number.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    /// Original package id, as reported by the package itself.
    pub original_package_id: ObjectID,
    /// Package version number.
    pub version: u64,
}
217
/// Value of the `package_version` table.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    /// Object id under which this version of the package is stored.
    pub storage_id: ObjectID,
}
222
/// Options applied when opening the index database.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Optional compaction filter installed on the `events_by_stream` table.
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}
227
228fn default_table_options() -> typed_store::rocks::DBOptions {
229 typed_store::rocks::default_db_options().disable_write_throttling()
230}
231
232fn events_table_options(
233 compaction_filter: Option<EventsCompactionFilter>,
234) -> typed_store::rocks::DBOptions {
235 let mut options = default_table_options();
236 if let Some(filter) = compaction_filter {
237 options.options.set_compaction_filter(
238 "events_by_stream",
239 move |_, key, value| match filter.filter(key, value) {
240 Ok(decision) => decision,
241 Err(e) => {
242 warn!(
243 "Failed to parse event key during compaction: {}, key: {:?}",
244 e, key
245 );
246 Decision::Remove
247 }
248 },
249 );
250 }
251 options
252}
253
254fn balance_delta_merge_operator(
255 _key: &[u8],
256 existing_val: Option<&[u8]>,
257 operands: &MergeOperands,
258) -> Option<Vec<u8>> {
259 let mut result = existing_val
260 .map(|v| {
261 bcs::from_bytes::<BalanceIndexInfo>(v)
262 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.")
263 })
264 .unwrap_or_default();
265
266 for operand in operands.iter() {
267 let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
268 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption.");
269 result.merge_delta(&delta);
270 }
271 Some(
272 bcs::to_bytes(&result)
273 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
274 )
275}
276
277fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
278 let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
279 Ok(info) => info,
280 Err(_) => return Decision::Keep,
281 };
282
283 if balance_info.balance_delta == 0 {
284 Decision::Remove
285 } else {
286 Decision::Keep
287 }
288}
289
290fn balance_table_options() -> typed_store::rocks::DBOptions {
291 default_table_options()
292 .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
293 .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
294}
295
296impl CoinIndexInfo {
297 fn merge(&mut self, other: Self) {
298 self.coin_metadata_object_id = self
299 .coin_metadata_object_id
300 .or(other.coin_metadata_object_id);
301 self.regulated_coin_metadata_object_id = self
302 .regulated_coin_metadata_object_id
303 .or(other.regulated_coin_metadata_object_id);
304 self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
305 }
306}
307
/// The RocksDB tables (column families) that make up the RPC index.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Single-row table holding the schema version under the unit key.
    meta: DBMap<(), MetadataInfo>,

    /// Indexed / pruned checkpoint watermarks.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// Per-epoch information keyed by epoch id.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// Per-transaction information keyed by transaction digest.
    #[default_options_override_fn = "default_table_options"]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Address-owned objects, keyed by owner, type, inverted balance and object id.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Object-owned objects that qualify as dynamic fields; the key carries all the data.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// Well-known object ids per coin type.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// Balance deltas per (owner, coin type); uses the merge operator and compaction filter
    /// configured by `balance_table_options`.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// Storage id for each (original package id, version) pair.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Event index keyed by stream; its options are supplied explicitly in
    /// `open_with_index_options`.
    events_by_stream: DBMap<EventIndexKey, ()>,
}
387
/// Key of the `events_by_stream` table. The table stores no value; the key is the record.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    /// Stream the event belongs to, derived from the accumulator event.
    pub stream_id: SuiAddress,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_seq: u64,
    /// Version of the accumulator root object associated with the event.
    pub accumulator_version: u64,
    /// Index of the transaction within its checkpoint.
    pub transaction_idx: u32,
    /// Index of the event as recorded in the accumulator event digest list.
    pub event_index: u32,
}
397
398#[derive(Clone)]
400pub struct EventsCompactionFilter {
401 pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
402}
403
404impl EventsCompactionFilter {
405 pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
406 Self { pruning_watermark }
407 }
408
409 pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
410 let event_key: EventIndexKey = bcs::from_bytes(key)?;
411 let watermark = self
412 .pruning_watermark
413 .load(std::sync::atomic::Ordering::Relaxed);
414
415 if event_key.checkpoint_seq <= watermark {
416 Ok(Decision::Remove)
417 } else {
418 Ok(Decision::Keep)
419 }
420 }
421}
422
423impl IndexStoreTables {
424 fn track_coin_balance_change(
425 object: &Object,
426 owner: &SuiAddress,
427 is_removal: bool,
428 balance_changes: &mut HashMap<BalanceKey, BalanceIndexInfo>,
429 ) -> Result<(), StorageError> {
430 if let Some((struct_tag, value)) = get_balance_and_type_if_coin(object)? {
431 let key = BalanceKey {
432 owner: *owner,
433 coin_type: struct_tag,
434 };
435
436 let mut delta = BalanceIndexInfo::from(value);
437 if is_removal {
438 delta = delta.invert();
439 }
440
441 balance_changes.entry(key).or_default().merge_delta(&delta);
442 }
443 Ok(())
444 }
445
446 fn extract_version_if_package(
447 object: &Object,
448 ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
449 if let Data::Package(package) = &object.data {
450 let original_id = package.original_package_id();
451 let version = package.version().value();
452 let storage_id = object.id();
453
454 let key = PackageVersionKey {
455 original_package_id: original_id,
456 version,
457 };
458 let info = PackageVersionInfo { storage_id };
459 return Some((key, info));
460 }
461 None
462 }
463
464 fn open_with_index_options<P: Into<PathBuf>>(
465 path: P,
466 index_options: IndexStoreOptions,
467 ) -> Self {
468 let mut table_options = std::collections::BTreeMap::new();
469 table_options.insert("balance".to_string(), balance_table_options());
470 table_options.insert(
471 "events_by_stream".to_string(),
472 events_table_options(index_options.events_compaction_filter),
473 );
474
475 IndexStoreTables::open_tables_read_write(
476 path.into(),
477 MetricConf::new("rpc-index"),
478 None,
479 Some(DBMapTableConfigMap::new(table_options)),
480 )
481 }
482
483 fn open_with_options<P: Into<PathBuf>>(
484 path: P,
485 options: typed_store::rocksdb::Options,
486 table_options: Option<DBMapTableConfigMap>,
487 ) -> Self {
488 IndexStoreTables::open_tables_read_write(
489 path.into(),
490 MetricConf::new("rpc-index"),
491 Some(options),
492 table_options,
493 )
494 }
495
496 fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
497 (match self.meta.get(&()) {
498 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
499 Ok(None) => true,
500 Err(_) => true,
501 }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
502 }
503
504 fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
506 let highest_executed_checkpint = checkpoint_store
507 .get_highest_executed_checkpoint_seq_number()
508 .ok()
509 .flatten();
510 let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
511 watermark < highest_executed_checkpint
512 }
513
514 #[tracing::instrument(skip_all)]
515 fn init(
516 &mut self,
517 authority_store: &AuthorityStore,
518 checkpoint_store: &CheckpointStore,
519 _epoch_store: &AuthorityPerEpochStore,
520 _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
521 batch_size_limit: usize,
522 rpc_config: &sui_config::RpcConfig,
523 ) -> Result<(), StorageError> {
524 info!("Initializing RPC indexes");
525
526 let highest_executed_checkpint =
527 checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
528 let lowest_available_checkpoint = checkpoint_store
529 .get_highest_pruned_checkpoint_seq_number()?
530 .map(|c| c.saturating_add(1))
531 .unwrap_or(0);
532 let lowest_available_checkpoint_objects = authority_store
533 .perpetual_tables
534 .get_highest_pruned_checkpoint()?
535 .map(|c| c.saturating_add(1))
536 .unwrap_or(0);
537 let lowest_available_checkpoint =
540 lowest_available_checkpoint.max(lowest_available_checkpoint_objects);
541
542 let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
543 lowest_available_checkpoint..=highest_executed_checkpint
544 });
545
546 if let Some(checkpoint_range) = checkpoint_range {
547 self.index_existing_transactions(
548 authority_store,
549 checkpoint_store,
550 checkpoint_range,
551 rpc_config,
552 )?;
553 }
554
555 self.initialize_current_epoch(authority_store, checkpoint_store)?;
556
557 if highest_executed_checkpint.is_some() {
561 let coin_index = Mutex::new(HashMap::new());
562
563 let make_live_object_indexer = RpcParLiveObjectSetIndexer {
564 tables: self,
565 coin_index: &coin_index,
566 batch_size_limit,
567 };
568
569 crate::par_index_live_object_set::par_index_live_object_set(
570 authority_store,
571 &make_live_object_indexer,
572 )?;
573
574 self.coin.multi_insert(coin_index.into_inner().unwrap())?;
575 }
576
577 self.watermark.insert(
578 &Watermark::Indexed,
579 &highest_executed_checkpint.unwrap_or(0),
580 )?;
581
582 self.meta.insert(
583 &(),
584 &MetadataInfo {
585 version: CURRENT_DB_VERSION,
586 },
587 )?;
588
589 info!("Finished initializing RPC indexes");
590
591 Ok(())
592 }
593
594 #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
595 fn index_existing_transactions(
596 &mut self,
597 authority_store: &AuthorityStore,
598 checkpoint_store: &CheckpointStore,
599 checkpoint_range: std::ops::RangeInclusive<u64>,
600 rpc_config: &sui_config::RpcConfig,
601 ) -> Result<(), StorageError> {
602 info!(
603 "Indexing {} checkpoints in range {checkpoint_range:?}",
604 checkpoint_range.size_hint().0
605 );
606 let start_time = Instant::now();
607
608 checkpoint_range.into_par_iter().try_for_each(|seq| {
609 let load_events = rpc_config.authenticated_events_indexing();
610 let checkpoint_data = sparse_checkpoint_data_for_backfill(
611 authority_store,
612 checkpoint_store,
613 seq,
614 load_events,
615 )?;
616
617 let mut batch = self.transactions.batch();
618
619 self.index_epoch(&checkpoint_data, &mut batch)?;
620 self.index_transactions(&checkpoint_data, &mut batch, load_events)?;
621
622 batch
623 .write_opt(&(bulk_ingestion_write_options()))
624 .map_err(StorageError::from)
625 })?;
626
627 info!(
628 "Indexing checkpoints took {} seconds",
629 start_time.elapsed().as_secs()
630 );
631 Ok(())
632 }
633
634 fn prune(
636 &self,
637 pruned_checkpoint_watermark: u64,
638 checkpoint_contents_to_prune: &[CheckpointContents],
639 ) -> Result<(), TypedStoreError> {
640 let mut batch = self.transactions.batch();
641
642 let transactions_to_prune = checkpoint_contents_to_prune
643 .iter()
644 .flat_map(|contents| contents.iter().map(|digests| digests.transaction));
645
646 batch.delete_batch(&self.transactions, transactions_to_prune)?;
647 batch.insert_batch(
648 &self.watermark,
649 [(Watermark::Pruned, pruned_checkpoint_watermark)],
650 )?;
651
652 batch.write()
653 }
654
655 fn index_checkpoint(
657 &self,
658 checkpoint: &CheckpointData,
659 _resolver: &mut dyn LayoutResolver,
660 rpc_config: &sui_config::RpcConfig,
661 ) -> Result<typed_store::rocks::DBBatch, StorageError> {
662 debug!(
663 checkpoint = checkpoint.checkpoint_summary.sequence_number,
664 "indexing checkpoint"
665 );
666
667 let mut batch = self.transactions.batch();
668
669 self.index_epoch(checkpoint, &mut batch)?;
670 self.index_transactions(
671 checkpoint,
672 &mut batch,
673 rpc_config.authenticated_events_indexing(),
674 )?;
675 self.index_objects(checkpoint, &mut batch)?;
676
677 batch.insert_batch(
678 &self.watermark,
679 [(
680 Watermark::Indexed,
681 checkpoint.checkpoint_summary.sequence_number,
682 )],
683 )?;
684
685 debug!(
686 checkpoint = checkpoint.checkpoint_summary.sequence_number,
687 "finished indexing checkpoint"
688 );
689
690 Ok(batch)
691 }
692
693 fn extract_accumulator_version(
694 &self,
695 tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
696 ) -> Option<u64> {
697 let TransactionKind::ProgrammableSystemTransaction(pt) =
698 tx.transaction.transaction_data().kind()
699 else {
700 return None;
701 };
702
703 if pt.shared_input_objects().any(|obj| {
704 obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
705 && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
706 }) {
707 return tx.output_objects.iter().find_map(|obj| {
708 if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
709 Some(obj.version().value())
710 } else {
711 None
712 }
713 });
714 }
715
716 None
717 }
718
719 fn index_transaction_events(
720 &self,
721 tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
722 checkpoint_seq: u64,
723 tx_idx: u32,
724 accumulator_version: Option<u64>,
725 batch: &mut typed_store::rocks::DBBatch,
726 ) -> Result<(), StorageError> {
727 let acc_events = tx.effects.accumulator_events();
728 if acc_events.is_empty() {
729 return Ok(());
730 }
731
732 let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
733 for acc in acc_events {
734 if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
735 let Some(accumulator_version) = accumulator_version else {
736 mysten_common::debug_fatal!(
737 "Found events at checkpoint {} tx {} before any accumulator settlement",
738 checkpoint_seq,
739 tx_idx
740 );
741 continue;
742 };
743
744 if let Some(stream_id) =
745 sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
746 {
747 for (idx, _d) in event_digests {
748 let key = EventIndexKey {
749 stream_id,
750 checkpoint_seq,
751 accumulator_version,
752 transaction_idx: tx_idx,
753 event_index: *idx as u32,
754 };
755 entries.push((key, ()));
756 }
757 }
758 }
759 }
760
761 if !entries.is_empty() {
762 batch.insert_batch(&self.events_by_stream, entries)?;
763 }
764 Ok(())
765 }
766
767 fn index_epoch(
768 &self,
769 checkpoint: &CheckpointData,
770 batch: &mut typed_store::rocks::DBBatch,
771 ) -> Result<(), StorageError> {
772 let Some(epoch_info) = checkpoint.epoch_info()? else {
773 return Ok(());
774 };
775 if epoch_info.epoch > 0 {
776 let prev_epoch = epoch_info.epoch - 1;
777 let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
778 current_epoch.epoch = prev_epoch; current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
780 current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
781 batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
782 }
783 batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
784 Ok(())
785 }
786
787 fn initialize_current_epoch(
790 &mut self,
791 authority_store: &AuthorityStore,
792 checkpoint_store: &CheckpointStore,
793 ) -> Result<(), StorageError> {
794 let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
795 return Ok(());
796 };
797
798 let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
799 .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
800
801 let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
802 epoch.epoch = checkpoint.epoch;
803
804 if epoch.protocol_version.is_none() {
805 epoch.protocol_version = Some(system_state.protocol_version());
806 }
807
808 if epoch.start_timestamp_ms.is_none() {
809 epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
810 }
811
812 if epoch.reference_gas_price.is_none() {
813 epoch.reference_gas_price = Some(system_state.reference_gas_price());
814 }
815
816 if epoch.system_state.is_none() {
817 epoch.system_state = Some(system_state);
818 }
819
820 self.epochs.insert(&epoch.epoch, &epoch)?;
821
822 Ok(())
823 }
824
825 fn index_transactions(
826 &self,
827 checkpoint: &CheckpointData,
828 batch: &mut typed_store::rocks::DBBatch,
829 index_events: bool,
830 ) -> Result<(), StorageError> {
831 let cp = checkpoint.checkpoint_summary.sequence_number;
832 let mut current_accumulator_version: Option<u64> = None;
833
834 for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
836 let info = TransactionInfo::new(
837 tx.transaction.transaction_data(),
838 &tx.effects,
839 &tx.input_objects,
840 &tx.output_objects,
841 cp,
842 );
843
844 let digest = tx.transaction.digest();
845 batch.insert_batch(&self.transactions, [(digest, info)])?;
846
847 if index_events {
848 if let Some(version) = self.extract_accumulator_version(tx) {
849 current_accumulator_version = Some(version);
850 }
851
852 self.index_transaction_events(
853 tx,
854 cp,
855 tx_idx as u32,
856 current_accumulator_version,
857 batch,
858 )?;
859 }
860 }
861
862 Ok(())
863 }
864
865 fn index_objects(
866 &self,
867 checkpoint: &CheckpointData,
868 batch: &mut typed_store::rocks::DBBatch,
869 ) -> Result<(), StorageError> {
870 let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
871 let mut balance_changes: HashMap<BalanceKey, BalanceIndexInfo> = HashMap::new();
872 let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];
873
874 for tx in &checkpoint.transactions {
875 for removed_object in tx.removed_objects_pre_version() {
877 match removed_object.owner() {
878 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
879 Self::track_coin_balance_change(
880 removed_object,
881 owner,
882 true,
883 &mut balance_changes,
884 )?;
885
886 let owner_key = OwnerIndexKey::from_object(removed_object);
887 batch.delete_batch(&self.owner, [owner_key])?;
888 }
889 Owner::ObjectOwner(object_id) => {
890 batch.delete_batch(
891 &self.dynamic_field,
892 [DynamicFieldKey::new(*object_id, removed_object.id())],
893 )?;
894 }
895 Owner::Shared { .. } | Owner::Immutable => {}
896 }
897 }
898
899 for (object, old_object) in tx.changed_objects() {
901 if let Some(old_object) = old_object {
902 match old_object.owner() {
903 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
904 Self::track_coin_balance_change(
905 old_object,
906 owner,
907 true,
908 &mut balance_changes,
909 )?;
910
911 let owner_key = OwnerIndexKey::from_object(old_object);
912 batch.delete_batch(&self.owner, [owner_key])?;
913 }
914
915 Owner::ObjectOwner(object_id) => {
916 if old_object.owner() != object.owner() {
917 batch.delete_batch(
918 &self.dynamic_field,
919 [DynamicFieldKey::new(*object_id, old_object.id())],
920 )?;
921 }
922 }
923
924 Owner::Shared { .. } | Owner::Immutable => {}
925 }
926 }
927
928 match object.owner() {
929 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
930 Self::track_coin_balance_change(
931 object,
932 owner,
933 false,
934 &mut balance_changes,
935 )?;
936 let owner_key = OwnerIndexKey::from_object(object);
937 let owner_info = OwnerIndexInfo::new(object);
938 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
939 }
940 Owner::ObjectOwner(parent) => {
941 if should_index_dynamic_field(object) {
942 let field_key = DynamicFieldKey::new(*parent, object.id());
943 batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
944 }
945 }
946 Owner::Shared { .. } | Owner::Immutable => {}
947 }
948 if let Some((key, info)) = Self::extract_version_if_package(object) {
949 package_version_index.push((key, info));
950 }
951 }
952
953 for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
959 use std::collections::hash_map::Entry;
960
961 match coin_index.entry(key) {
962 Entry::Occupied(mut o) => {
963 o.get_mut().merge(value);
964 }
965 Entry::Vacant(v) => {
966 v.insert(value);
967 }
968 }
969 }
970 }
971
972 batch.insert_batch(&self.coin, coin_index)?;
973 batch.partial_merge_batch(&self.balance, balance_changes)?;
974 batch.insert_batch(&self.package_version, package_version_index)?;
975
976 Ok(())
977 }
978
979 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
980 self.epochs.get(&epoch)
981 }
982
983 fn get_transaction_info(
984 &self,
985 digest: &TransactionDigest,
986 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
987 self.transactions.get(digest)
988 }
989
990 fn event_iter(
991 &self,
992 stream_id: SuiAddress,
993 start_checkpoint: u64,
994 start_accumulator_version: u64,
995 start_transaction_idx: u32,
996 start_event_idx: u32,
997 end_checkpoint: u64,
998 limit: u32,
999 ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
1000 {
1001 let lower = EventIndexKey {
1002 stream_id,
1003 checkpoint_seq: start_checkpoint,
1004 accumulator_version: start_accumulator_version,
1005 transaction_idx: start_transaction_idx,
1006 event_index: start_event_idx,
1007 };
1008 let upper = EventIndexKey {
1009 stream_id,
1010 checkpoint_seq: end_checkpoint,
1011 accumulator_version: u64::MAX,
1012 transaction_idx: u32::MAX,
1013 event_index: u32::MAX,
1014 };
1015
1016 Ok(self
1017 .events_by_stream
1018 .safe_iter_with_bounds(Some(lower), Some(upper))
1019 .map(|res| res.map(|(k, _)| k))
1020 .take(limit as usize))
1021 }
1022
1023 fn owner_iter(
1024 &self,
1025 owner: SuiAddress,
1026 object_type: Option<StructTag>,
1027 cursor: Option<OwnerIndexKey>,
1028 ) -> Result<
1029 impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1030 TypedStoreError,
1031 > {
1032 let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
1034 owner,
1035 object_type: object_type
1036 .clone()
1037 .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
1038 inverted_balance: None,
1039 object_id: ObjectID::ZERO,
1040 });
1041
1042 Ok(self
1043 .owner
1044 .safe_iter_with_bounds(Some(lower_bound), None)
1045 .take_while(move |item| {
1046 let Ok((key, _)) = item else {
1048 return true;
1049 };
1050
1051 key.owner == owner
1053 && object_type
1055 .as_ref()
1056 .map(|ty| {
1057 ty.address == key.object_type.address
1058 && ty.module == key.object_type.module
1059 && ty.name == key.object_type.name
1060 && (ty.type_params.is_empty() ||
1062 ty.type_params == key.object_type.type_params)
1064 }).unwrap_or(true)
1065 }))
1066 }
1067
1068 fn dynamic_field_iter(
1069 &self,
1070 parent: ObjectID,
1071 cursor: Option<ObjectID>,
1072 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1073 {
1074 let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1075 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1076 let iter = self
1077 .dynamic_field
1078 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1079 .map_ok(|(key, ())| key);
1080 Ok(iter)
1081 }
1082
1083 fn get_coin_info(
1084 &self,
1085 coin_type: &StructTag,
1086 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1087 let key = CoinIndexKey {
1088 coin_type: coin_type.to_owned(),
1089 };
1090 self.coin.get(&key)
1091 }
1092
1093 fn get_balance(
1094 &self,
1095 owner: &SuiAddress,
1096 coin_type: &StructTag,
1097 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1098 let key = BalanceKey {
1099 owner: owner.to_owned(),
1100 coin_type: coin_type.to_owned(),
1101 };
1102 self.balance.get(&key)
1103 }
1104
1105 fn balance_iter(
1106 &self,
1107 owner: SuiAddress,
1108 cursor: Option<BalanceKey>,
1109 ) -> Result<
1110 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1111 TypedStoreError,
1112 > {
1113 let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1114 owner,
1115 coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1116 });
1117
1118 Ok(self
1119 .balance
1120 .safe_iter_with_bounds(Some(lower_bound), None)
1121 .scan((), move |_, item| {
1122 match item {
1123 Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1124 Ok(_) => None, Err(e) => Some(Err(e)), }
1127 }))
1128 }
1129
1130 fn package_versions_iter(
1131 &self,
1132 original_id: ObjectID,
1133 cursor: Option<u64>,
1134 ) -> Result<
1135 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1136 TypedStoreError,
1137 > {
1138 let lower_bound = PackageVersionKey {
1139 original_package_id: original_id,
1140 version: cursor.unwrap_or(0),
1141 };
1142 let upper_bound = PackageVersionKey {
1143 original_package_id: original_id,
1144 version: u64::MAX,
1145 };
1146
1147 Ok(self
1148 .package_version
1149 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1150 }
1151}
1152
/// Public handle to the RPC index.
pub struct RpcIndexStore {
    /// The underlying RocksDB tables.
    tables: IndexStoreTables,
    /// Batches that have been built but not yet written.
    // NOTE(review): the `u64` key is presumably the checkpoint sequence number — confirm.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    /// RPC configuration the store was created with.
    rpc_config: sui_config::RpcConfig,
}
1158
1159impl RpcIndexStore {
1160 fn db_path(dir: &Path) -> PathBuf {
1162 dir.join("rpc-index")
1163 }
1164
1165 pub async fn new(
1166 dir: &Path,
1167 authority_store: &AuthorityStore,
1168 checkpoint_store: &CheckpointStore,
1169 epoch_store: &AuthorityPerEpochStore,
1170 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1171 pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
1172 rpc_config: sui_config::RpcConfig,
1173 ) -> Self {
1174 let events_filter = EventsCompactionFilter::new(pruning_watermark);
1175 let index_options = IndexStoreOptions {
1176 events_compaction_filter: Some(events_filter),
1177 };
1178
1179 Self::new_with_options(
1180 dir,
1181 authority_store,
1182 checkpoint_store,
1183 epoch_store,
1184 package_store,
1185 index_options,
1186 rpc_config,
1187 )
1188 .await
1189 }
1190
    /// Opens the RPC index store under `dir`, rebuilding the index from scratch
    /// when `IndexStoreTables::needs_to_do_initialization` reports it is needed.
    ///
    /// When a rebuild is needed, the existing database is destroyed, reopened
    /// with options chosen for the initial load (each overridable through
    /// `rpc_config.index_initialization_config()`), populated by
    /// `IndexStoreTables::init`, flushed through `tables.meta.flush()`, closed,
    /// and finally reopened with `index_options`. Otherwise the tables opened
    /// with `index_options` are used as they are.
    ///
    /// # Panics
    ///
    /// Panics if the old database cannot be destroyed, if exactly one of the
    /// column-family write-buffer size / count overrides is configured, if
    /// initialization or the flush fails, if other strong references to the
    /// database handle are still alive 30 seconds after the tables are dropped,
    /// or if the reopened database's metadata cannot be read, is missing, or
    /// carries a version other than `CURRENT_DB_VERSION`.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            if tables.needs_to_do_initialization(checkpoint_store) {
                // Assigned inside the block below, then passed to `init` afterwards.
                let batch_size_limit;

                let mut tables = {
                    // Release our handle before destroying the database on disk.
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Database-wide options used only for this initialization pass.
                    // NOTE(review): the settings below look tuned for bulk-load
                    // throughput — confirm intent against the RocksDB documentation.
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Configured override if present, otherwise one job per CPU
                    // reported by `num_cpus`.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // NOTE(review): presumably these level-zero triggers keep writes
                    // from being slowed or stopped during the load — confirm.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    // Configured override if present, otherwise 80% of the value
                    // returned by `get_available_memory`.
                    let total_memory_bytes = get_available_memory();
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Per-table (column family) options, keyed by table name.
                    let mut table_config_map = BTreeMap::new();

                    // Base column-family options with automatic compactions disabled.
                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    // The write-buffer size and count overrides must be given
                    // together; supplying only one of them panics below.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            // Budget 25% of the reported memory for the buffers.
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            // 64 MiB floor for a single buffer.
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024;
                            // Aim for one buffer per CPU, but never fewer than two.
                            let target_buffer_count = num_cpus::get().max(2);

                            // Split the budget across the target count, honouring the
                            // floor, then derive how many such buffers fit the budget.
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Configured override if present, otherwise the smaller of half a
                    // write buffer and 128 MiB (`1 << 27`).
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27;
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Start every described table with the shared options.
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // "balance" gets the shared options plus the balance-delta merge
                    // operator; the insert replaces any entry already stored under
                    // that name.
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    // "events_by_stream" uses the options built by
                    // `events_table_options` (not a clone of `cf_options`), carrying
                    // the caller's compaction filter.
                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Keep only a weak handle so we can observe when the database has
                // actually been released after dropping the tables.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                // Poll every 100 ms, for at most 30 s, until no strong references to
                // the database handle remain.
                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                // Reopen with the caller's index options instead of the
                // initialization-time options.
                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity check: the metadata written during initialization must be
                // readable from the reopened database and carry the current version.
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }
1435
1436 pub fn new_without_init(dir: &Path) -> Self {
1437 let path = Self::db_path(dir);
1438 let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());
1439
1440 Self {
1441 tables,
1442 pending_updates: Default::default(),
1443 rpc_config: sui_config::RpcConfig::default(),
1444 }
1445 }
1446
1447 pub fn prune(
1448 &self,
1449 pruned_checkpoint_watermark: u64,
1450 checkpoint_contents_to_prune: &[CheckpointContents],
1451 ) -> Result<(), TypedStoreError> {
1452 self.tables
1453 .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
1454 }
1455
1456 #[tracing::instrument(
1461 skip_all,
1462 fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
1463 )]
1464 pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
1465 let sequence_number = checkpoint.checkpoint_summary.sequence_number;
1466 let batch = self
1467 .tables
1468 .index_checkpoint(checkpoint, resolver, &self.rpc_config)
1469 .expect("db error");
1470
1471 self.pending_updates
1472 .lock()
1473 .unwrap()
1474 .insert(sequence_number, batch);
1475 }
1476
1477 #[tracing::instrument(skip(self))]
1485 pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
1486 let next_batch = self.pending_updates.lock().unwrap().pop_first();
1487
1488 let (next_sequence_number, batch) = next_batch.unwrap();
1490 assert_eq!(
1491 checkpoint, next_sequence_number,
1492 "commit_update_for_checkpoint must be called in order"
1493 );
1494
1495 Ok(batch.write()?)
1496 }
1497
1498 pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
1499 self.tables.get_epoch_info(epoch)
1500 }
1501
1502 pub fn get_transaction_info(
1503 &self,
1504 digest: &TransactionDigest,
1505 ) -> Result<Option<TransactionInfo>, TypedStoreError> {
1506 self.tables.get_transaction_info(digest)
1507 }
1508
1509 pub fn owner_iter(
1510 &self,
1511 owner: SuiAddress,
1512 object_type: Option<StructTag>,
1513 cursor: Option<OwnerIndexKey>,
1514 ) -> Result<
1515 impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1516 TypedStoreError,
1517 > {
1518 self.tables.owner_iter(owner, object_type, cursor)
1519 }
1520
1521 pub fn dynamic_field_iter(
1522 &self,
1523 parent: ObjectID,
1524 cursor: Option<ObjectID>,
1525 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1526 {
1527 self.tables.dynamic_field_iter(parent, cursor)
1528 }
1529
1530 pub fn get_coin_info(
1531 &self,
1532 coin_type: &StructTag,
1533 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1534 self.tables.get_coin_info(coin_type)
1535 }
1536
1537 pub fn get_balance(
1538 &self,
1539 owner: &SuiAddress,
1540 coin_type: &StructTag,
1541 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1542 self.tables.get_balance(owner, coin_type)
1543 }
1544
1545 pub fn balance_iter(
1546 &self,
1547 owner: SuiAddress,
1548 cursor: Option<BalanceKey>,
1549 ) -> Result<
1550 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1551 TypedStoreError,
1552 > {
1553 self.tables.balance_iter(owner, cursor)
1554 }
1555
1556 pub fn package_versions_iter(
1557 &self,
1558 original_id: ObjectID,
1559 cursor: Option<u64>,
1560 ) -> Result<
1561 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1562 TypedStoreError,
1563 > {
1564 self.tables.package_versions_iter(original_id, cursor)
1565 }
1566
1567 pub fn event_iter(
1568 &self,
1569 stream_id: SuiAddress,
1570 start_checkpoint: u64,
1571 start_accumulator_version: u64,
1572 start_transaction_idx: u32,
1573 start_event_idx: u32,
1574 end_checkpoint: u64,
1575 limit: u32,
1576 ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
1577 {
1578 self.tables.event_iter(
1579 stream_id,
1580 start_checkpoint,
1581 start_accumulator_version,
1582 start_transaction_idx,
1583 start_event_idx,
1584 end_checkpoint,
1585 limit,
1586 )
1587 }
1588
1589 pub fn get_highest_indexed_checkpoint_seq_number(
1590 &self,
1591 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
1592 self.tables.watermark.get(&Watermark::Indexed)
1593 }
1594}
1595
1596fn should_index_dynamic_field(object: &Object) -> bool {
1597 object
1605 .data
1606 .try_as_move()
1607 .is_some_and(|move_object| move_object.type_().is_dynamic_field())
1608}
1609
1610fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
1611 use sui_types::coin::CoinMetadata;
1612 use sui_types::coin::RegulatedCoinMetadata;
1613 use sui_types::coin::TreasuryCap;
1614
1615 let object_type = object.type_().and_then(MoveObjectType::other)?;
1616
1617 if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
1618 return Some((
1619 CoinIndexKey { coin_type },
1620 CoinIndexInfo {
1621 coin_metadata_object_id: Some(object.id()),
1622 treasury_object_id: None,
1623 regulated_coin_metadata_object_id: None,
1624 },
1625 ));
1626 }
1627
1628 if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
1629 return Some((
1630 CoinIndexKey { coin_type },
1631 CoinIndexInfo {
1632 coin_metadata_object_id: None,
1633 treasury_object_id: Some(object.id()),
1634 regulated_coin_metadata_object_id: None,
1635 },
1636 ));
1637 }
1638
1639 if let Some(coin_type) =
1640 RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
1641 {
1642 return Some((
1643 CoinIndexKey { coin_type },
1644 CoinIndexInfo {
1645 coin_metadata_object_id: None,
1646 treasury_object_id: None,
1647 regulated_coin_metadata_object_id: Some(object.id()),
1648 },
1649 ));
1650 }
1651
1652 None
1653}
1654
/// `ParMakeLiveObjectIndexer` implementation for the RPC index: creates
/// `RpcLiveObjectIndexer` values that all share the same tables and coin index.
struct RpcParLiveObjectSetIndexer<'a> {
    /// Index tables every created indexer writes into.
    tables: &'a IndexStoreTables,
    /// Coin index shared, behind a mutex, by all created indexers.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Batch size threshold in bytes copied into each created indexer.
    batch_size_limit: usize,
}
1660
/// `LiveObjectIndexer` implementation that turns live objects into RPC index
/// entries, accumulating them in a write batch.
struct RpcLiveObjectIndexer<'a> {
    /// Index tables the batch targets.
    tables: &'a IndexStoreTables,
    /// Write batch being filled; written out whenever its size reaches
    /// `batch_size_limit` and once more in `finish`.
    batch: typed_store::rocks::DBBatch,
    /// Shared coin index; entries for the same key are merged under the lock.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Balance deltas accumulated per balance key; moved into the batch when
    /// the map reaches `BALANCE_FLUSH_THRESHOLD` entries and in `finish`.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    /// Batch size, in bytes, at which the batch is written out.
    batch_size_limit: usize,
}
1668
1669impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
1670 type ObjectIndexer = RpcLiveObjectIndexer<'a>;
1671
1672 fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
1673 RpcLiveObjectIndexer {
1674 tables: self.tables,
1675 batch: self.tables.owner.batch(),
1676 coin_index: self.coin_index,
1677 balance_changes: HashMap::new(),
1678 batch_size_limit: self.batch_size_limit,
1679 }
1680 }
1681}
1682
1683impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
1684 fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
1685 match object.owner {
1686 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
1688 let owner_key = OwnerIndexKey::from_object(&object);
1689 let owner_info = OwnerIndexInfo::new(&object);
1690 self.batch
1691 .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
1692
1693 if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
1694 let balance_key = BalanceKey { owner, coin_type };
1695 let balance_info = BalanceIndexInfo::from(value);
1696 self.balance_changes
1697 .entry(balance_key)
1698 .or_default()
1699 .merge_delta(&balance_info);
1700
1701 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
1702 self.batch.partial_merge_batch(
1703 &self.tables.balance,
1704 std::mem::take(&mut self.balance_changes),
1705 )?;
1706 }
1707 }
1708 }
1709
1710 Owner::ObjectOwner(parent) => {
1712 if should_index_dynamic_field(&object) {
1713 let field_key = DynamicFieldKey::new(parent, object.id());
1714 self.batch
1715 .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
1716 }
1717 }
1718
1719 Owner::Shared { .. } | Owner::Immutable => {}
1720 }
1721
1722 if let Some((key, value)) = try_create_coin_index_info(&object) {
1724 use std::collections::hash_map::Entry;
1725
1726 match self.coin_index.lock().unwrap().entry(key) {
1727 Entry::Occupied(mut o) => {
1728 o.get_mut().merge(value);
1729 }
1730 Entry::Vacant(v) => {
1731 v.insert(value);
1732 }
1733 }
1734 }
1735
1736 if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
1737 self.batch
1738 .insert_batch(&self.tables.package_version, [(key, info)])?;
1739 }
1740
1741 if self.batch.size_in_bytes() >= self.batch_size_limit {
1744 std::mem::replace(&mut self.batch, self.tables.owner.batch())
1745 .write_opt(&bulk_ingestion_write_options())?;
1746 }
1747
1748 Ok(())
1749 }
1750
1751 fn finish(mut self) -> Result<(), StorageError> {
1752 self.batch.partial_merge_batch(
1753 &self.tables.balance,
1754 std::mem::take(&mut self.balance_changes),
1755 )?;
1756 self.batch.write_opt(&bulk_ingestion_write_options())?;
1757 Ok(())
1758 }
1759}
1760
1761fn sparse_checkpoint_data_for_backfill(
1764 authority_store: &AuthorityStore,
1765 checkpoint_store: &CheckpointStore,
1766 checkpoint: u64,
1767 load_events: bool,
1768) -> Result<CheckpointData, StorageError> {
1769 use sui_types::full_checkpoint_content::CheckpointTransaction;
1770
1771 let summary = checkpoint_store
1772 .get_checkpoint_by_sequence_number(checkpoint)?
1773 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1774 let contents = checkpoint_store
1775 .get_checkpoint_contents(&summary.content_digest)?
1776 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
1777
1778 let transaction_digests = contents
1779 .iter()
1780 .map(|execution_digests| execution_digests.transaction)
1781 .collect::<Vec<_>>();
1782 let transactions = authority_store
1783 .multi_get_transaction_blocks(&transaction_digests)?
1784 .into_iter()
1785 .map(|maybe_transaction| {
1786 maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
1787 })
1788 .collect::<Result<Vec<_>, _>>()?;
1789
1790 let effects = authority_store
1791 .multi_get_executed_effects(&transaction_digests)?
1792 .into_iter()
1793 .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
1794 .collect::<Result<Vec<_>, _>>()?;
1795
1796 let events = if load_events {
1797 authority_store
1798 .multi_get_events(&transaction_digests)
1799 .map_err(|e| StorageError::custom(e.to_string()))?
1800 } else {
1801 vec![None; transaction_digests.len()]
1802 };
1803
1804 let mut full_transactions = Vec::with_capacity(transactions.len());
1805 for ((tx, fx), ev) in transactions.into_iter().zip(effects).zip(events) {
1806 let input_objects =
1807 sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
1808 let output_objects =
1809 sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
1810
1811 let full_transaction = CheckpointTransaction {
1812 transaction: tx.into(),
1813 effects: fx,
1814 events: ev,
1815 input_objects,
1816 output_objects,
1817 };
1818
1819 full_transactions.push(full_transaction);
1820 }
1821
1822 let checkpoint_data = CheckpointData {
1823 checkpoint_summary: summary.into(),
1824 checkpoint_contents: contents,
1825 transactions: full_transactions,
1826 };
1827
1828 Ok(checkpoint_data)
1829}
1830
1831fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
1832 match Coin::extract_balance_if_coin(object) {
1833 Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
1834 Ok(Some(_)) => {
1835 debug!("Coin object {} has non-struct type tag", object.id());
1836 Ok(None)
1837 }
1838 Ok(None) => {
1839 Ok(None)
1841 }
1842 Err(e) => {
1843 Err(StorageError::custom(format!(
1845 "Failed to deserialize coin object {}: {}",
1846 object.id(),
1847 e
1848 )))
1849 }
1850 }
1851}
1852
1853#[cfg(test)]
1854mod tests {
1855 use super::*;
1856 use std::sync::atomic::AtomicU64;
1857 use sui_types::base_types::SuiAddress;
1858
1859 #[tokio::test]
1860 async fn test_events_compaction_filter() {
1861 let temp_dir = tempfile::tempdir().unwrap();
1862 let path = temp_dir.path();
1863 let db_path = path.join("rpc-index");
1864
1865 let pruning_watermark = Arc::new(AtomicU64::new(5));
1866 let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());
1867
1868 let index_options = IndexStoreOptions {
1869 events_compaction_filter: Some(compaction_filter),
1870 };
1871
1872 let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
1873 let stream_id = SuiAddress::random_for_testing_only();
1874 let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
1875 .iter()
1876 .map(|&checkpoint_seq| EventIndexKey {
1877 stream_id,
1878 checkpoint_seq,
1879 accumulator_version: 0,
1880 transaction_idx: 0,
1881 event_index: 0,
1882 })
1883 .collect();
1884
1885 let mut batch = tables.events_by_stream.batch();
1886 for key in &test_events {
1887 batch
1888 .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
1889 .unwrap();
1890 }
1891 batch.write().unwrap();
1892
1893 tables.events_by_stream.flush().unwrap();
1894 let mut events_before_compaction = 0;
1895 for result in tables.events_by_stream.safe_iter() {
1896 if result.is_ok() {
1897 events_before_compaction += 1;
1898 }
1899 }
1900 assert_eq!(
1901 events_before_compaction, 5,
1902 "Should have 5 events before compaction"
1903 );
1904 let start_key = EventIndexKey {
1905 stream_id: SuiAddress::ZERO,
1906 checkpoint_seq: 0,
1907 accumulator_version: 0,
1908 transaction_idx: 0,
1909 event_index: 0,
1910 };
1911 let end_key = EventIndexKey {
1912 stream_id: SuiAddress::random_for_testing_only(),
1913 checkpoint_seq: u64::MAX,
1914 accumulator_version: u64::MAX,
1915 transaction_idx: u32::MAX,
1916 event_index: u32::MAX,
1917 };
1918
1919 tables
1920 .events_by_stream
1921 .compact_range(&start_key, &end_key)
1922 .unwrap();
1923 let mut events_after_compaction = Vec::new();
1924 for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
1925 events_after_compaction.push(key);
1926 }
1927
1928 println!("Events after compaction: {}", events_after_compaction.len());
1929 assert!(
1930 events_after_compaction.len() >= 2,
1931 "Should have at least the events that shouldn't be pruned"
1932 );
1933 pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
1934 let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
1935 assert_eq!(watermark_after, 20, "Watermark should be updated");
1936 }
1937
1938 #[test]
1939 fn test_events_compaction_filter_logic() {
1940 let watermark = Arc::new(AtomicU64::new(100));
1941 let filter = EventsCompactionFilter::new(watermark.clone());
1942
1943 let old_key = EventIndexKey {
1944 stream_id: SuiAddress::random_for_testing_only(),
1945 checkpoint_seq: 50,
1946 accumulator_version: 0,
1947 transaction_idx: 0,
1948 event_index: 0,
1949 };
1950 let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
1951 let decision = filter.filter(&old_key_bytes, &[]).unwrap();
1952 assert!(
1953 matches!(decision, Decision::Remove),
1954 "Event with checkpoint 50 should be removed when watermark is 100"
1955 );
1956 let new_key = EventIndexKey {
1957 stream_id: SuiAddress::random_for_testing_only(),
1958 checkpoint_seq: 150,
1959 accumulator_version: 0,
1960 transaction_idx: 0,
1961 event_index: 0,
1962 };
1963 let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
1964 let decision = filter.filter(&new_key_bytes, &[]).unwrap();
1965 assert!(
1966 matches!(decision, Decision::Keep),
1967 "Event with checkpoint 150 should be kept when watermark is 100"
1968 );
1969 let boundary_key = EventIndexKey {
1970 stream_id: SuiAddress::random_for_testing_only(),
1971 checkpoint_seq: 100,
1972 accumulator_version: 0,
1973 transaction_idx: 0,
1974 event_index: 0,
1975 };
1976 let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
1977 let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
1978 assert!(
1979 matches!(decision, Decision::Remove),
1980 "Event with checkpoint equal to watermark should be removed"
1981 );
1982 }
1983}