use crate::authority::AuthorityStore;
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointStore;
use crate::par_index_live_object_set::LiveObjectIndexer;
use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
use itertools::Itertools;
use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_common::ZipDebugEqIteratorExt;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use serde::Deserialize;
use serde::Serialize;
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
use sui_types::base_types::MoveObjectType;
use sui_types::base_types::ObjectID;
use sui_types::base_types::SequenceNumber;
use sui_types::base_types::SuiAddress;
use sui_types::coin::Coin;
use sui_types::committee::EpochId;
use sui_types::digests::TransactionDigest;
use sui_types::effects::{AccumulatorValue, TransactionEffectsAPI};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_checkpoint::CheckpointContents;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Data;
use sui_types::object::Object;
use sui_types::object::Owner;
use sui_types::storage::BackingPackageStore;
use sui_types::storage::DynamicFieldKey;
use sui_types::storage::EpochInfo;
use sui_types::storage::TransactionInfo;
use sui_types::storage::error::Error as StorageError;
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::transaction::{TransactionDataAPI, TransactionKind};
use sysinfo::{MemoryRefreshKind, RefreshKind, System};
use tracing::{debug, info, warn};
use typed_store::DBMapUtils;
use typed_store::TypedStoreError;
use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
use typed_store::traits::Map;

const CURRENT_DB_VERSION: u64 = 4;
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;

fn bulk_ingestion_write_options() -> WriteOptions {
    let mut opts = WriteOptions::default();
    opts.disable_wal(true);
    opts
}

fn get_available_memory() -> u64 {
    let mut sys = System::new_with_specifics(
        RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
    );
    sys.refresh_memory();

    if let Some(cgroup_limits) = sys.cgroup_limits() {
        let memory_limit = cgroup_limits.total_memory;
        if memory_limit > 0 {
            debug!("Using cgroup memory limit: {} bytes", memory_limit);
            return memory_limit;
        }
    }

    let total_memory_bytes = sys.total_memory();
    debug!("Using system memory: {} bytes", total_memory_bytes);
    total_memory_bytes
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    version: u64,
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    Indexed,
    Pruned,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    pub owner: SuiAddress,

    pub object_type: StructTag,

    pub inverted_balance: Option<u64>,

    pub object_id: ObjectID,
}

impl OwnerIndexKey {
    fn from_object(object: &Object) -> Self {
        let owner = match object.owner() {
            Owner::AddressOwner(owner) => owner,
            Owner::ConsensusAddressOwner { owner, .. } => owner,
            _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
        };
        let object_type = object.struct_tag().expect("packages cannot be owned");

        let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());

        Self {
            owner: *owner,
            object_type,
            inverted_balance,
            object_id: object.id(),
        }
    }
}
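// Note on `inverted_balance`: for coin objects the key stores the bitwise NOT of the
// coin's value (see `from_object` above), i.e. `Some(!v)`. Assuming the store's key
// encoding preserves integer ordering, iterating an owner's objects of a given coin type
// therefore visits higher-balance coins first.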

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    pub version: SequenceNumber,
}

impl OwnerIndexInfo {
    pub fn new(object: &Object) -> Self {
        Self {
            version: object.version(),
        }
    }
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    pub owner: SuiAddress,
    pub coin_type: StructTag,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    pub coin_metadata_object_id: Option<ObjectID>,
    pub treasury_object_id: Option<ObjectID>,
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    pub coin_balance_delta: i128,
    pub address_balance_delta: i128,
}

impl BalanceIndexInfo {
    fn merge_delta(&mut self, other: &Self) {
        self.coin_balance_delta = self
            .coin_balance_delta
            .saturating_add(other.coin_balance_delta);
        self.address_balance_delta = self
            .address_balance_delta
            .saturating_add(other.address_balance_delta);
    }
}

impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
    fn from(index_info: BalanceIndexInfo) -> Self {
        let coin_balance = index_info.coin_balance_delta.clamp(0, u64::MAX as i128) as u64;
        let address_balance = index_info.address_balance_delta.clamp(0, u64::MAX as i128) as u64;
        sui_types::storage::BalanceInfo {
            coin_balance,
            address_balance,
        }
    }
}
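// For example (illustrative values): a net `BalanceIndexInfo { coin_balance_delta: -5,
// address_balance_delta: 42 }` converts to `BalanceInfo { coin_balance: 0,
// address_balance: 42 }`; negative nets clamp to zero rather than underflowing.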

#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    pub original_package_id: ObjectID,
    pub version: u64,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    pub storage_id: ObjectID,
}

#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    pub events_compaction_filter: Option<EventsCompactionFilter>,
}

fn default_table_options() -> typed_store::rocks::DBOptions {
    typed_store::rocks::default_db_options().disable_write_throttling()
}

fn events_table_options(
    compaction_filter: Option<EventsCompactionFilter>,
) -> typed_store::rocks::DBOptions {
    let mut options = default_table_options();
    if let Some(filter) = compaction_filter {
        options.options.set_compaction_filter(
            "events_by_stream",
            move |_, key, value| match filter.filter(key, value) {
                Ok(decision) => decision,
                Err(e) => {
                    warn!(
                        "Failed to parse event key during compaction: {}, key: {:?}",
                        e, key
                    );
                    Decision::Remove
                }
            },
        );
    }
    options
}

fn balance_delta_merge_operator(
    _key: &[u8],
    existing_val: Option<&[u8]>,
    operands: &MergeOperands,
) -> Option<Vec<u8>> {
    let mut result = if let Some(existing_val) = existing_val {
        bcs::from_bytes::<BalanceIndexInfo>(existing_val)
            .inspect_err(|e| {
                tracing::error!(
                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
                )
            })
            .ok()?
    } else {
        BalanceIndexInfo::default()
    };

    for operand in operands.iter() {
        let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
            .inspect_err(|e| {
                tracing::error!(
                    "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
                )
            })
            .ok()?;
        result.merge_delta(&delta);
    }

    Some(
        bcs::to_bytes(&result)
            .expect("Failed to serialize merged BalanceIndexInfo for RocksDB"),
    )
}
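// Illustrative example of the merge semantics above: merging an existing
// `{ coin_balance_delta: +100, address_balance_delta: 0 }` with an operand
// `{ coin_balance_delta: -40, address_balance_delta: +10 }` yields
// `{ coin_balance_delta: +60, address_balance_delta: +10 }`, so a point read of the
// balance column family returns the net delta accumulated across all writes.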

fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
    let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
        Ok(info) => info,
        Err(_) => return Decision::Keep,
    };

    if balance_info.coin_balance_delta == 0 && balance_info.address_balance_delta == 0 {
        Decision::Remove
    } else {
        Decision::Keep
    }
}
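// Entries whose deltas have netted out to zero carry no information, so the compaction
// filter above drops them instead of keeping zero-valued rows around.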

fn balance_table_options() -> typed_store::rocks::DBOptions {
    default_table_options()
        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
}

impl CoinIndexInfo {
    fn merge(&mut self, other: Self) {
        self.coin_metadata_object_id = self
            .coin_metadata_object_id
            .or(other.coin_metadata_object_id);
        self.regulated_coin_metadata_object_id = self
            .regulated_coin_metadata_object_id
            .or(other.regulated_coin_metadata_object_id);
        self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
    }
}

#[derive(DBMapUtils)]
struct IndexStoreTables {
    meta: DBMap<(), MetadataInfo>,

    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    events_by_stream: DBMap<EventIndexKey, ()>,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventIndexKey {
    pub stream_id: SuiAddress,
    pub checkpoint_seq: u64,
    pub accumulator_version: u64,
    pub transaction_idx: u32,
    pub event_index: u32,
}

#[derive(Clone)]
pub struct EventsCompactionFilter {
    pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
}

impl EventsCompactionFilter {
    pub fn new(pruning_watermark: Arc<std::sync::atomic::AtomicU64>) -> Self {
        Self { pruning_watermark }
    }

    pub fn filter(&self, key: &[u8], _value: &[u8]) -> anyhow::Result<Decision> {
        let event_key: EventIndexKey = bcs::from_bytes(key)?;
        let watermark = self
            .pruning_watermark
            .load(std::sync::atomic::Ordering::Relaxed);

        if event_key.checkpoint_seq <= watermark {
            Ok(Decision::Remove)
        } else {
            Ok(Decision::Keep)
        }
    }
}
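// Minimal usage sketch (mirrors what `RpcIndexStore::new` below does): share an atomic
// watermark with the filter, open the tables with it installed, and advance the
// watermark as checkpoints are pruned so stale event keys are dropped at compaction.
//
//     let watermark = Arc::new(std::sync::atomic::AtomicU64::new(0));
//     let options = IndexStoreOptions {
//         events_compaction_filter: Some(EventsCompactionFilter::new(watermark.clone())),
//     };
//     let _tables = IndexStoreTables::open_with_index_options("/tmp/rpc-index", options);
//     watermark.store(42, std::sync::atomic::Ordering::Relaxed); // after pruning checkpoint 42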

impl IndexStoreTables {
    fn extract_version_if_package(
        object: &Object,
    ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
        if let Data::Package(package) = &object.data {
            let original_id = package.original_package_id();
            let version = package.version().value();
            let storage_id = object.id();

            let key = PackageVersionKey {
                original_package_id: original_id,
                version,
            };
            let info = PackageVersionInfo { storage_id };
            return Some((key, info));
        }
        None
    }

    fn open_with_index_options<P: Into<PathBuf>>(
        path: P,
        index_options: IndexStoreOptions,
    ) -> Self {
        let mut table_options = std::collections::BTreeMap::new();
        for (table_name, _) in IndexStoreTables::describe_tables() {
            table_options.insert(table_name, default_table_options());
        }
        table_options.insert("balance".to_string(), balance_table_options());
        table_options.insert(
            "events_by_stream".to_string(),
            events_table_options(index_options.events_compaction_filter),
        );

        IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path.into(),
            MetricConf::new("rpc-index"),
            None,
            Some(DBMapTableConfigMap::new(table_options)),
            true,
        )
    }

    fn open_with_options<P: Into<PathBuf>>(
        path: P,
        options: typed_store::rocksdb::Options,
        table_options: Option<DBMapTableConfigMap>,
    ) -> Self {
        IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path.into(),
            MetricConf::new("rpc-index"),
            Some(options),
            table_options,
            true,
        )
    }

    fn needs_to_do_initialization(&self, checkpoint_store: &CheckpointStore) -> bool {
        (match self.meta.get(&()) {
            Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
            Ok(None) => true,
            Err(_) => true,
        }) || self.is_indexed_watermark_out_of_date(checkpoint_store)
    }

    fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
        let highest_executed_checkpoint = checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .ok()
            .flatten();
        let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
        watermark < highest_executed_checkpoint
    }

    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        let highest_executed_checkpoint =
            checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range = highest_executed_checkpoint.map(|highest_executed_checkpoint| {
            lowest_available_checkpoint..=highest_executed_checkpoint
        });

        if let Some(checkpoint_range) = checkpoint_range {
            self.index_existing_checkpoints(
                authority_store,
                checkpoint_store,
                checkpoint_range,
                rpc_config,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        if highest_executed_checkpoint.is_some() {
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        self.watermark.insert(
            &Watermark::Indexed,
            &highest_executed_checkpoint.unwrap_or(0),
        )?;

        self.meta.insert(
            &(),
            &MetadataInfo {
                version: CURRENT_DB_VERSION,
            },
        )?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }

    #[tracing::instrument(skip(self, authority_store, checkpoint_store, rpc_config))]
    fn index_existing_checkpoints(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        checkpoint_range: std::ops::RangeInclusive<u64>,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!(
            "Indexing {} checkpoints in range {checkpoint_range:?}",
            checkpoint_range.size_hint().0
        );
        let start_time = Instant::now();

        checkpoint_range.into_par_iter().try_for_each(|seq| {
            let load_events = rpc_config.authenticated_events_indexing();
            let Some(checkpoint_data) = sparse_checkpoint_data_for_epoch_backfill(
                authority_store,
                checkpoint_store,
                seq,
                load_events,
            )?
            else {
                return Ok(());
            };

            let mut batch = self.epochs.batch();

            self.index_epoch(&checkpoint_data, &mut batch)?;

            batch
                .write_opt(bulk_ingestion_write_options())
                .map_err(StorageError::from)
        })?;

        info!(
            "Indexing checkpoints took {} seconds",
            start_time.elapsed().as_secs()
        );
        Ok(())
    }

    fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        _checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.watermark.batch();

        batch.insert_batch(
            &self.watermark,
            [(Watermark::Pruned, pruned_checkpoint_watermark)],
        )?;

        batch.write()
    }

    fn index_checkpoint(
        &self,
        checkpoint: &CheckpointData,
        _resolver: &mut dyn LayoutResolver,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<typed_store::rocks::DBBatch, StorageError> {
        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "indexing checkpoint"
        );

        let mut batch = self.owner.batch();

        self.index_epoch(checkpoint, &mut batch)?;
        self.index_transactions(
            checkpoint,
            &mut batch,
            rpc_config.authenticated_events_indexing(),
        )?;
        self.index_objects(checkpoint, &mut batch)?;

        batch.insert_batch(
            &self.watermark,
            [(
                Watermark::Indexed,
                checkpoint.checkpoint_summary.sequence_number,
            )],
        )?;

        debug!(
            checkpoint = checkpoint.checkpoint_summary.sequence_number,
            "finished indexing checkpoint"
        );

        Ok(batch)
    }

    fn extract_accumulator_version(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
    ) -> Option<u64> {
        let TransactionKind::ProgrammableSystemTransaction(pt) =
            tx.transaction.transaction_data().kind()
        else {
            return None;
        };

        if pt.shared_input_objects().any(|obj| {
            obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
                && obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
        }) {
            return tx.output_objects.iter().find_map(|obj| {
                if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
                    Some(obj.version().value())
                } else {
                    None
                }
            });
        }

        None
    }

    fn index_transaction_events(
        &self,
        tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
        checkpoint_seq: u64,
        tx_idx: u32,
        accumulator_version: Option<u64>,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let acc_events = tx.effects.accumulator_events();
        if acc_events.is_empty() {
            return Ok(());
        }

        let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
        for acc in acc_events {
            if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
                let Some(accumulator_version) = accumulator_version else {
                    mysten_common::debug_fatal!(
                        "Found events at checkpoint {} tx {} before any accumulator settlement",
                        checkpoint_seq,
                        tx_idx
                    );
                    continue;
                };

                if let Some(stream_id) =
                    sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
                {
                    for (idx, _d) in event_digests {
                        let key = EventIndexKey {
                            stream_id,
                            checkpoint_seq,
                            accumulator_version,
                            transaction_idx: tx_idx,
                            event_index: *idx as u32,
                        };
                        entries.push((key, ()));
                    }
                }
            }
        }

        if !entries.is_empty() {
            batch.insert_batch(&self.events_by_stream, entries)?;
        }
        Ok(())
    }

    fn index_epoch(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let Some(epoch_info) = checkpoint.epoch_info()? else {
            return Ok(());
        };
        if epoch_info.epoch > 0 {
            let prev_epoch = epoch_info.epoch - 1;
            let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
            current_epoch.epoch = prev_epoch;
            current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
            current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
            batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
        }
        batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
        Ok(())
    }
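
    // Example of the bookkeeping above: when the first checkpoint of epoch N arrives, the
    // entry for epoch N-1 is closed out with `end_timestamp_ms` set to epoch N's start
    // time and `end_checkpoint` set to epoch N's start checkpoint minus one, and the
    // entry for epoch N is inserted from the checkpoint's own `epoch_info()`.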

    fn initialize_current_epoch(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
    ) -> Result<(), StorageError> {
        let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
            return Ok(());
        };

        let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
            .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;

        let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
        epoch.epoch = checkpoint.epoch;

        if epoch.protocol_version.is_none() {
            epoch.protocol_version = Some(system_state.protocol_version());
        }

        if epoch.start_timestamp_ms.is_none() {
            epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
        }

        if epoch.reference_gas_price.is_none() {
            epoch.reference_gas_price = Some(system_state.reference_gas_price());
        }

        if epoch.system_state.is_none() {
            epoch.system_state = Some(system_state);
        }

        self.epochs.insert(&epoch.epoch, &epoch)?;

        Ok(())
    }

    fn index_transactions(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
        index_events: bool,
    ) -> Result<(), StorageError> {
        let cp = checkpoint.checkpoint_summary.sequence_number;
        let mut current_accumulator_version: Option<u64> = None;

        for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
            let balance_changes = sui_types::balance_change::derive_detailed_balance_changes(
                &tx.effects,
                &tx.input_objects,
                &tx.output_objects,
            )
            .into_iter()
            .filter_map(|change| {
                if let TypeTag::Struct(coin_type) = change.coin_type {
                    Some((
                        BalanceKey {
                            owner: change.address,
                            coin_type: *coin_type,
                        },
                        BalanceIndexInfo {
                            coin_balance_delta: change.coin_amount,
                            address_balance_delta: change.address_amount,
                        },
                    ))
                } else {
                    None
                }
            });
            batch.partial_merge_batch(&self.balance, balance_changes)?;

            if index_events {
                if let Some(version) = self.extract_accumulator_version(tx) {
                    current_accumulator_version = Some(version);
                }

                self.index_transaction_events(
                    tx,
                    cp,
                    tx_idx as u32,
                    current_accumulator_version,
                    batch,
                )?;
            }
        }

        Ok(())
    }
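
    // Note: the transaction loop above walks the checkpoint in reverse so that, by the
    // time a transaction's events are indexed, `current_accumulator_version` already
    // holds the accumulator-root version written by the settlement transaction that
    // follows it in the checkpoint (presumably the settlement covering those events).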

    fn index_objects(
        &self,
        checkpoint: &CheckpointData,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
        let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];

        for tx in &checkpoint.transactions {
            for removed_object in tx.removed_objects_pre_version() {
                match removed_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(removed_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }
                    Owner::ObjectOwner(object_id) => {
                        batch.delete_batch(
                            &self.dynamic_field,
                            [DynamicFieldKey::new(*object_id, removed_object.id())],
                        )?;
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
            }

            for (object, old_object) in tx.changed_objects() {
                if let Some(old_object) = old_object {
                    match old_object.owner() {
                        Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                            let owner_key = OwnerIndexKey::from_object(old_object);
                            batch.delete_batch(&self.owner, [owner_key])?;
                        }

                        Owner::ObjectOwner(object_id) => {
                            if old_object.owner() != object.owner() {
                                batch.delete_batch(
                                    &self.dynamic_field,
                                    [DynamicFieldKey::new(*object_id, old_object.id())],
                                )?;
                            }
                        }

                        Owner::Shared { .. } | Owner::Immutable => {}
                    }
                }

                match object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        let owner_key = OwnerIndexKey::from_object(object);
                        let owner_info = OwnerIndexInfo::new(object);
                        batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                    }
                    Owner::ObjectOwner(parent) => {
                        if should_index_dynamic_field(object) {
                            let field_key = DynamicFieldKey::new(*parent, object.id());
                            batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                        }
                    }
                    Owner::Shared { .. } | Owner::Immutable => {}
                }
                if let Some((key, info)) = Self::extract_version_if_package(object) {
                    package_version_index.push((key, info));
                }
            }

            for (key, value) in tx.created_objects().flat_map(try_create_coin_index_info) {
                use std::collections::hash_map::Entry;

                match coin_index.entry(key) {
                    Entry::Occupied(mut o) => {
                        o.get_mut().merge(value);
                    }
                    Entry::Vacant(v) => {
                        v.insert(value);
                    }
                }
            }
        }

        batch.insert_batch(&self.coin, coin_index)?;
        batch.insert_batch(&self.package_version, package_version_index)?;

        Ok(())
    }
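
    // In short, for every transaction in the checkpoint the indexer above removes stale
    // owner/dynamic-field rows for deleted or re-owned objects, inserts fresh rows for
    // the new object states, and accumulates coin-metadata and package-version entries
    // that are written in a single batch at the end.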

    fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.epochs.get(&epoch)
    }

    fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        let lower = EventIndexKey {
            stream_id,
            checkpoint_seq: start_checkpoint,
            accumulator_version: start_accumulator_version,
            transaction_idx: start_transaction_idx,
            event_index: start_event_idx,
        };
        let upper = EventIndexKey {
            stream_id,
            checkpoint_seq: end_checkpoint,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        Ok(self
            .events_by_stream
            .safe_iter_with_bounds(Some(lower), Some(upper))
            .map(|res| res.map(|(k, _)| k))
            .take(limit as usize))
    }

    fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
            owner,
            object_type: object_type
                .clone()
                .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
            inverted_balance: None,
            object_id: ObjectID::ZERO,
        });

        Ok(self
            .owner
            .safe_iter_with_bounds(Some(lower_bound), None)
            .take_while(move |item| {
                let Ok((key, _)) = item else {
                    return true;
                };

                key.owner == owner
                    && object_type
                        .as_ref()
                        .map(|ty| {
                            ty.address == key.object_type.address
                                && ty.module == key.object_type.module
                                && ty.name == key.object_type.name
                                && (ty.type_params.is_empty()
                                    || ty.type_params == key.object_type.type_params)
                        })
                        .unwrap_or(true)
            }))
    }

    fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
        let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
        let iter = self
            .dynamic_field
            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
            .map_ok(|(key, ())| key);
        Ok(iter)
    }

    fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        let key = CoinIndexKey {
            coin_type: coin_type.to_owned(),
        };
        self.coin.get(&key)
    }

    fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        let key = BalanceKey {
            owner: owner.to_owned(),
            coin_type: coin_type.to_owned(),
        };
        self.balance.get(&key)
    }

    fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
            owner,
            coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
        });

        Ok(self
            .balance
            .safe_iter_with_bounds(Some(lower_bound), None)
            .scan((), move |_, item| {
                match item {
                    Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
                    Ok(_) => None,
                    Err(e) => Some(Err(e)),
                }
            }))
    }

    fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        let lower_bound = PackageVersionKey {
            original_package_id: original_id,
            version: cursor.unwrap_or(0),
        };
        let upper_bound = PackageVersionKey {
            original_package_id: original_id,
            version: u64::MAX,
        };

        Ok(self
            .package_version
            .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
    }
}

pub struct RpcIndexStore {
    tables: IndexStoreTables,
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    rpc_config: sui_config::RpcConfig,
}

impl RpcIndexStore {
    fn db_path(dir: &Path) -> PathBuf {
        dir.join("rpc-index")
    }

    pub async fn new(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        pruning_watermark: Arc<std::sync::atomic::AtomicU64>,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let events_filter = EventsCompactionFilter::new(pruning_watermark);
        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(events_filter),
        };

        Self::new_with_options(
            dir,
            authority_store,
            checkpoint_store,
            epoch_store,
            package_store,
            index_options,
            rpc_config,
        )
        .await
    }

    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            if tables.needs_to_do_initialization(checkpoint_store) {
                let batch_size_limit;

                let mut tables = {
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    let mut table_config_map = BTreeMap::new();

                    let mut cf_options = typed_store::rocks::default_db_options();
                    cf_options.options.set_disable_auto_compactions(true);

                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024;
                            let target_buffer_count = num_cpus::get().max(2);

                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27;
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    table_config_map.insert(
                        "events_by_stream".to_string(),
                        events_table_options(index_options.events_compaction_filter.clone()),
                    );

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {}, found {}",
                    CURRENT_DB_VERSION, stored_version.version
                );

                reopened_tables
            } else {
                tables
            }
        };

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config,
        }
    }

    pub fn new_without_init(dir: &Path) -> Self {
        let path = Self::db_path(dir);
        let tables = IndexStoreTables::open_with_index_options(path, IndexStoreOptions::default());

        Self {
            tables,
            pending_updates: Default::default(),
            rpc_config: sui_config::RpcConfig::default(),
        }
    }

    pub fn prune(
        &self,
        pruned_checkpoint_watermark: u64,
        checkpoint_contents_to_prune: &[CheckpointContents],
    ) -> Result<(), TypedStoreError> {
        self.tables
            .prune(pruned_checkpoint_watermark, checkpoint_contents_to_prune)
    }

    #[tracing::instrument(
        skip_all,
        fields(checkpoint = checkpoint.checkpoint_summary.sequence_number)
    )]
    pub fn index_checkpoint(&self, checkpoint: &CheckpointData, resolver: &mut dyn LayoutResolver) {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        let batch = self
            .tables
            .index_checkpoint(checkpoint, resolver, &self.rpc_config)
            .expect("db error");

        self.pending_updates
            .lock()
            .unwrap()
            .insert(sequence_number, batch);
    }

    #[tracing::instrument(skip(self))]
    pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
        let next_batch = self.pending_updates.lock().unwrap().pop_first();

        let (next_sequence_number, batch) = next_batch.unwrap();
        assert_eq!(
            checkpoint, next_sequence_number,
            "commit_update_for_checkpoint must be called in order"
        );

        Ok(batch.write()?)
    }

    pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
        self.tables.get_epoch_info(epoch)
    }

    pub fn owner_iter(
        &self,
        owner: SuiAddress,
        object_type: Option<StructTag>,
        cursor: Option<OwnerIndexKey>,
    ) -> Result<
        impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.owner_iter(owner, object_type, cursor)
    }

    pub fn dynamic_field_iter(
        &self,
        parent: ObjectID,
        cursor: Option<ObjectID>,
    ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.dynamic_field_iter(parent, cursor)
    }

    pub fn get_coin_info(
        &self,
        coin_type: &StructTag,
    ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
        self.tables.get_coin_info(coin_type)
    }

    pub fn get_balance(
        &self,
        owner: &SuiAddress,
        coin_type: &StructTag,
    ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
        self.tables.get_balance(owner, coin_type)
    }

    pub fn balance_iter(
        &self,
        owner: SuiAddress,
        cursor: Option<BalanceKey>,
    ) -> Result<
        impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.balance_iter(owner, cursor)
    }

    pub fn package_versions_iter(
        &self,
        original_id: ObjectID,
        cursor: Option<u64>,
    ) -> Result<
        impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
        TypedStoreError,
    > {
        self.tables.package_versions_iter(original_id, cursor)
    }

    pub fn event_iter(
        &self,
        stream_id: SuiAddress,
        start_checkpoint: u64,
        start_accumulator_version: u64,
        start_transaction_idx: u32,
        start_event_idx: u32,
        end_checkpoint: u64,
        limit: u32,
    ) -> Result<impl Iterator<Item = Result<EventIndexKey, TypedStoreError>> + '_, TypedStoreError>
    {
        self.tables.event_iter(
            stream_id,
            start_checkpoint,
            start_accumulator_version,
            start_transaction_idx,
            start_event_idx,
            end_checkpoint,
            limit,
        )
    }

    pub fn get_highest_indexed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables.watermark.get(&Watermark::Indexed)
    }
}

fn should_index_dynamic_field(object: &Object) -> bool {
    object
        .data
        .try_as_move()
        .is_some_and(|move_object| move_object.type_().is_dynamic_field())
}

fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
    use sui_types::coin::CoinMetadata;
    use sui_types::coin::RegulatedCoinMetadata;
    use sui_types::coin::TreasuryCap;

    let object_type = object.type_().and_then(MoveObjectType::other)?;

    if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
        return Some((
            CoinIndexKey { coin_type },
            CoinIndexInfo {
                coin_metadata_object_id: Some(object.id()),
                treasury_object_id: None,
                regulated_coin_metadata_object_id: None,
            },
        ));
    }

    if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
        return Some((
            CoinIndexKey { coin_type },
            CoinIndexInfo {
                coin_metadata_object_id: None,
                treasury_object_id: Some(object.id()),
                regulated_coin_metadata_object_id: None,
            },
        ));
    }

    if let Some(coin_type) =
        RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
    {
        return Some((
            CoinIndexKey { coin_type },
            CoinIndexInfo {
                coin_metadata_object_id: None,
                treasury_object_id: None,
                regulated_coin_metadata_object_id: Some(object.id()),
            },
        ));
    }

    None
}

struct RpcParLiveObjectSetIndexer<'a> {
    tables: &'a IndexStoreTables,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    batch_size_limit: usize,
}

struct RpcLiveObjectIndexer<'a> {
    tables: &'a IndexStoreTables,
    batch: typed_store::rocks::DBBatch,
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    batch_size_limit: usize,
}

impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
    type ObjectIndexer = RpcLiveObjectIndexer<'a>;

    fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
        RpcLiveObjectIndexer {
            tables: self.tables,
            batch: self.tables.owner.batch(),
            coin_index: self.coin_index,
            balance_changes: HashMap::new(),
            batch_size_limit: self.batch_size_limit,
        }
    }
}

impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
    fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
        match object.owner {
            Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
                let owner_key = OwnerIndexKey::from_object(&object);
                let owner_info = OwnerIndexInfo::new(&object);
                self.batch
                    .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;

                if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
                    let balance_key = BalanceKey { owner, coin_type };
                    let balance_info = BalanceIndexInfo {
                        coin_balance_delta: value.into(),
                        address_balance_delta: 0,
                    };
                    self.balance_changes
                        .entry(balance_key)
                        .or_default()
                        .merge_delta(&balance_info);

                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
                        self.batch.partial_merge_batch(
                            &self.tables.balance,
                            std::mem::take(&mut self.balance_changes),
                        )?;
                    }
                }
            }

            Owner::ObjectOwner(parent) => {
                if should_index_dynamic_field(&object) {
                    let field_key = DynamicFieldKey::new(parent, object.id());
                    self.batch
                        .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
                }

                if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
                    && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
                {
                    let balance_key = BalanceKey { owner, coin_type };
                    let balance_info = BalanceIndexInfo {
                        coin_balance_delta: 0,
                        address_balance_delta: balance,
                    };
                    self.balance_changes
                        .entry(balance_key)
                        .or_default()
                        .merge_delta(&balance_info);

                    if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
                        self.batch.partial_merge_batch(
                            &self.tables.balance,
                            std::mem::take(&mut self.balance_changes),
                        )?;
                    }
                }
            }

            Owner::Shared { .. } | Owner::Immutable => {}
        }

        if let Some((key, value)) = try_create_coin_index_info(&object) {
            use std::collections::hash_map::Entry;

            match self.coin_index.lock().unwrap().entry(key) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge(value);
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }

        if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
            self.batch
                .insert_batch(&self.tables.package_version, [(key, info)])?;
        }

        if self.batch.size_in_bytes() >= self.batch_size_limit {
            std::mem::replace(&mut self.batch, self.tables.owner.batch())
                .write_opt(bulk_ingestion_write_options())?;
        }

        Ok(())
    }

    fn finish(mut self) -> Result<(), StorageError> {
        self.batch.partial_merge_batch(
            &self.tables.balance,
            std::mem::take(&mut self.balance_changes),
        )?;
        self.batch.write_opt(bulk_ingestion_write_options())?;
        Ok(())
    }
}
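
// Bulk-load behavior of the live-object indexer above: balance deltas are first folded
// into the local `balance_changes` map and only handed to the RocksDB merge operator in
// chunks once `BALANCE_FLUSH_THRESHOLD` entries accumulate, and every batch is written
// with the WAL disabled (`bulk_ingestion_write_options`). Skipping the WAL is presumably
// acceptable here because an interrupted initialization is detected via
// `needs_to_do_initialization` and redone from scratch.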

fn sparse_checkpoint_data_for_epoch_backfill(
    authority_store: &AuthorityStore,
    checkpoint_store: &CheckpointStore,
    checkpoint: u64,
    load_events: bool,
) -> Result<Option<CheckpointData>, StorageError> {
    use sui_types::full_checkpoint_content::CheckpointTransaction;

    let summary = checkpoint_store
        .get_checkpoint_by_sequence_number(checkpoint)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;

    if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
        return Ok(None);
    }

    let contents = checkpoint_store
        .get_checkpoint_contents(&summary.content_digest)?
        .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;

    let transaction_digests = contents
        .iter()
        .map(|execution_digests| execution_digests.transaction)
        .collect::<Vec<_>>();
    let transactions = authority_store
        .multi_get_transaction_blocks(&transaction_digests)?
        .into_iter()
        .map(|maybe_transaction| {
            maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
        })
        .collect::<Result<Vec<_>, _>>()?;

    let effects = authority_store
        .multi_get_executed_effects(&transaction_digests)?
        .into_iter()
        .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
        .collect::<Result<Vec<_>, _>>()?;

    let events = if load_events {
        authority_store
            .multi_get_events(&transaction_digests)
            .map_err(|e| StorageError::custom(e.to_string()))?
    } else {
        vec![None; transaction_digests.len()]
    };

    let mut full_transactions = Vec::with_capacity(transactions.len());
    for ((tx, fx), ev) in transactions
        .into_iter()
        .zip_debug_eq(effects)
        .zip_debug_eq(events)
    {
        let input_objects =
            sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
        let output_objects =
            sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;

        let full_transaction = CheckpointTransaction {
            transaction: tx.into(),
            effects: fx,
            events: ev,
            input_objects,
            output_objects,
        };

        full_transactions.push(full_transaction);
    }

    let checkpoint_data = CheckpointData {
        checkpoint_summary: summary.into(),
        checkpoint_contents: contents,
        transactions: full_transactions,
    };

    Ok(Some(checkpoint_data))
}
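
// Note: this helper only materializes `CheckpointData` for checkpoints that can affect
// the epochs table, i.e. checkpoint 0 and end-of-epoch checkpoints; every other sequence
// number returns `Ok(None)` so the parallel backfill in `index_existing_checkpoints`
// skips it cheaply.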

fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
    match Coin::extract_balance_if_coin(object) {
        Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
        Ok(Some(_)) => {
            debug!("Coin object {} has non-struct type tag", object.id());
            Ok(None)
        }
        Ok(None) => Ok(None),
        Err(e) => Err(StorageError::custom(format!(
            "Failed to deserialize coin object {}: {}",
            object.id(),
            e
        ))),
    }
}

fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
    let move_object = object.data.try_as_move()?;

    let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
    else {
        return None;
    };

    let (key, value): (
        sui_types::accumulator_root::AccumulatorKey,
        sui_types::accumulator_root::AccumulatorValue,
    ) = move_object.try_into().ok()?;

    let balance = value.as_u128()? as i128;
    if balance <= 0 {
        return None;
    }

    Some((key.owner, *coin_type, balance))
}
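
// Only strictly positive accumulator balances are reported here; zero or negative values
// yield `None`, so no balance row is created for empty accumulator entries.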

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use sui_types::base_types::SuiAddress;

    #[tokio::test]
    async fn test_events_compaction_filter() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path();
        let db_path = path.join("rpc-index");

        let pruning_watermark = Arc::new(AtomicU64::new(5));
        let compaction_filter = EventsCompactionFilter::new(pruning_watermark.clone());

        let index_options = IndexStoreOptions {
            events_compaction_filter: Some(compaction_filter),
        };

        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
        let stream_id = SuiAddress::random_for_testing_only();
        let test_events: Vec<EventIndexKey> = [1, 3, 5, 10, 15]
            .iter()
            .map(|&checkpoint_seq| EventIndexKey {
                stream_id,
                checkpoint_seq,
                accumulator_version: 0,
                transaction_idx: 0,
                event_index: 0,
            })
            .collect();

        let mut batch = tables.events_by_stream.batch();
        for key in &test_events {
            batch
                .insert_batch(&tables.events_by_stream, [(key.clone(), ())])
                .unwrap();
        }
        batch.write().unwrap();

        tables.events_by_stream.flush().unwrap();
        let mut events_before_compaction = 0;
        for result in tables.events_by_stream.safe_iter() {
            if result.is_ok() {
                events_before_compaction += 1;
            }
        }
        assert_eq!(
            events_before_compaction, 5,
            "Should have 5 events before compaction"
        );
        let start_key = EventIndexKey {
            stream_id: SuiAddress::ZERO,
            checkpoint_seq: 0,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let end_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: u64::MAX,
            accumulator_version: u64::MAX,
            transaction_idx: u32::MAX,
            event_index: u32::MAX,
        };

        tables
            .events_by_stream
            .compact_range(&start_key, &end_key)
            .unwrap();
        let mut events_after_compaction = Vec::new();
        for (key, _event) in tables.events_by_stream.safe_iter().flatten() {
            events_after_compaction.push(key);
        }

        println!("Events after compaction: {}", events_after_compaction.len());
        assert!(
            events_after_compaction.len() >= 2,
            "Should have at least the events that shouldn't be pruned"
        );
        pruning_watermark.store(20, std::sync::atomic::Ordering::Relaxed);
        let watermark_after = pruning_watermark.load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(watermark_after, 20, "Watermark should be updated");
    }

    #[test]
    fn test_events_compaction_filter_logic() {
        let watermark = Arc::new(AtomicU64::new(100));
        let filter = EventsCompactionFilter::new(watermark.clone());

        let old_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 50,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let old_key_bytes = bcs::to_bytes(&old_key).unwrap();
        let decision = filter.filter(&old_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint 50 should be removed when watermark is 100"
        );
        let new_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 150,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let new_key_bytes = bcs::to_bytes(&new_key).unwrap();
        let decision = filter.filter(&new_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Keep),
            "Event with checkpoint 150 should be kept when watermark is 100"
        );
        let boundary_key = EventIndexKey {
            stream_id: SuiAddress::random_for_testing_only(),
            checkpoint_seq: 100,
            accumulator_version: 0,
            transaction_idx: 0,
            event_index: 0,
        };
        let boundary_key_bytes = bcs::to_bytes(&boundary_key).unwrap();
        let decision = filter.filter(&boundary_key_bytes, &[]).unwrap();
        assert!(
            matches!(decision, Decision::Remove),
            "Event with checkpoint equal to watermark should be removed"
        );
    }

    #[tokio::test]
    async fn open_with_index_options_overrides_every_cf() {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("rpc-index");

        let _tables =
            IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());

        let per_cf = parse_cf_options(&db_path);
        assert!(
            !per_cf.is_empty(),
            "expected at least one CFOptions section in OPTIONS file"
        );
        for (cf_name, opts) in &per_cf {
            if cf_name == "default" {
                continue;
            }
            for (key, expected) in [
                ("level0_slowdown_writes_trigger", "512"),
                ("level0_stop_writes_trigger", "1024"),
                ("soft_pending_compaction_bytes_limit", "0"),
                ("hard_pending_compaction_bytes_limit", "0"),
            ] {
                let actual = opts
                    .get(key)
                    .unwrap_or_else(|| panic!("cf `{cf_name}` missing `{key}`"));
                assert_eq!(
                    actual, expected,
                    "cf `{cf_name}` has `{key}={actual}`, expected `{expected}` — \
                     the typed-store override map likely doesn't cover this CF"
                );
            }
        }
    }

    fn parse_cf_options(db_path: &Path) -> HashMap<String, HashMap<String, String>> {
        let mut options_file: Option<(u64, PathBuf)> = None;
        for entry in std::fs::read_dir(db_path).expect("read_dir failed") {
            let entry = entry.unwrap();
            let name = entry.file_name().to_string_lossy().into_owned();
            let Some(rest) = name.strip_prefix("OPTIONS-") else {
                continue;
            };
            let Ok(seq) = rest.parse::<u64>() else {
                continue;
            };
            if options_file.as_ref().is_none_or(|(s, _)| seq > *s) {
                options_file = Some((seq, entry.path()));
            }
        }
        let (_, path) = options_file.expect("no OPTIONS-* file written");
        let content = std::fs::read_to_string(&path).expect("read OPTIONS failed");

        let mut result: HashMap<String, HashMap<String, String>> = HashMap::new();
        let mut current_cf: Option<String> = None;
        for line in content.lines() {
            let line = line.trim();
            if let Some(rest) = line.strip_prefix("[CFOptions \"") {
                let cf_name = rest.trim_end_matches("\"]").to_string();
                current_cf = Some(cf_name);
            } else if line.starts_with('[') {
                current_cf = None;
            } else if let Some(cf) = current_cf.as_ref()
                && let Some((k, v)) = line.split_once('=')
            {
                result
                    .entry(cf.clone())
                    .or_default()
                    .insert(k.trim().to_string(), v.trim().to_string());
            }
        }
        result
    }
}