1use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use roaring::RoaringBitmap;
15use rustc_hash::{FxHashMap, FxHashSet};
16use serde::Deserialize;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use std::collections::{BTreeMap, HashMap};
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::sync::Mutex;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26use std::time::Duration;
27use std::time::Instant;
28use sui_inverted_index::encode_dimension_key;
29use sui_inverted_index::for_each_event_dimension;
30use sui_inverted_index::for_each_transaction_dimension;
31use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
32use sui_types::base_types::MoveObjectType;
33use sui_types::base_types::ObjectID;
34use sui_types::base_types::SequenceNumber;
35use sui_types::base_types::SuiAddress;
36use sui_types::coin::Coin;
37use sui_types::committee::EpochId;
38use sui_types::digests::TransactionDigest;
39use sui_types::effects::TransactionEffectsAPI;
40use sui_types::full_checkpoint_content::Checkpoint;
41use sui_types::full_checkpoint_content::ExecutedTransaction;
42use sui_types::full_checkpoint_content::ObjectSet;
43use sui_types::messages_checkpoint::CheckpointSequenceNumber;
44use sui_types::object::Data;
45use sui_types::object::Object;
46use sui_types::object::Owner;
47use sui_types::storage::BackingPackageStore;
48use sui_types::storage::DynamicFieldKey;
49use sui_types::storage::EpochInfo;
50use sui_types::storage::LedgerBitmapBucket;
51use sui_types::storage::LedgerBitmapBucketIterator;
52use sui_types::storage::LedgerTxSeqDigest;
53use sui_types::storage::LedgerTxSeqDigestIterator;
54use sui_types::storage::ObjectKey;
55use sui_types::storage::TransactionInfo;
56use sui_types::storage::error::Error as StorageError;
57use sui_types::sui_system_state::SuiSystemStateTrait;
58use sui_types::transaction::TransactionDataAPI;
59use sysinfo::{MemoryRefreshKind, RefreshKind, System};
60use tracing::{debug, info, warn};
61use typed_store::DBMapUtils;
62use typed_store::TypedStoreError;
63use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
64use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
65use typed_store::traits::Map;
66
/// Schema version recorded in the `meta` table. `needs_to_do_initialization`
/// treats any stored version other than this one as stale.
const CURRENT_DB_VERSION: u64 = 4;

// NOTE(review): not referenced in the visible part of this file; presumably the
// number of buffered balance entries that triggers a flush — confirm at the use site.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;

/// Number of consecutive transaction sequence numbers covered by one
/// transaction-bitmap bucket (`bucket = tx_seq / TX_BUCKET_SIZE`).
const TX_BUCKET_SIZE: u64 = 65_536;
/// Number of consecutive packed event sequence numbers covered by one
/// event-bitmap bucket (`bucket = event_seq / EVENT_BUCKET_SIZE`); equals 2^28.
const EVENT_BUCKET_SIZE: u64 = 268_435_456;
/// Number of low bits of a packed event sequence number that hold the event
/// index; the transaction sequence number occupies the remaining high bits.
const EVENT_BITS: u32 = 16;
/// Exclusive upper bound on an event index that fits in `EVENT_BITS` bits.
const MAX_EVENTS_PER_TX: u32 = 1 << EVENT_BITS;
/// Largest transaction sequence number that can be shifted left by
/// `EVENT_BITS` without losing high bits.
const MAX_TX_SEQ: u64 = u64::MAX >> EVENT_BITS;

// Compile-time sanity checks on the packing constants. Positions inside a
// bucket are stored as `u32` bitmap entries, so a bucket must not span more
// than `u32::MAX` values, and the shifts above must stay within the integer
// widths they are applied to.
const _: () = assert!(TX_BUCKET_SIZE <= u32::MAX as u64);
const _: () = assert!(EVENT_BUCKET_SIZE <= u32::MAX as u64);
const _: () = assert!(EVENT_BITS < u32::BITS);
const _: () = assert!(EVENT_BITS < u64::BITS);
const _: () = assert!(MAX_EVENTS_PER_TX as u64 == 1u64 << EVENT_BITS);
// An event bucket always contains a whole number of per-transaction event slots.
const _: () = assert!(EVENT_BUCKET_SIZE.is_multiple_of(MAX_EVENTS_PER_TX as u64));
88
89fn checked_encode_event_seq(tx_seq: u64, event_idx: u32) -> Result<u64, StorageError> {
90 if event_idx >= MAX_EVENTS_PER_TX {
91 return Err(StorageError::custom(format!(
92 "event_idx {event_idx} exceeds packed event-seq limit {}",
93 MAX_EVENTS_PER_TX - 1
94 )));
95 }
96 if tx_seq > MAX_TX_SEQ {
97 return Err(StorageError::custom(format!(
98 "tx_seq {tx_seq} exceeds packed event-seq limit {MAX_TX_SEQ}"
99 )));
100 }
101 Ok((tx_seq << EVENT_BITS) | (event_idx as u64))
102}
103
104fn checked_event_seq_lo(tx_seq: u64) -> Option<u64> {
108 if tx_seq <= MAX_TX_SEQ {
109 Some(tx_seq << EVENT_BITS)
110 } else {
111 None
112 }
113}
114
115fn bulk_ingestion_write_options() -> WriteOptions {
116 let mut opts = WriteOptions::default();
117 opts.disable_wal(true);
118 opts
119}
120
121fn get_available_memory() -> u64 {
123 let mut sys = System::new_with_specifics(
125 RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
126 );
127 sys.refresh_memory();
128
129 if let Some(cgroup_limits) = sys.cgroup_limits() {
131 let memory_limit = cgroup_limits.total_memory;
132 if memory_limit > 0 {
134 debug!("Using cgroup memory limit: {} bytes", memory_limit);
135 return memory_limit;
136 }
137 }
138
139 let total_memory_bytes = sys.total_memory();
142 debug!("Using system memory: {} bytes", total_memory_bytes);
143 total_memory_bytes
144}
145
/// Value stored in the `meta` table under the unit key.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Schema version of the index database; compared against
    /// `CURRENT_DB_VERSION` to decide whether the indexes must be rebuilt.
    version: u64,
}
151
/// Persisted index configuration, stored JSON-encoded in the `settings` table
/// (see `encode_settings` / `decode_settings`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
struct IndexSettings {
    /// Whether the ledger-history indexes were enabled when the store was last
    /// initialized. `#[serde(default)]` makes a missing field decode as `false`.
    #[serde(default)]
    ledger_history_indexing: bool,
}
169
170fn encode_settings(settings: &IndexSettings) -> Vec<u8> {
173 serde_json::to_vec(settings).expect("IndexSettings is always JSON-serializable")
174}
175
176fn decode_settings(bytes: &[u8]) -> IndexSettings {
180 serde_json::from_slice(bytes).unwrap_or_default()
181}
182
/// Key of the `watermark` table; each variant maps to a checkpoint sequence number.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Sequence number of the latest checkpoint written to the indexes.
    Indexed,
    /// Checkpoint watermark recorded by the most recent `prune` call.
    Pruned,
}
189
/// Key of the `owner` table. The derived ordering compares fields in
/// declaration order: owner, then object type, then inverted balance, then
/// object id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct OwnerIndexKey {
    /// Address that owns the object.
    pub owner: SuiAddress,

    /// Move struct type of the object.
    pub object_type: StructTag,

    /// For coin objects, the bitwise complement of the coin balance (`None`
    /// for non-coins). Complementing makes larger balances compare as smaller
    /// values under the derived ordering.
    pub inverted_balance: Option<u64>,

    /// Id of the owned object.
    pub object_id: ObjectID,
}
202
203impl OwnerIndexKey {
204 fn from_object(object: &Object) -> Self {
207 let owner = match object.owner() {
208 Owner::AddressOwner(owner) => owner,
209 Owner::ConsensusAddressOwner { owner, .. } => owner,
210 _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
211 };
212 let object_type = object.struct_tag().expect("packages cannot be owned");
213
214 let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
215
216 Self {
217 owner: *owner,
218 object_type,
219 inverted_balance,
220 object_id: object.id(),
221 }
222 }
223}
224
/// Value of the `owner` table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OwnerIndexInfo {
    /// Version of the object at the time it was indexed.
    pub version: SequenceNumber,
}
230
231impl OwnerIndexInfo {
232 pub fn new(object: &Object) -> Self {
233 Self {
234 version: object.version(),
235 }
236 }
237}
238
/// Key of the `coin` table.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// Move struct type identifying the coin.
    coin_type: StructTag,
}
243
/// Key of the `balance` table: one entry per (owner, coin type) pair.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    /// Address whose balance is tracked.
    pub owner: SuiAddress,
    /// Move struct type of the coin being tracked.
    pub coin_type: StructTag,
}
249
/// Value of the `coin` table: optional ids of objects associated with a coin
/// type. `CoinIndexInfo::merge` keeps an already-set id and fills unset ones.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    /// Id of the coin metadata object, if known.
    pub coin_metadata_object_id: Option<ObjectID>,
    /// Id of the treasury object, if known.
    pub treasury_object_id: Option<ObjectID>,
    /// Id of the regulated coin metadata object, if known.
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
256
/// Value of the `balance` table and operand of its merge operator. Both fields
/// are signed deltas that are accumulated with saturating addition.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BalanceIndexInfo {
    /// Accumulated change in the balance held as coin objects.
    pub coin_balance_delta: i128,
    /// Accumulated change in the address balance.
    pub address_balance_delta: i128,
}
262
263impl BalanceIndexInfo {
264 fn merge_delta(&mut self, other: &Self) {
265 self.coin_balance_delta = self
266 .coin_balance_delta
267 .saturating_add(other.coin_balance_delta);
268 self.address_balance_delta = self
269 .address_balance_delta
270 .saturating_add(other.address_balance_delta);
271 }
272}
273
274impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
275 fn from(index_info: BalanceIndexInfo) -> Self {
276 let coin_balance = index_info.coin_balance_delta.clamp(0, u64::MAX as i128) as u64;
283 let address_balance = index_info.address_balance_delta.clamp(0, u64::MAX as i128) as u64;
284 sui_types::storage::BalanceInfo {
285 coin_balance,
286 address_balance,
287 }
288 }
289}
290
/// Key of the `package_version` table.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    /// Original package id, as reported by `original_package_id()` on the package.
    pub original_package_id: ObjectID,
    /// Version number of the package.
    pub version: u64,
}
296
/// Value of the `package_version` table.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    /// Object id under which this package version is stored.
    pub storage_id: ObjectID,
}
301
/// Value of the `tx_seq_digest` table, keyed by transaction sequence number.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct TxSeqDigestInfo {
    /// Digest of the transaction.
    pub digest: TransactionDigest,
    /// Number of events emitted by the transaction (0 when it has none).
    pub event_count: u32,
    /// Position of the transaction within its checkpoint.
    pub tx_offset: u32,
    /// Sequence number of the checkpoint containing the transaction.
    pub checkpoint_number: CheckpointSequenceNumber,
}
315
/// Key of the `transaction_bitmap` and `event_bitmap` tables.
// NOTE(review): `BitmapCompactionFilter::filter` reads the last 8 bytes of the
// raw key as a big-endian bucket id, so `bucket_id` presumably must remain the
// last serialized field — confirm against the key encoding before reordering.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct BitmapIndexKey {
    /// Encoded (dimension, value) pair produced by `encode_dimension_key`.
    pub dimension_key: Vec<u8>,
    /// Index of the bucket: sequence number divided by the bucket size.
    pub bucket_id: u64,
}
334
/// Value of the bitmap tables: the bytes of a serialized `RoaringBitmap`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct BitmapBlob(pub Vec<u8>);
342
343impl From<RoaringBitmap> for BitmapBlob {
344 fn from(bm: RoaringBitmap) -> Self {
345 let mut buf = Vec::with_capacity(bm.serialized_size());
346 bm.serialize_into(&mut buf)
347 .expect("RoaringBitmap::serialize_into on Vec cannot fail");
348 Self(buf)
349 }
350}
351
352fn ledger_tx_seq_digest(tx_sequence_number: u64, info: TxSeqDigestInfo) -> LedgerTxSeqDigest {
353 LedgerTxSeqDigest {
354 tx_sequence_number,
355 digest: info.digest,
356 event_count: info.event_count,
357 tx_offset: info.tx_offset,
358 checkpoint_number: info.checkpoint_number,
359 }
360}
361
362fn decode_ledger_bitmap_bucket(
363 key: BitmapIndexKey,
364 blob: BitmapBlob,
365) -> Result<LedgerBitmapBucket, TypedStoreError> {
366 let bitmap = RoaringBitmap::deserialize_from(&blob.0[..]).map_err(|e| {
367 TypedStoreError::SerializationError(format!("decode ledger bitmap bucket: {e}"))
368 })?;
369 Ok(LedgerBitmapBucket {
370 bucket_id: key.bucket_id,
371 bitmap,
372 })
373}
374
/// Selects which bitmap table a bucket belongs to; determines the bucket size
/// and sequence-number space used when deciding whether a bucket is pruned.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BitmapKind {
    /// Buckets of transaction sequence numbers (`TX_BUCKET_SIZE` per bucket).
    Transaction,
    /// Buckets of packed event sequence numbers (`EVENT_BUCKET_SIZE` per bucket).
    Event,
}
382
383impl BitmapKind {
384 fn bucket_fully_pruned(self, bucket_id: u64, pruned_tx_seq_exclusive: u64) -> bool {
397 match self {
398 BitmapKind::Transaction => bucket_id
399 .checked_add(1)
400 .and_then(|b| b.checked_mul(TX_BUCKET_SIZE))
401 .map(|hi| hi <= pruned_tx_seq_exclusive)
402 .unwrap_or(false),
403 BitmapKind::Event => {
404 let bucket_hi = bucket_id
405 .checked_add(1)
406 .and_then(|b| b.checked_mul(EVENT_BUCKET_SIZE));
407 let threshold = checked_event_seq_lo(pruned_tx_seq_exclusive);
408 match (bucket_hi, threshold) {
409 (Some(hi), Some(th)) => hi <= th,
410 _ => false,
411 }
412 }
413 }
414 }
415}
416
/// Options supplied when opening the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Shared exclusive transaction-sequence pruning bound. It is handed to
    /// the bitmap compaction filters when the tables are opened and updated by
    /// `prune`.
    pub pruning_tx_seq_exclusive: Arc<AtomicU64>,
}
423
424fn default_table_options() -> typed_store::rocks::DBOptions {
425 typed_store::rocks::default_db_options().disable_write_throttling()
426}
427
428fn tx_seq_digest_table_options() -> typed_store::rocks::DBOptions {
434 let mut options = default_table_options();
435 options.rw_options = options.rw_options.clone().set_ignore_range_deletions(false);
436 options
437}
438
439fn balance_delta_merge_operator(
440 _key: &[u8],
441 existing_val: Option<&[u8]>,
442 operands: &MergeOperands,
443) -> Option<Vec<u8>> {
444 let mut result = if let Some(existing_val) = existing_val {
445 bcs::from_bytes::<BalanceIndexInfo>(existing_val)
446 .inspect_err(|e| {
447 tracing::error!(
448 "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
449 )
450 })
451 .ok()?
452 } else {
453 BalanceIndexInfo::default()
454 };
455
456 for operand in operands.iter() {
457 let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
458 .inspect_err(|e| {
459 tracing::error!(
460 "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
461 )
462 })
463 .ok()?;
464 result.merge_delta(&delta);
465 }
466
467 Some(
468 bcs::to_bytes(&result)
469 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
470 )
471}
472
473fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
474 let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
475 Ok(info) => info,
476 Err(_) => return Decision::Keep,
477 };
478
479 if balance_info.coin_balance_delta == 0 && balance_info.address_balance_delta == 0 {
480 Decision::Remove
481 } else {
482 Decision::Keep
483 }
484}
485
486fn balance_table_options() -> typed_store::rocks::DBOptions {
487 default_table_options()
488 .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
489 .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
490}
491
492fn decode_bitmap_blob(bcs_bytes: &[u8]) -> Result<RoaringBitmap, anyhow::Error> {
495 let blob: BitmapBlob = bcs::from_bytes(bcs_bytes)?;
496 Ok(RoaringBitmap::deserialize_from(&blob.0[..])?)
497}
498
499fn encode_bitmap_blob(bm: &RoaringBitmap) -> Vec<u8> {
500 let mut buf = Vec::with_capacity(bm.serialized_size());
501 bm.serialize_into(&mut buf)
502 .expect("RoaringBitmap::serialize_into on Vec cannot fail");
503 bcs::to_bytes(&BitmapBlob(buf)).expect("BCS encode of BitmapBlob cannot fail")
504}
505
506fn bitmap_union_merge_operator(
509 _key: &[u8],
510 existing_val: Option<&[u8]>,
511 operands: &MergeOperands,
512) -> Option<Vec<u8>> {
513 let mut acc = match existing_val {
514 Some(v) => match decode_bitmap_blob(v) {
515 Ok(bm) => bm,
516 Err(e) => {
517 tracing::error!(
518 "Failed to deserialize existing BitmapBlob during merge - data corruption: {e}"
519 );
520 return None;
521 }
522 },
523 None => RoaringBitmap::new(),
524 };
525
526 for operand in operands.iter() {
527 match decode_bitmap_blob(operand) {
528 Ok(bm) => acc |= bm,
529 Err(e) => {
530 tracing::error!(
531 "Failed to deserialize BitmapBlob operand during merge - data corruption: {e}"
532 );
533 return None;
534 }
535 }
536 }
537
538 acc.optimize();
546 Some(encode_bitmap_blob(&acc))
547}
548
/// Compaction filter state for one bitmap table.
#[derive(Clone)]
pub struct BitmapCompactionFilter {
    /// Shared exclusive transaction-sequence pruning bound, read on every
    /// `filter` call.
    pruning_tx_seq_exclusive: Arc<AtomicU64>,
    /// Which bitmap table this filter is attached to.
    kind: BitmapKind,
}
563
564impl BitmapCompactionFilter {
565 pub fn new(pruning_tx_seq_exclusive: Arc<AtomicU64>, kind: BitmapKind) -> Self {
566 Self {
567 pruning_tx_seq_exclusive,
568 kind,
569 }
570 }
571
572 pub fn filter(&self, key: &[u8], _value: &[u8]) -> Decision {
573 if key.len() < 8 {
574 warn!(
575 kind = ?self.kind,
576 "bitmap compaction filter saw key shorter than 8 bytes ({}); keeping",
577 key.len(),
578 );
579 return Decision::Keep;
580 }
581 let bucket_id =
582 u64::from_be_bytes(key[key.len() - 8..].try_into().expect("len checked above"));
583 let pruned_exclusive = self.pruning_tx_seq_exclusive.load(Ordering::Relaxed);
584
585 if self.kind.bucket_fully_pruned(bucket_id, pruned_exclusive) {
586 Decision::Remove
587 } else {
588 Decision::Keep
589 }
590 }
591}
592
593fn bitmap_cf_default_options() -> typed_store::rocks::DBOptions {
596 default_table_options()
597 .set_merge_operator_associative("bitmap_union_merge", bitmap_union_merge_operator)
598}
599
600fn bitmap_cf_options(
602 filter_name: &str,
603 filter: BitmapCompactionFilter,
604) -> typed_store::rocks::DBOptions {
605 let mut options = bitmap_cf_default_options();
606 options
607 .options
608 .set_compaction_filter(filter_name, move |_level, key, value| {
609 filter.filter(key, value)
610 });
611 options
612}
613
614impl CoinIndexInfo {
615 fn merge(&mut self, other: Self) {
616 self.coin_metadata_object_id = self
617 .coin_metadata_object_id
618 .or(other.coin_metadata_object_id);
619 self.regulated_coin_metadata_object_id = self
620 .regulated_coin_metadata_object_id
621 .or(other.regulated_coin_metadata_object_id);
622 self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
623 }
624}
625
/// The RocksDB tables backing the RPC indexes.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Single-row table holding the schema version (`MetadataInfo`).
    meta: DBMap<(), MetadataInfo>,

    /// Single-row table holding the JSON-encoded `IndexSettings`.
    settings: DBMap<(), Vec<u8>>,

    /// Indexed and pruned checkpoint watermarks.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// Per-epoch information keyed by epoch id.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// Deprecated table; kept in the schema but not used by this code.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Index of address-owned objects.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Index of dynamic fields; the key carries all the information.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// Per-coin-type index of associated object ids.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// Per-(owner, coin type) balance deltas, accumulated through the balance
    /// merge operator; all-zero entries are dropped during compaction.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// Maps (original package id, version) to the id the package is stored under.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Ledger history: transaction sequence number to digest and placement info.
    #[default_options_override_fn = "tx_seq_digest_table_options"]
    tx_seq_digest: DBMap<u64, TxSeqDigestInfo>,

    /// Ledger history: per-dimension bitmaps of transaction sequence numbers.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    transaction_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,

    /// Ledger history: per-dimension bitmaps of packed event sequence numbers.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    event_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,
}
724
725fn clear_table<K, V>(map: &DBMap<K, V>) -> Result<(), TypedStoreError>
732where
733 K: Serialize + DeserializeOwned,
734 V: Serialize + DeserializeOwned,
735{
736 let first = map.safe_iter().next().transpose()?.map(|(k, _)| k);
737 let last = map
738 .reversed_safe_iter_with_bounds(None, None)?
739 .next()
740 .transpose()?
741 .map(|(k, _)| k);
742 let (Some(first), Some(last)) = (first, last) else {
743 return Ok(());
744 };
745 let mut batch = map.batch();
746 batch.schedule_delete_range(map, &first, &last)?;
747 batch.delete_batch(map, std::iter::once(&last))?;
748 batch.write()?;
749 map.compact_range(&first, &last)
750}
751
752impl IndexStoreTables {
753 fn extract_version_if_package(
754 object: &Object,
755 ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
756 if let Data::Package(package) = &object.data {
757 let original_id = package.original_package_id();
758 let version = package.version().value();
759 let storage_id = object.id();
760
761 let key = PackageVersionKey {
762 original_package_id: original_id,
763 version,
764 };
765 let info = PackageVersionInfo { storage_id };
766 return Some((key, info));
767 }
768 None
769 }
770
771 fn open_with_index_options<P: Into<PathBuf>>(
772 path: P,
773 index_options: IndexStoreOptions,
774 ) -> Self {
775 let mut table_options = std::collections::BTreeMap::new();
783 for (table_name, _) in IndexStoreTables::describe_tables() {
784 table_options.insert(table_name, default_table_options());
785 }
786 table_options.insert("balance".to_string(), balance_table_options());
787 table_options.insert("tx_seq_digest".to_string(), tx_seq_digest_table_options());
789
790 let bitmap_filter_tx = BitmapCompactionFilter::new(
791 index_options.pruning_tx_seq_exclusive.clone(),
792 BitmapKind::Transaction,
793 );
794 let bitmap_filter_event = BitmapCompactionFilter::new(
795 index_options.pruning_tx_seq_exclusive.clone(),
796 BitmapKind::Event,
797 );
798 table_options.insert(
799 "transaction_bitmap".to_string(),
800 bitmap_cf_options("transaction_bitmap_filter", bitmap_filter_tx),
801 );
802 table_options.insert(
803 "event_bitmap".to_string(),
804 bitmap_cf_options("event_bitmap_filter", bitmap_filter_event),
805 );
806
807 IndexStoreTables::open_tables_read_write_with_deprecation_option(
808 path.into(),
809 MetricConf::new("rpc-index"),
810 None,
811 Some(DBMapTableConfigMap::new(table_options)),
812 true, )
814 }
815
816 fn open_with_options<P: Into<PathBuf>>(
817 path: P,
818 options: typed_store::rocksdb::Options,
819 table_options: Option<DBMapTableConfigMap>,
820 ) -> Self {
821 IndexStoreTables::open_tables_read_write_with_deprecation_option(
822 path.into(),
823 MetricConf::new("rpc-index"),
824 Some(options),
825 table_options,
826 true, )
828 }
829
830 fn persisted_ledger_history_indexing(&self) -> bool {
834 self.read_settings().ledger_history_indexing
835 }
836
837 fn read_settings(&self) -> IndexSettings {
840 self.settings
841 .get(&())
842 .ok()
843 .flatten()
844 .map(|bytes| decode_settings(&bytes))
845 .unwrap_or_default()
846 }
847
848 fn needs_to_do_initialization(
849 &self,
850 checkpoint_store: &CheckpointStore,
851 ledger_history_indexing: bool,
852 ) -> bool {
853 let schema_stale = match self.meta.get(&()) {
854 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
855 Ok(None) | Err(_) => true,
856 };
857 let enabling = ledger_history_indexing && !self.persisted_ledger_history_indexing();
862 schema_stale || enabling || self.is_indexed_watermark_out_of_date(checkpoint_store)
863 }
864
865 fn disable_ledger_history_indexing(&self) -> Result<(), TypedStoreError> {
872 clear_table(&self.tx_seq_digest)?;
873 clear_table(&self.transaction_bitmap)?;
874 clear_table(&self.event_bitmap)?;
875 self.settings.insert(
876 &(),
877 &encode_settings(&IndexSettings {
878 ledger_history_indexing: false,
879 }),
880 )?;
881 Ok(())
882 }
883
884 fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
886 let highest_executed_checkpint = checkpoint_store
887 .get_highest_executed_checkpoint_seq_number()
888 .ok()
889 .flatten();
890 let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
891 watermark < highest_executed_checkpint
892 }
893
    /// Builds the indexes from the data currently held in the local stores.
    ///
    /// Steps, in order:
    /// 1. Pick the restore target checkpoint and the lowest checkpoint still
    ///    available locally.
    /// 2. Index epoch information for every checkpoint in that range.
    /// 3. If ledger history indexing is enabled in `rpc_config`, backfill the
    ///    ledger-history tables for the same range.
    /// 4. Fill in the current epoch's entry from the system state.
    /// 5. Index the live object set in parallel and write the collected coin index.
    /// 6. Record the indexed watermark, then the schema version and settings.
    ///
    /// `_epoch_store` and `_package_store` are accepted but unused.
    #[tracing::instrument(skip_all)]
    fn init(
        &mut self,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        _epoch_store: &AuthorityPerEpochStore,
        _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        batch_size_limit: usize,
        rpc_config: &sui_config::RpcConfig,
    ) -> Result<(), StorageError> {
        info!("Initializing RPC indexes");

        // Prefer the highest committed checkpoint from the perpetual tables and
        // fall back to the highest executed checkpoint from the checkpoint store.
        let restore_target = authority_store
            .perpetual_tables
            .get_highest_committed_checkpoint()?
            .or(checkpoint_store.get_highest_executed_checkpoint_seq_number()?);
        // First checkpoint after the checkpoint store's pruning watermark (0 if
        // nothing has been pruned).
        let lowest_available_checkpoint = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Same, but for the pruning watermark of the perpetual tables.
        let lowest_available_checkpoint_objects = authority_store
            .perpetual_tables
            .get_highest_pruned_checkpoint()?
            .map(|c| c.saturating_add(1))
            .unwrap_or(0);
        // Start from the later of the two so both stores still hold the data.
        let lowest_available_checkpoint =
            lowest_available_checkpoint.max(lowest_available_checkpoint_objects);

        let checkpoint_range =
            restore_target.map(|restore_target| lowest_available_checkpoint..=restore_target);

        if let Some(checkpoint_range) = checkpoint_range.clone() {
            self.index_existing_checkpoints(authority_store, checkpoint_store, checkpoint_range)?;
        }

        if rpc_config.ledger_history_indexing()
            && let Some(checkpoint_range) = checkpoint_range
        {
            self.backfill_ledger_history_indexes(
                authority_store,
                checkpoint_store,
                checkpoint_range,
            )?;
        }

        self.initialize_current_epoch(authority_store, checkpoint_store)?;

        if restore_target.is_some() {
            // Coin index entries are gathered behind a mutex during the
            // parallel live-object pass and written in one go afterwards.
            let coin_index = Mutex::new(HashMap::new());

            let make_live_object_indexer = RpcParLiveObjectSetIndexer {
                tables: self,
                coin_index: &coin_index,
                batch_size_limit,
            };

            crate::par_index_live_object_set::par_index_live_object_set(
                authority_store,
                &make_live_object_indexer,
            )?;

            self.coin.multi_insert(coin_index.into_inner().unwrap())?;
        }

        if let Some(restore_target) = restore_target {
            self.watermark
                .insert(&Watermark::Indexed, &restore_target)?;
        }

        // The schema version and settings are written last, in a single batch,
        // after all index data has been written.
        let mut batch = self.meta.batch();
        batch.insert_batch(
            &self.meta,
            [(
                (),
                MetadataInfo {
                    version: CURRENT_DB_VERSION,
                },
            )],
        )?;
        batch.insert_batch(
            &self.settings,
            [(
                (),
                encode_settings(&IndexSettings {
                    ledger_history_indexing: rpc_config.ledger_history_indexing(),
                }),
            )],
        )?;
        batch.write()?;

        info!("Finished initializing RPC indexes");

        Ok(())
    }
1029
1030 #[tracing::instrument(skip(self, authority_store, checkpoint_store))]
1031 fn index_existing_checkpoints(
1032 &mut self,
1033 authority_store: &AuthorityStore,
1034 checkpoint_store: &CheckpointStore,
1035 checkpoint_range: std::ops::RangeInclusive<u64>,
1036 ) -> Result<(), StorageError> {
1037 info!(
1038 "Indexing {} checkpoints in range {checkpoint_range:?}",
1039 checkpoint_range.size_hint().0
1040 );
1041 let start_time = Instant::now();
1042
1043 checkpoint_range.into_par_iter().try_for_each(|seq| {
1044 let Some(checkpoint) =
1045 sparse_checkpoint_for_epoch_backfill(authority_store, checkpoint_store, seq)?
1046 else {
1047 return Ok(());
1048 };
1049
1050 let mut batch = self.epochs.batch();
1051
1052 self.index_epoch(&checkpoint, &mut batch)?;
1053
1054 batch
1055 .write_opt(bulk_ingestion_write_options())
1056 .map_err(StorageError::from)
1057 })?;
1058
1059 info!(
1060 "Indexing checkpoints took {} seconds",
1061 start_time.elapsed().as_secs()
1062 );
1063 Ok(())
1064 }
1065
1066 fn backfill_ledger_history_indexes(
1072 &self,
1073 authority_store: &AuthorityStore,
1074 checkpoint_store: &CheckpointStore,
1075 checkpoint_range: std::ops::RangeInclusive<u64>,
1076 ) -> Result<(), StorageError> {
1077 info!("ledger history backfill: cps {checkpoint_range:?}");
1078 let start_time = Instant::now();
1079
1080 checkpoint_range.clone().into_par_iter().try_for_each(
1081 |seq| -> Result<(), StorageError> {
1082 let checkpoint =
1083 full_checkpoint_for_backfill(authority_store, checkpoint_store, seq)?
1084 .ok_or_else(|| {
1085 StorageError::missing(format!(
1087 "ledger history backfill: checkpoint {seq} is missing from local \
1088 storage but falls inside the retained backfill range \
1089 {checkpoint_range:?}"
1090 ))
1091 })?;
1092 let mut batch = self.meta.batch();
1093 self.write_ledger_history_rows_for_checkpoint(&checkpoint, &mut batch)?;
1094 batch
1095 .write_opt(bulk_ingestion_write_options())
1096 .map_err(StorageError::from)
1097 },
1098 )?;
1099
1100 self.tx_seq_digest.flush().map_err(|e| {
1102 StorageError::custom(format!("flush after ledger history backfill: {e}"))
1103 })?;
1104
1105 info!(
1106 "ledger history backfill took {} seconds",
1107 start_time.elapsed().as_secs()
1108 );
1109 Ok(())
1110 }
1111
1112 fn first_tx_seq_digest_key(&self) -> Result<Option<u64>, TypedStoreError> {
1119 match self.tx_seq_digest.safe_iter().next() {
1120 Some(Ok((k, _))) => Ok(Some(k)),
1121 Some(Err(e)) => Err(e),
1122 None => Ok(None),
1123 }
1124 }
1125
1126 fn prune(
1133 &self,
1134 pruned_checkpoint_watermark: u64,
1135 pruned_tx_seq_exclusive: u64,
1136 ledger_history_enabled: bool,
1137 pruning_atomic: &AtomicU64,
1138 ) -> Result<(), TypedStoreError> {
1139 let mut batch = self.watermark.batch();
1140
1141 batch.insert_batch(
1142 &self.watermark,
1143 [(Watermark::Pruned, pruned_checkpoint_watermark)],
1144 )?;
1145
1146 if ledger_history_enabled {
1147 let prev_exclusive = self.first_tx_seq_digest_key()?.unwrap_or(0);
1151 batch.schedule_delete_range(
1152 &self.tx_seq_digest,
1153 &prev_exclusive,
1154 &pruned_tx_seq_exclusive,
1155 )?;
1156 }
1157
1158 batch.write()?;
1159
1160 if ledger_history_enabled {
1161 pruning_atomic.store(pruned_tx_seq_exclusive, Ordering::Relaxed);
1163 }
1164 Ok(())
1165 }
1166
1167 fn index_checkpoint(
1169 &self,
1170 checkpoint: &Checkpoint,
1171 ledger_history_enabled: bool,
1172 ) -> Result<typed_store::rocks::DBBatch, StorageError> {
1173 debug!(
1174 checkpoint = checkpoint.summary.sequence_number,
1175 "indexing checkpoint"
1176 );
1177
1178 let mut batch = self.owner.batch();
1179
1180 self.index_epoch(checkpoint, &mut batch)?;
1181 self.index_transactions(checkpoint, &mut batch)?;
1182 self.index_objects(checkpoint, &mut batch)?;
1183
1184 if ledger_history_enabled {
1186 self.write_ledger_history_rows_for_checkpoint(checkpoint, &mut batch)?;
1187 }
1188
1189 batch.insert_batch(
1190 &self.watermark,
1191 [(Watermark::Indexed, checkpoint.summary.sequence_number)],
1192 )?;
1193
1194 debug!(
1195 checkpoint = checkpoint.summary.sequence_number,
1196 "finished indexing checkpoint"
1197 );
1198
1199 Ok(batch)
1200 }
1201
1202 fn index_epoch(
1203 &self,
1204 checkpoint: &Checkpoint,
1205 batch: &mut typed_store::rocks::DBBatch,
1206 ) -> Result<(), StorageError> {
1207 let Some(epoch_info) = checkpoint.epoch_info()? else {
1208 return Ok(());
1209 };
1210 if epoch_info.epoch > 0 {
1211 let prev_epoch = epoch_info.epoch - 1;
1212 let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
1213 current_epoch.epoch = prev_epoch; current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
1215 current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
1216 batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
1217 }
1218 batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
1219 Ok(())
1220 }
1221
1222 fn initialize_current_epoch(
1225 &mut self,
1226 authority_store: &AuthorityStore,
1227 checkpoint_store: &CheckpointStore,
1228 ) -> Result<(), StorageError> {
1229 let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
1230 return Ok(());
1231 };
1232
1233 let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
1234 .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
1235
1236 let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
1237 epoch.epoch = checkpoint.epoch;
1238
1239 if epoch.protocol_version.is_none() {
1240 epoch.protocol_version = Some(system_state.protocol_version());
1241 }
1242
1243 if epoch.start_timestamp_ms.is_none() {
1244 epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
1245 }
1246
1247 if epoch.reference_gas_price.is_none() {
1248 epoch.reference_gas_price = Some(system_state.reference_gas_price());
1249 }
1250
1251 if epoch.system_state.is_none() {
1252 epoch.system_state = Some(system_state);
1253 }
1254
1255 self.epochs.insert(&epoch.epoch, &epoch)?;
1256
1257 Ok(())
1258 }
1259
1260 fn index_transactions(
1261 &self,
1262 checkpoint: &Checkpoint,
1263 batch: &mut typed_store::rocks::DBBatch,
1264 ) -> Result<(), StorageError> {
1265 for tx in &checkpoint.transactions {
1266 let balance_changes = sui_types::balance_change::derive_detailed_balance_changes_2(
1267 &tx.effects,
1268 &checkpoint.object_set,
1269 )
1270 .into_iter()
1271 .filter_map(|change| {
1272 if let TypeTag::Struct(coin_type) = change.coin_type {
1273 Some((
1274 BalanceKey {
1275 owner: change.address,
1276 coin_type: *coin_type,
1277 },
1278 BalanceIndexInfo {
1279 coin_balance_delta: change.coin_amount,
1280 address_balance_delta: change.address_amount,
1281 },
1282 ))
1283 } else {
1284 None
1285 }
1286 });
1287 batch.partial_merge_batch(&self.balance, balance_changes)?;
1288 }
1289
1290 Ok(())
1291 }
1292
    /// Stages the ledger-history rows for one checkpoint on `batch`.
    ///
    /// Three kinds of writes are queued:
    /// * one `tx_seq_digest` row per transaction, keyed by the transaction's
    ///   global sequence number;
    /// * merge operands for `transaction_bitmap`, grouped per
    ///   `(dimension key, bucket)` across the whole checkpoint;
    /// * merge operands for `event_bitmap`, grouped per
    ///   `(dimension key, bucket)` within each transaction.
    ///
    /// Nothing is written to the database here; the caller owns `batch`.
    ///
    /// # Errors
    /// Returns an error when the checkpoint's `network_total_transactions` is
    /// smaller than its transaction count, when an event sequence number
    /// cannot be encoded, or when staging an operation on `batch` fails.
    fn write_ledger_history_rows_for_checkpoint(
        &self,
        checkpoint: &Checkpoint,
        batch: &mut typed_store::rocks::DBBatch,
    ) -> Result<(), StorageError> {
        let cp_seq = checkpoint.summary.sequence_number;
        let net_total = checkpoint.summary.data().network_total_transactions;
        let tx_count = checkpoint.transactions.len() as u64;
        // Global sequence number of the first transaction in this checkpoint:
        // the running network total minus the transactions in this checkpoint.
        let tx_lo = net_total.checked_sub(tx_count).ok_or_else(|| {
            StorageError::custom(format!(
                "checkpoint {cp_seq}: network_total_transactions ({net_total}) \
                 < tx_count ({tx_count})"
            ))
        })?;

        let object_set = &checkpoint.object_set;

        // Transaction bitmaps are accumulated for the whole checkpoint and
        // staged once after the loop.
        let mut tx_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();

        for (i, tx) in checkpoint.transactions.iter().enumerate() {
            let tx_seq = tx_lo + i as u64;

            let tx_data = &tx.transaction;
            let digest = *tx.effects.transaction_digest();
            // A transaction without an events payload is recorded with zero events.
            let event_count = tx.events.as_ref().map(|e| e.data.len() as u32).unwrap_or(0);

            batch.insert_batch(
                &self.tx_seq_digest,
                [(
                    tx_seq,
                    TxSeqDigestInfo {
                        digest,
                        event_count,
                        // Position of the transaction inside this checkpoint.
                        tx_offset: i as u32,
                        checkpoint_number: cp_seq,
                    },
                )],
            )?;

            let tx_bucket = tx_seq / TX_BUCKET_SIZE;
            let tx_bit = (tx_seq % TX_BUCKET_SIZE) as u32;
            // Collect into a set first so a dimension key reported more than
            // once for the same transaction is only processed once.
            let mut tx_dim_keys: FxHashSet<Vec<u8>> = FxHashSet::default();
            for_each_transaction_dimension(
                tx_data,
                &tx.effects,
                tx.events.as_ref(),
                object_set,
                |dim, value| {
                    tx_dim_keys.insert(encode_dimension_key(dim, value));
                },
            );
            for dim_key in tx_dim_keys {
                tx_groups
                    .entry((dim_key, tx_bucket))
                    .or_default()
                    .insert(tx_bit);
            }

            // Event bitmaps are grouped and staged per transaction.
            let mut event_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();
            // The callback cannot return an error, so the first encoding
            // failure is remembered here and surfaced after the traversal.
            let mut event_seq_error = None;
            for_each_event_dimension(
                tx_data.sender(),
                &tx.effects,
                tx.events.as_ref(),
                |event_idx, dim, value| {
                    let event_seq = match checked_encode_event_seq(tx_seq, event_idx) {
                        Ok(event_seq) => event_seq,
                        Err(e) => {
                            event_seq_error.get_or_insert(e);
                            return;
                        }
                    };
                    let bucket = event_seq / EVENT_BUCKET_SIZE;
                    let bit = (event_seq % EVENT_BUCKET_SIZE) as u32;
                    event_groups
                        .entry((encode_dimension_key(dim, value), bucket))
                        .or_default()
                        .insert(bit);
                },
            );
            if let Some(e) = event_seq_error {
                return Err(e);
            }
            let event_ops = event_groups.into_iter().map(|((dim_key, bucket), bm)| {
                (
                    BitmapIndexKey {
                        dimension_key: dim_key,
                        bucket_id: bucket,
                    },
                    BitmapBlob::from(bm),
                )
            });
            batch.partial_merge_batch(&self.event_bitmap, event_ops)?;
        }

        let tx_ops = tx_groups.into_iter().map(|((dim_key, bucket), bm)| {
            (
                BitmapIndexKey {
                    dimension_key: dim_key,
                    bucket_id: bucket,
                },
                BitmapBlob::from(bm),
            )
        });
        batch.partial_merge_batch(&self.transaction_bitmap, tx_ops)?;

        Ok(())
    }
1417
1418 fn index_objects(
1419 &self,
1420 checkpoint: &Checkpoint,
1421 batch: &mut typed_store::rocks::DBBatch,
1422 ) -> Result<(), StorageError> {
1423 let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
1424 let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];
1425 let object_set = &checkpoint.object_set;
1426
1427 for tx in &checkpoint.transactions {
1428 for removed_object in tx_removed_objects_pre_version(tx, object_set) {
1430 match removed_object.owner() {
1431 Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
1432 let owner_key = OwnerIndexKey::from_object(removed_object);
1433 batch.delete_batch(&self.owner, [owner_key])?;
1434 }
1435 Owner::ObjectOwner(object_id) => {
1436 batch.delete_batch(
1437 &self.dynamic_field,
1438 [DynamicFieldKey::new(*object_id, removed_object.id())],
1439 )?;
1440 }
1441 Owner::Shared { .. } | Owner::Immutable => {}
1442 Owner::Party { .. } => {
1443 todo!("Party WIP");
1445 }
1448 }
1449 }
1450
1451 for (object, old_object) in tx_changed_objects(tx, object_set) {
1453 if let Some(old_object) = old_object {
1454 match old_object.owner() {
1455 Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
1456 let owner_key = OwnerIndexKey::from_object(old_object);
1457 batch.delete_batch(&self.owner, [owner_key])?;
1458 }
1459
1460 Owner::ObjectOwner(object_id) => {
1461 if old_object.owner() != object.owner() {
1462 batch.delete_batch(
1463 &self.dynamic_field,
1464 [DynamicFieldKey::new(*object_id, old_object.id())],
1465 )?;
1466 }
1467 }
1468
1469 Owner::Shared { .. } | Owner::Immutable => {}
1470
1471 Owner::Party { .. } => {
1472 todo!("Party WIP");
1474 }
1477 }
1478 }
1479
1480 match object.owner() {
1481 Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
1482 let owner_key = OwnerIndexKey::from_object(object);
1483 let owner_info = OwnerIndexInfo::new(object);
1484 batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
1485 }
1486 Owner::ObjectOwner(parent) => {
1487 if should_index_dynamic_field(object) {
1488 let field_key = DynamicFieldKey::new(*parent, object.id());
1489 batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
1490 }
1491 }
1492 Owner::Shared { .. } | Owner::Immutable => {}
1493 Owner::Party { .. } => todo!("Party WIP"),
1495 }
1496 if let Some((key, info)) = Self::extract_version_if_package(object) {
1497 package_version_index.push((key, info));
1498 }
1499 }
1500
1501 for (key, value) in tx
1507 .created_objects(object_set)
1508 .flat_map(try_create_coin_index_info)
1509 {
1510 use std::collections::hash_map::Entry;
1511
1512 match coin_index.entry(key) {
1513 Entry::Occupied(mut o) => {
1514 o.get_mut().merge(value);
1515 }
1516 Entry::Vacant(v) => {
1517 v.insert(value);
1518 }
1519 }
1520 }
1521 }
1522
1523 batch.insert_batch(&self.coin, coin_index)?;
1524 batch.insert_batch(&self.package_version, package_version_index)?;
1525
1526 Ok(())
1527 }
1528
1529 fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
1530 self.epochs.get(&epoch)
1531 }
1532
1533 fn owner_iter(
1534 &self,
1535 owner: SuiAddress,
1536 object_type: Option<StructTag>,
1537 cursor: Option<OwnerIndexKey>,
1538 ) -> Result<
1539 impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1540 TypedStoreError,
1541 > {
1542 let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
1544 owner,
1545 object_type: object_type
1546 .clone()
1547 .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
1548 inverted_balance: None,
1549 object_id: ObjectID::ZERO,
1550 });
1551
1552 Ok(self
1553 .owner
1554 .safe_iter_with_bounds(Some(lower_bound), None)
1555 .take_while(move |item| {
1556 let Ok((key, _)) = item else {
1558 return true;
1559 };
1560
1561 key.owner == owner
1563 && object_type
1565 .as_ref()
1566 .map(|ty| {
1567 ty.address == key.object_type.address
1568 && ty.module == key.object_type.module
1569 && ty.name == key.object_type.name
1570 && (ty.type_params.is_empty() ||
1572 ty.type_params == key.object_type.type_params)
1574 }).unwrap_or(true)
1575 }))
1576 }
1577
1578 fn dynamic_field_iter(
1579 &self,
1580 parent: ObjectID,
1581 cursor: Option<ObjectID>,
1582 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1583 {
1584 let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1585 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1586 let iter = self
1587 .dynamic_field
1588 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1589 .map_ok(|(key, ())| key);
1590 Ok(iter)
1591 }
1592
1593 fn get_coin_info(
1594 &self,
1595 coin_type: &StructTag,
1596 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1597 let key = CoinIndexKey {
1598 coin_type: coin_type.to_owned(),
1599 };
1600 self.coin.get(&key)
1601 }
1602
1603 fn get_balance(
1604 &self,
1605 owner: &SuiAddress,
1606 coin_type: &StructTag,
1607 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1608 let key = BalanceKey {
1609 owner: owner.to_owned(),
1610 coin_type: coin_type.to_owned(),
1611 };
1612 self.balance.get(&key)
1613 }
1614
1615 fn balance_iter(
1616 &self,
1617 owner: SuiAddress,
1618 cursor: Option<BalanceKey>,
1619 ) -> Result<
1620 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1621 TypedStoreError,
1622 > {
1623 let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1624 owner,
1625 coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1626 });
1627
1628 Ok(self
1629 .balance
1630 .safe_iter_with_bounds(Some(lower_bound), None)
1631 .scan((), move |_, item| {
1632 match item {
1633 Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1634 Ok(_) => None, Err(e) => Some(Err(e)), }
1637 }))
1638 }
1639
1640 fn package_versions_iter(
1641 &self,
1642 original_id: ObjectID,
1643 cursor: Option<u64>,
1644 ) -> Result<
1645 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1646 TypedStoreError,
1647 > {
1648 let lower_bound = PackageVersionKey {
1649 original_package_id: original_id,
1650 version: cursor.unwrap_or(0),
1651 };
1652 let upper_bound = PackageVersionKey {
1653 original_package_id: original_id,
1654 version: u64::MAX,
1655 };
1656
1657 Ok(self
1658 .package_version
1659 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1660 }
1661}
1662
/// Handle to the on-disk RPC index database plus the in-memory state needed
/// to feed it.
pub struct RpcIndexStore {
    /// The opened index tables.
    tables: IndexStoreTables,
    /// Write batches produced by `index_checkpoint`, keyed by checkpoint
    /// sequence number, waiting to be written by
    /// `commit_update_for_checkpoint`.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    /// Atomic shared with the `IndexStoreOptions` the tables were opened with;
    /// it is initialised from the first `tx_seq_digest` key and handed to
    /// `IndexStoreTables::prune`.
    ledger_history_pruning_watermark: Arc<AtomicU64>,
    /// Whether ledger-history indexing is enabled, as persisted in the tables
    /// when this store was opened.
    ledger_history_enabled: bool,
}
1675
1676impl RpcIndexStore {
1677 fn db_path(dir: &Path) -> PathBuf {
1679 dir.join("rpc-index")
1680 }
1681
1682 pub async fn new(
1683 dir: &Path,
1684 authority_store: &AuthorityStore,
1685 checkpoint_store: &CheckpointStore,
1686 epoch_store: &AuthorityPerEpochStore,
1687 package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
1688 rpc_config: sui_config::RpcConfig,
1689 ) -> Self {
1690 let ledger_history_pruning_watermark = Arc::new(AtomicU64::new(0));
1692 let index_options = IndexStoreOptions {
1693 pruning_tx_seq_exclusive: ledger_history_pruning_watermark,
1694 };
1695
1696 Self::new_with_options(
1697 dir,
1698 authority_store,
1699 checkpoint_store,
1700 epoch_store,
1701 package_store,
1702 index_options,
1703 rpc_config,
1704 )
1705 .await
1706 }
1707
    /// Opens the RPC index under `dir`, rebuilding it from scratch when the
    /// existing database needs initialisation.
    ///
    /// When `needs_to_do_initialization` reports `true`, the old database is
    /// destroyed, a new one is opened with bulk-load oriented RocksDB options
    /// (optionally overridden by the index initialisation config), the index
    /// is populated via `IndexStoreTables::init`, flushed, closed, and then
    /// reopened with the regular `index_options`. Otherwise the already-open
    /// tables are reused, and ledger-history indexing is disabled in the
    /// tables if it was persisted as enabled but is off in `rpc_config`.
    ///
    /// # Panics
    /// Panics if the old database cannot be destroyed, if only one of the
    /// column-family buffer size / buffer count overrides is configured, if
    /// initialisation or flushing fails, if the initialised database is still
    /// referenced 30 seconds after being dropped, or if the reopened database
    /// fails the version / ledger-history sanity checks.
    pub async fn new_with_options(
        dir: &Path,
        authority_store: &AuthorityStore,
        checkpoint_store: &CheckpointStore,
        epoch_store: &AuthorityPerEpochStore,
        package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        index_options: IndexStoreOptions,
        rpc_config: sui_config::RpcConfig,
    ) -> Self {
        let path = Self::db_path(dir);
        let index_config = rpc_config.index_initialization_config();

        // Keep our own handle to the pruning atomic; `index_options` itself is
        // moved into the final `open_with_index_options` call on the rebuild path.
        let ledger_history_atomic = index_options.pruning_tx_seq_exclusive.clone();

        let tables = {
            let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

            if tables
                .needs_to_do_initialization(checkpoint_store, rpc_config.ledger_history_indexing())
            {
                // Assigned inside the block below, consumed by `init` after it.
                let batch_size_limit;

                let mut tables = {
                    // Close the existing handle before destroying the database.
                    drop(tables);
                    typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                        .await
                        .expect("unable to destroy old rpc-index db");

                    // Database-wide options used only for this initialisation pass.
                    let mut options = typed_store::rocksdb::Options::default();
                    options.set_unordered_write(true);

                    // Background jobs: config override, else one per CPU.
                    let max_background_jobs = if let Some(jobs) =
                        index_config.as_ref().and_then(|c| c.max_background_jobs)
                    {
                        debug!("Using config override for max_background_jobs: {}", jobs);
                        jobs
                    } else {
                        let jobs = num_cpus::get() as i32;
                        debug!(
                            "Calculated max_background_jobs: {} (based on CPU count)",
                            jobs
                        );
                        jobs
                    };
                    options.set_max_background_jobs(max_background_jobs);

                    // NOTE(review): these level-0 triggers look intended to keep
                    // RocksDB from throttling or stalling writes during the bulk
                    // load — confirm against the RocksDB option semantics.
                    options.set_level_zero_file_num_compaction_trigger(0);
                    options.set_level_zero_slowdown_writes_trigger(-1);
                    options.set_level_zero_stop_writes_trigger(i32::MAX);

                    let total_memory_bytes = get_available_memory();
                    // DB write buffer: config override, else 80% of the value
                    // reported by `get_available_memory`.
                    let db_buffer_size = if let Some(size) =
                        index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                    {
                        debug!(
                            "Using config override for db_write_buffer_size: {} bytes",
                            size
                        );
                        size
                    } else {
                        let size = (total_memory_bytes as f64 * 0.8) as usize;
                        debug!(
                            "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                            size, total_memory_bytes
                        );
                        size
                    };
                    options.set_db_write_buffer_size(db_buffer_size);

                    // Per-table (column family) option overrides.
                    let mut table_config_map = BTreeMap::new();

                    let mut cf_options = typed_store::rocks::default_db_options();
                    // Automatic compactions are switched off for this pass.
                    cf_options.options.set_disable_auto_compactions(true);

                    // The two column-family buffer settings must be overridden
                    // together or not at all.
                    let (buffer_size, buffer_count) = match (
                        index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                        index_config
                            .as_ref()
                            .and_then(|c| c.cf_max_write_buffer_number),
                    ) {
                        (Some(size), Some(count)) => {
                            debug!(
                                "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                                size, count
                            );
                            (size, count)
                        }
                        (None, None) => {
                            let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                            debug!(
                                "Column family memory budget: {} bytes (25% of {} total bytes)",
                                cf_memory_budget, total_memory_bytes
                            );
                            // 64 MiB floor per buffer.
                            const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024;
                            let target_buffer_count = num_cpus::get().max(2);

                            // Split the budget across the target number of
                            // buffers, but never go below the floor; the count is
                            // then whatever fits, clamped to [2, target].
                            let buffer_size =
                                (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                            let buffer_count = (cf_memory_budget / buffer_size)
                                .clamp(2, target_buffer_count)
                                as i32;
                            debug!(
                                "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                                buffer_size, buffer_count, target_buffer_count
                            );
                            (buffer_size, buffer_count)
                        }
                        _ => {
                            panic!(
                                "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                            );
                        }
                    };

                    cf_options.options.set_write_buffer_size(buffer_size);
                    cf_options.options.set_max_write_buffer_number(buffer_count);

                    // Batch size limit: config override, else the smaller of
                    // half a write buffer and 128 MiB (`1 << 27`).
                    batch_size_limit = if let Some(limit) =
                        index_config.as_ref().and_then(|c| c.batch_size_limit)
                    {
                        debug!(
                            "Using config override for batch_size_limit: {} bytes",
                            limit
                        );
                        limit
                    } else {
                        let half_buffer = buffer_size / 2;
                        let default_limit = 1 << 27;
                        let limit = half_buffer.min(default_limit);
                        debug!(
                            "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                            limit, half_buffer, default_limit
                        );
                        limit
                    };

                    // Every table starts from the shared bulk-load options ...
                    for (table_name, _) in IndexStoreTables::describe_tables() {
                        table_config_map.insert(table_name, cf_options.clone());
                    }

                    // ... and the tables written through merge operands get
                    // their merge operator (and compaction filter) on top.
                    let mut balance_options = cf_options.clone();
                    balance_options = balance_options.set_merge_operator_associative(
                        "balance_merge",
                        balance_delta_merge_operator,
                    );
                    table_config_map.insert("balance".to_string(), balance_options);

                    // Both bitmap filters share the same pruning atomic.
                    let bitmap_filter_tx = BitmapCompactionFilter::new(
                        index_options.pruning_tx_seq_exclusive.clone(),
                        BitmapKind::Transaction,
                    );
                    let bitmap_filter_event = BitmapCompactionFilter::new(
                        index_options.pruning_tx_seq_exclusive.clone(),
                        BitmapKind::Event,
                    );
                    let mut transaction_bitmap_opts = cf_options.clone();
                    transaction_bitmap_opts = transaction_bitmap_opts
                        .set_merge_operator_associative(
                            "bitmap_union_merge",
                            bitmap_union_merge_operator,
                        );
                    transaction_bitmap_opts.options.set_compaction_filter(
                        "transaction_bitmap_filter",
                        move |_level, key, value| bitmap_filter_tx.filter(key, value),
                    );
                    table_config_map
                        .insert("transaction_bitmap".to_string(), transaction_bitmap_opts);

                    let mut event_bitmap_opts = cf_options.clone();
                    event_bitmap_opts = event_bitmap_opts.set_merge_operator_associative(
                        "bitmap_union_merge",
                        bitmap_union_merge_operator,
                    );
                    event_bitmap_opts
                        .options
                        .set_compaction_filter("event_bitmap_filter", move |_level, key, value| {
                            bitmap_filter_event.filter(key, value)
                        });
                    table_config_map.insert("event_bitmap".to_string(), event_bitmap_opts);

                    IndexStoreTables::open_with_options(
                        &path,
                        options,
                        Some(DBMapTableConfigMap::new(table_config_map)),
                    )
                };

                tables
                    .init(
                        authority_store,
                        checkpoint_store,
                        epoch_store,
                        package_store,
                        batch_size_limit,
                        &rpc_config,
                    )
                    .expect("unable to initialize rpc index from live object set");

                tables
                    .meta
                    .flush()
                    .expect("Failed to flush RPC index tables to disk");

                // Close the bulk-load handle and wait (up to 30s, polling every
                // 100ms) until no strong reference to the database remains, so it
                // can be reopened with the regular options.
                let weak_db = Arc::downgrade(&tables.meta.db);
                drop(tables);

                let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
                loop {
                    if weak_db.strong_count() == 0 {
                        break;
                    }
                    if std::time::Instant::now() > deadline {
                        panic!("unable to reopen DB after indexing");
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }

                let reopened_tables =
                    IndexStoreTables::open_with_index_options(&path, index_options);

                // Sanity-check that what was just written is visible after reopen.
                let stored_version = reopened_tables
                    .meta
                    .get(&())
                    .expect("Failed to read metadata from reopened database")
                    .expect("Metadata not found in reopened database");
                assert_eq!(
                    stored_version.version, CURRENT_DB_VERSION,
                    "Database version mismatch after flush and reopen: expected {:#x}, found {:#x}",
                    CURRENT_DB_VERSION, stored_version.version
                );
                assert_eq!(
                    reopened_tables.persisted_ledger_history_indexing(),
                    rpc_config.ledger_history_indexing(),
                    "ledger-history setting mismatch after flush and reopen"
                );

                reopened_tables
            } else {
                // No rebuild needed. If ledger history was persisted as enabled
                // but is now configured off, record that in the tables.
                if tables.persisted_ledger_history_indexing()
                    && !rpc_config.ledger_history_indexing()
                {
                    tables
                        .disable_ledger_history_indexing()
                        .expect("unable to disable ledger history indexing");
                }
                tables
            }
        };

        // Seed the shared pruning atomic from what is on disk.
        Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);

        let ledger_history_enabled = tables.persisted_ledger_history_indexing();
        debug_assert_eq!(
            ledger_history_enabled,
            rpc_config.ledger_history_indexing(),
            "ledger_history_enabled (from settings CF) must match the configured ledger_history_indexing flag"
        );

        Self {
            tables,
            pending_updates: Default::default(),
            ledger_history_pruning_watermark: ledger_history_atomic,
            ledger_history_enabled,
        }
    }
2012
2013 fn hydrate_ledger_history_pruning_atomic(tables: &IndexStoreTables, atomic: &Arc<AtomicU64>) {
2015 let persisted = tables.first_tx_seq_digest_key().ok().flatten().unwrap_or(0);
2016 atomic.store(persisted, Ordering::Relaxed);
2017 }
2018
2019 pub fn new_without_init(dir: &Path) -> Self {
2020 let path = Self::db_path(dir);
2021
2022 let ledger_history_atomic = Arc::new(AtomicU64::new(0));
2024 let index_options = IndexStoreOptions {
2025 pruning_tx_seq_exclusive: ledger_history_atomic.clone(),
2026 };
2027 let tables = IndexStoreTables::open_with_index_options(path, index_options);
2028
2029 let ledger_history_enabled = tables.persisted_ledger_history_indexing();
2030 Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);
2031
2032 Self {
2033 tables,
2034 pending_updates: Default::default(),
2035 ledger_history_pruning_watermark: ledger_history_atomic,
2036 ledger_history_enabled,
2037 }
2038 }
2039
2040 pub fn prune(
2041 &self,
2042 pruned_checkpoint_watermark: u64,
2043 pruned_tx_seq_exclusive: u64,
2044 ) -> Result<(), TypedStoreError> {
2045 self.tables.prune(
2046 pruned_checkpoint_watermark,
2047 pruned_tx_seq_exclusive,
2048 self.ledger_history_enabled,
2049 &self.ledger_history_pruning_watermark,
2050 )
2051 }
2052
2053 #[tracing::instrument(
2058 skip_all,
2059 fields(checkpoint = checkpoint.summary.sequence_number)
2060 )]
2061 pub fn index_checkpoint(&self, checkpoint: &Checkpoint) {
2062 let sequence_number = checkpoint.summary.sequence_number;
2063 let batch = self
2064 .tables
2065 .index_checkpoint(checkpoint, self.ledger_history_enabled)
2066 .expect("db error");
2067
2068 self.pending_updates
2069 .lock()
2070 .unwrap()
2071 .insert(sequence_number, batch);
2072 }
2073
2074 #[tracing::instrument(skip(self))]
2096 pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
2097 let next_batch = self.pending_updates.lock().unwrap().pop_first();
2098
2099 let (next_sequence_number, batch) = next_batch.unwrap();
2101 assert_eq!(
2102 checkpoint, next_sequence_number,
2103 "commit_update_for_checkpoint must be called in order"
2104 );
2105
2106 let indexed = self.tables.watermark.get(&Watermark::Indexed)?;
2107 let expected_next = indexed.map_or(0, |w| w + 1);
2108 if checkpoint < expected_next {
2109 debug!(
2112 checkpoint,
2113 expected_next, "dropping already-indexed checkpoint update"
2114 );
2115 return Ok(());
2116 }
2117 assert_eq!(
2118 checkpoint, expected_next,
2119 "rpc-index forward update is not contiguous: expected checkpoint {expected_next}, \
2120 got {checkpoint}"
2121 );
2122
2123 Ok(batch.write()?)
2124 }
2125
2126 pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
2127 self.tables.get_epoch_info(epoch)
2128 }
2129
2130 pub fn owner_iter(
2131 &self,
2132 owner: SuiAddress,
2133 object_type: Option<StructTag>,
2134 cursor: Option<OwnerIndexKey>,
2135 ) -> Result<
2136 impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
2137 TypedStoreError,
2138 > {
2139 self.tables.owner_iter(owner, object_type, cursor)
2140 }
2141
2142 pub fn dynamic_field_iter(
2143 &self,
2144 parent: ObjectID,
2145 cursor: Option<ObjectID>,
2146 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
2147 {
2148 self.tables.dynamic_field_iter(parent, cursor)
2149 }
2150
2151 pub fn get_coin_info(
2152 &self,
2153 coin_type: &StructTag,
2154 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
2155 self.tables.get_coin_info(coin_type)
2156 }
2157
2158 pub fn get_balance(
2159 &self,
2160 owner: &SuiAddress,
2161 coin_type: &StructTag,
2162 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
2163 self.tables.get_balance(owner, coin_type)
2164 }
2165
2166 pub fn balance_iter(
2167 &self,
2168 owner: SuiAddress,
2169 cursor: Option<BalanceKey>,
2170 ) -> Result<
2171 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
2172 TypedStoreError,
2173 > {
2174 self.tables.balance_iter(owner, cursor)
2175 }
2176
2177 pub fn package_versions_iter(
2178 &self,
2179 original_id: ObjectID,
2180 cursor: Option<u64>,
2181 ) -> Result<
2182 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
2183 TypedStoreError,
2184 > {
2185 self.tables.package_versions_iter(original_id, cursor)
2186 }
2187
2188 pub fn get_highest_indexed_checkpoint_seq_number(
2189 &self,
2190 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
2191 self.tables.watermark.get(&Watermark::Indexed)
2192 }
2193
2194 fn ensure_ledger_history_enabled(&self) -> Result<(), TypedStoreError> {
2195 if self.ledger_history_enabled {
2196 Ok(())
2197 } else {
2198 Err(TypedStoreError::SerializationError(
2199 "ledger history indexing is disabled".to_owned(),
2200 ))
2201 }
2202 }
2203
2204 pub fn ledger_tx_seq_digest(
2205 &self,
2206 tx_seq: u64,
2207 ) -> Result<Option<LedgerTxSeqDigest>, TypedStoreError> {
2208 self.ensure_ledger_history_enabled()?;
2209 Ok(self
2210 .tables
2211 .tx_seq_digest
2212 .get(&tx_seq)?
2213 .map(|info| ledger_tx_seq_digest(tx_seq, info)))
2214 }
2215
2216 pub fn ledger_tx_seq_digest_multi_get(
2217 &self,
2218 tx_seqs: &[u64],
2219 ) -> Result<Vec<Option<LedgerTxSeqDigest>>, TypedStoreError> {
2220 self.ensure_ledger_history_enabled()?;
2221 let rows = self
2222 .tables
2223 .tx_seq_digest
2224 .multi_get(tx_seqs)?
2225 .into_iter()
2226 .zip_debug_eq(tx_seqs.iter().copied())
2227 .map(|(info, tx_seq)| info.map(|info| ledger_tx_seq_digest(tx_seq, info)))
2228 .collect();
2229 Ok(rows)
2230 }
2231
2232 pub fn ledger_tx_seq_digest_iter(
2233 &self,
2234 start: u64,
2235 end_exclusive: u64,
2236 descending: bool,
2237 ) -> Result<LedgerTxSeqDigestIterator<'_>, TypedStoreError> {
2238 self.ensure_ledger_history_enabled()?;
2239 if start >= end_exclusive {
2240 return Ok(Box::new(std::iter::empty()));
2241 }
2242
2243 let iter = if descending {
2244 let upper = end_exclusive - 1;
2245 self.tables
2246 .tx_seq_digest
2247 .reversed_safe_iter_with_bounds(Some(start), Some(upper))?
2248 } else {
2249 self.tables
2250 .tx_seq_digest
2251 .safe_iter_with_bounds(Some(start), Some(end_exclusive))
2252 };
2253
2254 Ok(Box::new(iter.map(|result| {
2255 result.map(|(tx_seq, info)| ledger_tx_seq_digest(tx_seq, info))
2256 })))
2257 }
2258
2259 pub fn transaction_bitmap_bucket_iter(
2260 &self,
2261 dimension_key: Vec<u8>,
2262 start_bucket: u64,
2263 end_bucket_exclusive: u64,
2264 descending: bool,
2265 ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2266 self.ensure_ledger_history_enabled()?;
2267 Self::bitmap_bucket_iter(
2268 &self.tables.transaction_bitmap,
2269 dimension_key,
2270 start_bucket,
2271 end_bucket_exclusive,
2272 descending,
2273 )
2274 }
2275
2276 pub fn event_bitmap_bucket_iter(
2277 &self,
2278 dimension_key: Vec<u8>,
2279 start_bucket: u64,
2280 end_bucket_exclusive: u64,
2281 descending: bool,
2282 ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2283 self.ensure_ledger_history_enabled()?;
2284 Self::bitmap_bucket_iter(
2285 &self.tables.event_bitmap,
2286 dimension_key,
2287 start_bucket,
2288 end_bucket_exclusive,
2289 descending,
2290 )
2291 }
2292
2293 fn bitmap_bucket_iter(
2294 table: &DBMap<BitmapIndexKey, BitmapBlob>,
2295 dimension_key: Vec<u8>,
2296 start_bucket: u64,
2297 end_bucket_exclusive: u64,
2298 descending: bool,
2299 ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2300 if start_bucket >= end_bucket_exclusive {
2301 return Ok(Box::new(std::iter::empty()));
2302 }
2303
2304 let lower = BitmapIndexKey {
2305 dimension_key: dimension_key.clone(),
2306 bucket_id: start_bucket,
2307 };
2308 let upper_exclusive = BitmapIndexKey {
2309 dimension_key,
2310 bucket_id: end_bucket_exclusive,
2311 };
2312 let upper_inclusive = BitmapIndexKey {
2313 dimension_key: upper_exclusive.dimension_key.clone(),
2314 bucket_id: end_bucket_exclusive - 1,
2315 };
2316
2317 let iter: Box<
2318 dyn Iterator<Item = Result<(BitmapIndexKey, BitmapBlob), TypedStoreError>> + '_,
2319 > = if descending {
2320 table.reversed_safe_iter_with_bounds(Some(lower), Some(upper_inclusive))?
2321 } else {
2322 table.safe_iter_with_bounds(Some(lower), Some(upper_exclusive))
2323 };
2324
2325 Ok(Box::new(iter.map(|result| {
2326 result.and_then(|(key, blob)| decode_ledger_bitmap_bucket(key, blob))
2327 })))
2328 }
2329}
2330
2331fn tx_removed_objects_pre_version<'a>(
2336 tx: &'a ExecutedTransaction,
2337 object_set: &'a ObjectSet,
2338) -> impl Iterator<Item = &'a Object> + 'a {
2339 tx.effects
2340 .object_changes()
2341 .into_iter()
2342 .filter_map(
2343 move |change| match (change.input_version, change.output_version) {
2344 (Some(input_version), None) => object_set.get(&ObjectKey(change.id, input_version)),
2345 _ => None,
2346 },
2347 )
2348}
2349
2350fn tx_changed_objects<'a>(
2354 tx: &'a ExecutedTransaction,
2355 object_set: &'a ObjectSet,
2356) -> impl Iterator<Item = (&'a Object, Option<&'a Object>)> + 'a {
2357 tx.effects
2358 .object_changes()
2359 .into_iter()
2360 .filter_map(move |change| {
2361 let output = change
2362 .output_version
2363 .and_then(|v| object_set.get(&ObjectKey(change.id, v)))?;
2364 let input = change
2365 .input_version
2366 .and_then(|v| object_set.get(&ObjectKey(change.id, v)));
2367 Some((output, input))
2368 })
2369}
2370
2371fn should_index_dynamic_field(object: &Object) -> bool {
2372 object
2380 .data
2381 .try_as_move()
2382 .is_some_and(|move_object| move_object.type_().is_dynamic_field())
2383}
2384
2385fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
2386 use sui_types::coin::CoinMetadata;
2387 use sui_types::coin::RegulatedCoinMetadata;
2388 use sui_types::coin::TreasuryCap;
2389
2390 let object_type = object.type_().and_then(MoveObjectType::other)?;
2391
2392 if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
2393 return Some((
2394 CoinIndexKey { coin_type },
2395 CoinIndexInfo {
2396 coin_metadata_object_id: Some(object.id()),
2397 treasury_object_id: None,
2398 regulated_coin_metadata_object_id: None,
2399 },
2400 ));
2401 }
2402
2403 if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
2404 return Some((
2405 CoinIndexKey { coin_type },
2406 CoinIndexInfo {
2407 coin_metadata_object_id: None,
2408 treasury_object_id: Some(object.id()),
2409 regulated_coin_metadata_object_id: None,
2410 },
2411 ));
2412 }
2413
2414 if let Some(coin_type) =
2415 RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
2416 {
2417 return Some((
2418 CoinIndexKey { coin_type },
2419 CoinIndexInfo {
2420 coin_metadata_object_id: None,
2421 treasury_object_id: None,
2422 regulated_coin_metadata_object_id: Some(object.id()),
2423 },
2424 ));
2425 }
2426
2427 None
2428}
2429
/// Factory for [`RpcLiveObjectIndexer`]s: holds the state every indexer it
/// creates is given a copy of or a reference to.
struct RpcParLiveObjectSetIndexer<'a> {
    /// Tables the created indexers write to.
    tables: &'a IndexStoreTables,
    /// Coin index map shared (behind a mutex) by all created indexers.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Byte size at which a created indexer writes out its current batch.
    batch_size_limit: usize,
}
2435
/// Indexes live objects one at a time into a private write batch.
struct RpcLiveObjectIndexer<'a> {
    /// Tables the batch operations target.
    tables: &'a IndexStoreTables,
    /// Batch currently being filled; written out and replaced once it reaches
    /// `batch_size_limit` bytes, and written a final time in `finish`.
    batch: typed_store::rocks::DBBatch,
    /// Coin index map shared with the other indexers created by the same factory.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    /// Balance deltas accumulated in memory before being staged on `batch`
    /// as merge operands.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    /// Byte size at which `batch` is written out.
    batch_size_limit: usize,
}
2443
2444impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
2445 type ObjectIndexer = RpcLiveObjectIndexer<'a>;
2446
2447 fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
2448 RpcLiveObjectIndexer {
2449 tables: self.tables,
2450 batch: self.tables.owner.batch(),
2451 coin_index: self.coin_index,
2452 balance_changes: HashMap::new(),
2453 batch_size_limit: self.batch_size_limit,
2454 }
2455 }
2456}
2457
2458impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
2459 fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
2460 match object.owner {
2461 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
2463 let owner_key = OwnerIndexKey::from_object(&object);
2464 let owner_info = OwnerIndexInfo::new(&object);
2465 self.batch
2466 .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
2467
2468 if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
2469 let balance_key = BalanceKey { owner, coin_type };
2470 let balance_info = BalanceIndexInfo {
2471 coin_balance_delta: value.into(),
2472 address_balance_delta: 0,
2473 };
2474 self.balance_changes
2475 .entry(balance_key)
2476 .or_default()
2477 .merge_delta(&balance_info);
2478
2479 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2480 self.batch.partial_merge_batch(
2481 &self.tables.balance,
2482 std::mem::take(&mut self.balance_changes),
2483 )?;
2484 }
2485 }
2486 }
2487
2488 Owner::ObjectOwner(parent) => {
2490 if should_index_dynamic_field(&object) {
2491 let field_key = DynamicFieldKey::new(parent, object.id());
2492 self.batch
2493 .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
2494 }
2495
2496 if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
2498 && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
2499 {
2500 let balance_key = BalanceKey { owner, coin_type };
2501 let balance_info = BalanceIndexInfo {
2502 coin_balance_delta: 0,
2503 address_balance_delta: balance,
2504 };
2505 self.balance_changes
2506 .entry(balance_key)
2507 .or_default()
2508 .merge_delta(&balance_info);
2509
2510 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2511 self.batch.partial_merge_batch(
2512 &self.tables.balance,
2513 std::mem::take(&mut self.balance_changes),
2514 )?;
2515 }
2516 }
2517 }
2518
2519 Owner::Shared { .. } | Owner::Immutable => {}
2520
2521 Owner::Party { .. } => {
2522 todo!("Party WIP");
2524 }
2526 }
2527
2528 if let Some((key, value)) = try_create_coin_index_info(&object) {
2530 use std::collections::hash_map::Entry;
2531
2532 match self.coin_index.lock().unwrap().entry(key) {
2533 Entry::Occupied(mut o) => {
2534 o.get_mut().merge(value);
2535 }
2536 Entry::Vacant(v) => {
2537 v.insert(value);
2538 }
2539 }
2540 }
2541
2542 if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
2543 self.batch
2544 .insert_batch(&self.tables.package_version, [(key, info)])?;
2545 }
2546
2547 if self.batch.size_in_bytes() >= self.batch_size_limit {
2550 std::mem::replace(&mut self.batch, self.tables.owner.batch())
2551 .write_opt(bulk_ingestion_write_options())?;
2552 }
2553
2554 Ok(())
2555 }
2556
2557 fn finish(mut self) -> Result<(), StorageError> {
2558 self.batch.partial_merge_batch(
2559 &self.tables.balance,
2560 std::mem::take(&mut self.balance_changes),
2561 )?;
2562 self.batch.write_opt(bulk_ingestion_write_options())?;
2563 Ok(())
2564 }
2565}
2566
2567fn full_checkpoint_for_backfill(
2576 authority_store: &AuthorityStore,
2577 checkpoint_store: &CheckpointStore,
2578 checkpoint: u64,
2579) -> Result<Option<Checkpoint>, StorageError> {
2580 let Some(summary) = checkpoint_store.get_checkpoint_by_sequence_number(checkpoint)? else {
2581 return Ok(None);
2582 };
2583 let Some(contents) = checkpoint_store.get_checkpoint_contents(&summary.content_digest)? else {
2584 return Ok(None);
2585 };
2586
2587 let (transactions, object_set) = load_executed_transactions(authority_store, &contents, true)?;
2591
2592 Ok(Some(Checkpoint {
2593 summary: summary.into(),
2594 contents,
2595 transactions,
2596 object_set,
2597 }))
2598}
2599
2600fn sparse_checkpoint_for_epoch_backfill(
2601 authority_store: &AuthorityStore,
2602 checkpoint_store: &CheckpointStore,
2603 checkpoint: u64,
2604) -> Result<Option<Checkpoint>, StorageError> {
2605 let summary = checkpoint_store
2606 .get_checkpoint_by_sequence_number(checkpoint)?
2607 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2608
2609 if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
2611 return Ok(None);
2612 }
2613
2614 let contents = checkpoint_store
2615 .get_checkpoint_contents(&summary.content_digest)?
2616 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2617
2618 let (transactions, object_set) = load_executed_transactions(authority_store, &contents, false)?;
2619
2620 Ok(Some(Checkpoint {
2621 summary: summary.into(),
2622 contents,
2623 transactions,
2624 object_set,
2625 }))
2626}
2627
2628fn load_executed_transactions(
2634 authority_store: &AuthorityStore,
2635 contents: &sui_types::messages_checkpoint::CheckpointContents,
2636 load_events: bool,
2637) -> Result<(Vec<ExecutedTransaction>, ObjectSet), StorageError> {
2638 let transaction_digests = contents
2639 .iter()
2640 .map(|execution_digests| execution_digests.transaction)
2641 .collect::<Vec<_>>();
2642 let transactions = authority_store
2643 .multi_get_transaction_blocks(&transaction_digests)?
2644 .into_iter()
2645 .map(|maybe_transaction| {
2646 maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
2647 })
2648 .collect::<Result<Vec<_>, _>>()?;
2649
2650 let effects = authority_store
2651 .multi_get_executed_effects(&transaction_digests)?
2652 .into_iter()
2653 .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
2654 .collect::<Result<Vec<_>, _>>()?;
2655
2656 let events = if load_events {
2657 authority_store
2658 .multi_get_events(&transaction_digests)
2659 .map_err(|e| StorageError::custom(e.to_string()))?
2660 } else {
2661 vec![None; transaction_digests.len()]
2662 };
2663
2664 let mut full_transactions = Vec::with_capacity(transactions.len());
2665 let mut object_set = ObjectSet::default();
2666 for ((tx, fx), ev) in transactions
2667 .into_iter()
2668 .zip_debug_eq(effects)
2669 .zip_debug_eq(events)
2670 {
2671 let input_objects =
2672 sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
2673 let output_objects =
2674 sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
2675
2676 for obj in input_objects.into_iter().chain(output_objects.into_iter()) {
2677 object_set.insert(obj);
2678 }
2679
2680 let sender_signed = sui_types::transaction::Transaction::from(tx)
2681 .into_data()
2682 .into_inner();
2683 full_transactions.push(ExecutedTransaction {
2684 transaction: sender_signed.intent_message.value,
2685 signatures: sender_signed.tx_signatures,
2686 effects: fx,
2687 events: ev,
2688 unchanged_loaded_runtime_objects: Vec::new(),
2694 });
2695 }
2696
2697 Ok((full_transactions, object_set))
2698}
2699
2700fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
2701 match Coin::extract_balance_if_coin(object) {
2702 Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
2703 Ok(Some(_)) => {
2704 debug!("Coin object {} has non-struct type tag", object.id());
2705 Ok(None)
2706 }
2707 Ok(None) => {
2708 Ok(None)
2710 }
2711 Err(e) => {
2712 Err(StorageError::custom(format!(
2714 "Failed to deserialize coin object {}: {}",
2715 object.id(),
2716 e
2717 )))
2718 }
2719 }
2720}
2721
2722fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
2723 let move_object = object.data.try_as_move()?;
2724
2725 let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
2726 else {
2727 return None;
2728 };
2729
2730 let (key, value): (
2731 sui_types::accumulator_root::AccumulatorKey,
2732 sui_types::accumulator_root::AccumulatorValue,
2733 ) = move_object.try_into().ok()?;
2734
2735 let balance = value.as_u128()? as i128;
2736 if balance <= 0 {
2737 return None;
2738 }
2739
2740 Some((key.owner, *coin_type, balance))
2741}
2742
2743#[cfg(test)]
2744mod tests {
2745 use super::*;
2746 use std::sync::atomic::AtomicU64;
2747
2748 #[tokio::test]
2757 async fn open_with_index_options_overrides_every_cf() {
2758 let temp_dir = tempfile::tempdir().unwrap();
2759 let db_path = temp_dir.path().join("rpc-index");
2760
2761 let _tables =
2762 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2763
2764 let per_cf = parse_cf_options(&db_path);
2770 assert!(
2771 !per_cf.is_empty(),
2772 "expected at least one CFOptions section in OPTIONS file"
2773 );
2774 for (cf_name, opts) in &per_cf {
2775 if cf_name == "default" {
2776 continue;
2777 }
2778 for (key, expected) in [
2779 ("level0_slowdown_writes_trigger", "512"),
2780 ("level0_stop_writes_trigger", "1024"),
2781 ("soft_pending_compaction_bytes_limit", "0"),
2782 ("hard_pending_compaction_bytes_limit", "0"),
2783 ] {
2784 let actual = opts
2785 .get(key)
2786 .unwrap_or_else(|| panic!("cf `{cf_name}` missing `{key}`"));
2787 assert_eq!(
2788 actual, expected,
2789 "cf `{cf_name}` has `{key}={actual}`, expected `{expected}` — \
2790 the typed-store override map likely doesn't cover this CF"
2791 );
2792 }
2793 }
2794 }
2795
2796 #[test]
2797 fn checked_encode_event_seq_rejects_unrepresentable_values() {
2798 assert!(
2799 checked_encode_event_seq(0, MAX_EVENTS_PER_TX).is_err(),
2800 "event_idx at MAX_EVENTS_PER_TX must be rejected"
2801 );
2802 assert!(
2803 checked_encode_event_seq(MAX_TX_SEQ + 1, 0).is_err(),
2804 "tx_seq past MAX_TX_SEQ must be rejected"
2805 );
2806 }
2807
2808 #[test]
2814 fn bitmap_filter_keeps_bucket_with_live_tx_above_zero_watermark() {
2815 let watermark = Arc::new(AtomicU64::new(1));
2816 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2817
2818 let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2819 dimension_key: vec![1, 2, 3],
2820 bucket_id: 0,
2821 });
2822 assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2823
2824 watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
2829 assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2830 }
2831
2832 #[test]
2836 fn bitmap_filter_event_bucket_uses_event_seq_lo() {
2837 let watermark = Arc::new(AtomicU64::new(0));
2838 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Event);
2839
2840 let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2841 dimension_key: vec![5],
2842 bucket_id: 0,
2843 });
2844 assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2846
2847 let txs_per_bucket = EVENT_BUCKET_SIZE / MAX_EVENTS_PER_TX as u64;
2851 watermark.store(txs_per_bucket, Ordering::Relaxed);
2852 assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2853 }
2854
2855 #[test]
2860 fn bitmap_filter_keeps_malformed_keys() {
2861 let watermark = Arc::new(AtomicU64::new(u64::MAX));
2862 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2863
2864 assert!(matches!(filter.filter(b"short", &[]), Decision::Keep));
2865 assert!(matches!(filter.filter(&[], &[]), Decision::Keep));
2866
2867 let huge = typed_store::be_fix_int_ser(&BitmapIndexKey {
2870 dimension_key: vec![],
2871 bucket_id: u64::MAX - 1,
2872 });
2873 assert!(huge.len() >= 8);
2874 assert!(matches!(filter.filter(&huge, &[]), Decision::Keep));
2875 }
2876
2877 #[test]
2882 fn bitmap_filter_decodes_typed_store_keys() {
2883 let watermark = Arc::new(AtomicU64::new(0));
2884 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2885
2886 let bucket_id = 7u64;
2889 let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2890 dimension_key: vec![0xAA, 0xBB, 0xCC],
2891 bucket_id,
2892 });
2893 assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2895
2896 watermark.store((bucket_id + 1) * TX_BUCKET_SIZE, Ordering::Relaxed);
2898 assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2899 }
2900
2901 #[test]
2904 fn bitmap_merge_operator_unions_operands() {
2905 let mut bm_a = RoaringBitmap::new();
2906 bm_a.insert(1);
2907 bm_a.insert(5);
2908 let blob_a = encode_bitmap_blob(&bm_a);
2909
2910 let mut bm_b = RoaringBitmap::new();
2911 bm_b.insert(5);
2912 bm_b.insert(7);
2913 let blob_b = encode_bitmap_blob(&bm_b);
2914
2915 let mut bm_c = RoaringBitmap::new();
2916 bm_c.insert(100);
2917 let blob_c = encode_bitmap_blob(&bm_c);
2918
2919 let decoded_a = decode_bitmap_blob(&blob_a).expect("decode a");
2929 let decoded_b = decode_bitmap_blob(&blob_b).expect("decode b");
2930 let decoded_c = decode_bitmap_blob(&blob_c).expect("decode c");
2931 let unioned = decoded_a | decoded_b | decoded_c;
2932 let mut expected = RoaringBitmap::new();
2933 for b in [1, 5, 7, 100] {
2934 expected.insert(b);
2935 }
2936 assert_eq!(unioned, expected);
2937 }
2938
2939 #[tokio::test]
2945 async fn bitmap_merge_operator_unions_across_writes() {
2946 let temp_dir = tempfile::tempdir().unwrap();
2947 let db_path = temp_dir.path().join("rpc-index");
2948 let tables =
2949 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2950
2951 let key = BitmapIndexKey {
2952 dimension_key: vec![1, 2, 3],
2953 bucket_id: 0,
2954 };
2955
2956 for bits in [vec![1u32, 2], vec![3, 4], vec![5, 6, 7]] {
2960 let mut bm = RoaringBitmap::new();
2961 for b in bits {
2962 bm.insert(b);
2963 }
2964 let mut batch = tables.transaction_bitmap.batch();
2965 batch
2966 .partial_merge_batch(
2967 &tables.transaction_bitmap,
2968 [(key.clone(), BitmapBlob::from(bm))],
2969 )
2970 .unwrap();
2971 batch.write().unwrap();
2972 }
2973
2974 let blob = tables
2975 .transaction_bitmap
2976 .get(&key)
2977 .unwrap()
2978 .expect("merged row should exist");
2979 let bm = RoaringBitmap::deserialize_from(&blob.0[..]).unwrap();
2980 let got: Vec<u32> = bm.iter().collect();
2981 assert_eq!(got, vec![1, 2, 3, 4, 5, 6, 7]);
2982 }
2983
    /// End-to-end check of the bitmap compaction filter against a real DB:
    /// two bucket rows are written and flushed, the shared watermark is raised
    /// to `TX_BUCKET_SIZE`, a compaction over `transaction_bitmap` is requested,
    /// and afterwards bucket 0 must be gone while bucket 1 must still exist.
    #[tokio::test]
    async fn bitmap_filter_removes_whole_bucket_after_compaction() {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("rpc-index");

        // The test keeps its own handle to the atomic it passes in the options,
        // so it can move the watermark after the DB has been opened.
        let watermark = Arc::new(AtomicU64::new(0));
        let index_options = IndexStoreOptions {
            pruning_tx_seq_exclusive: watermark.clone(),
        };
        let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);

        // Same dimension key, two consecutive buckets.
        let dim_key = vec![0x01, 0xAA];
        let k0 = BitmapIndexKey {
            dimension_key: dim_key.clone(),
            bucket_id: 0,
        };
        let k1 = BitmapIndexKey {
            dimension_key: dim_key.clone(),
            bucket_id: 1,
        };
        let mut bm = RoaringBitmap::new();
        bm.insert(0);

        // Written with `insert_batch` (not a merge) and then flushed before the
        // compaction below is requested.
        let blob = BitmapBlob::from(bm);
        let mut batch = tables.transaction_bitmap.batch();
        batch
            .insert_batch(
                &tables.transaction_bitmap,
                [(k0.clone(), blob.clone()), (k1.clone(), blob)],
            )
            .unwrap();
        batch.write().unwrap();
        tables.transaction_bitmap.flush().unwrap();

        // Sanity check: both rows are readable before the watermark moves.
        assert!(tables.transaction_bitmap.get(&k0).unwrap().is_some());
        assert!(tables.transaction_bitmap.get(&k1).unwrap().is_some());

        // Raise the watermark to exactly one bucket's worth of tx sequence numbers.
        watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);

        // Request compaction from the empty key up to 128 bytes of 0xFF.
        tables
            .transaction_bitmap
            .compact_range_raw("transaction_bitmap", vec![], vec![0xFF; 128])
            .unwrap();

        assert!(
            tables.transaction_bitmap.get(&k0).unwrap().is_none(),
            "bucket 0 should have been removed by the compaction filter"
        );
        assert!(
            tables.transaction_bitmap.get(&k1).unwrap().is_some(),
            "bucket 1 should still be present (only bucket 0 was below the watermark)"
        );
    }
3049
3050 #[test]
3054 fn backfill_missing_cp_in_retained_range_is_error() {
3055 let checkpoint_range = 5u64..=10u64;
3059 let seq = 7u64;
3060 let loaded: Result<Option<Checkpoint>, StorageError> = Ok(None);
3061
3062 let result: Result<(), StorageError> = (|| {
3063 let checkpoint = loaded?.ok_or_else(|| {
3064 StorageError::missing(format!(
3065 "ledger history backfill: checkpoint {seq} is missing from local storage \
3066 but falls inside the retained backfill range {checkpoint_range:?}"
3067 ))
3068 })?;
3069 let _ = checkpoint;
3070 Ok(())
3071 })();
3072
3073 let err = result.expect_err("missing cp must error out, not silently succeed");
3074 let msg = err.to_string();
3075 assert!(
3076 msg.contains(&format!("checkpoint {seq}")),
3077 "error should name the missing cp: {msg}"
3078 );
3079 assert!(
3080 msg.contains("5..=10"),
3081 "error should name the retained range: {msg}"
3082 );
3083 }
3084
3085 #[test]
3090 fn legacy_watermark_bytes_still_deserialize() {
3091 let indexed_bytes = bcs::to_bytes(&Watermark::Indexed).unwrap();
3093 let pruned_bytes = bcs::to_bytes(&Watermark::Pruned).unwrap();
3094 assert_eq!(
3095 indexed_bytes,
3096 vec![0],
3097 "Watermark::Indexed must encode as 0"
3098 );
3099 assert_eq!(pruned_bytes, vec![1], "Watermark::Pruned must encode as 1");
3100
3101 let decoded_indexed: Watermark = bcs::from_bytes(&[0]).unwrap();
3105 let decoded_pruned: Watermark = bcs::from_bytes(&[1]).unwrap();
3106 assert!(matches!(decoded_indexed, Watermark::Indexed));
3107 assert!(matches!(decoded_pruned, Watermark::Pruned));
3108 }
3109
3110 #[tokio::test]
3115 async fn settings_cf_round_trips_and_drives_reinit() {
3116 let temp_dir = tempfile::tempdir().unwrap();
3117 let db_path = temp_dir.path().join("rpc-index");
3118 let tables =
3119 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3120
3121 tables
3123 .meta
3124 .insert(
3125 &(),
3126 &MetadataInfo {
3127 version: CURRENT_DB_VERSION,
3128 },
3129 )
3130 .unwrap();
3131 tables
3132 .settings
3133 .insert(
3134 &(),
3135 &encode_settings(&IndexSettings {
3136 ledger_history_indexing: false,
3137 }),
3138 )
3139 .unwrap();
3140 assert!(!tables.persisted_ledger_history_indexing());
3141
3142 let checkpoint_store = CheckpointStore::new_for_tests();
3145
3146 assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3148 assert!(tables.needs_to_do_initialization(&checkpoint_store, true));
3150
3151 tables
3153 .settings
3154 .insert(
3155 &(),
3156 &encode_settings(&IndexSettings {
3157 ledger_history_indexing: true,
3158 }),
3159 )
3160 .unwrap();
3161 assert!(tables.persisted_ledger_history_indexing());
3162 assert!(!tables.needs_to_do_initialization(&checkpoint_store, true));
3164 assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3167 }
3168
3169 #[tokio::test]
3173 async fn disable_ledger_history_indexing_drops_history_cfs_in_place() {
3174 let temp_dir = tempfile::tempdir().unwrap();
3175 let db_path = temp_dir.path().join("rpc-index");
3176 let tables =
3177 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3178
3179 tables
3181 .settings
3182 .insert(
3183 &(),
3184 &encode_settings(&IndexSettings {
3185 ledger_history_indexing: true,
3186 }),
3187 )
3188 .unwrap();
3189 for tx_seq in 0..5u64 {
3190 tables
3191 .tx_seq_digest
3192 .insert(
3193 &tx_seq,
3194 &TxSeqDigestInfo {
3195 digest: TransactionDigest::new([0; 32]),
3196 event_count: 0,
3197 tx_offset: 0,
3198 checkpoint_number: 0,
3199 },
3200 )
3201 .unwrap();
3202 }
3203 for bucket_id in 0..5u64 {
3204 let key = BitmapIndexKey {
3205 dimension_key: vec![1, 2, 3],
3206 bucket_id,
3207 };
3208 tables
3209 .transaction_bitmap
3210 .insert(&key, &BitmapBlob(vec![0xab]))
3211 .unwrap();
3212 tables
3213 .event_bitmap
3214 .insert(&key, &BitmapBlob(vec![0xcd]))
3215 .unwrap();
3216 }
3217 tables.watermark.insert(&Watermark::Indexed, &42).unwrap();
3219
3220 tables.disable_ledger_history_indexing().unwrap();
3221
3222 assert!(tables.tx_seq_digest.is_empty());
3224 assert!(tables.transaction_bitmap.is_empty());
3225 assert!(tables.event_bitmap.is_empty());
3226 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), None);
3227
3228 assert!(!tables.persisted_ledger_history_indexing());
3230 assert_eq!(tables.watermark.get(&Watermark::Indexed).unwrap(), Some(42));
3231 }
3232
3233 #[tokio::test]
3236 async fn prune_maintains_ledger_history_state_when_active() {
3237 let temp_dir = tempfile::tempdir().unwrap();
3238 let db_path = temp_dir.path().join("rpc-index");
3239 let tables =
3240 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3241
3242 let mut batch = tables.tx_seq_digest.batch();
3244 for tx_seq in 0..5u64 {
3245 batch
3246 .insert_batch(
3247 &tables.tx_seq_digest,
3248 [(
3249 tx_seq,
3250 TxSeqDigestInfo {
3251 digest: TransactionDigest::new([0; 32]),
3252 event_count: 0,
3253 tx_offset: 0,
3254 checkpoint_number: 0,
3255 },
3256 )],
3257 )
3258 .unwrap();
3259 }
3260 batch.write().unwrap();
3261
3262 let pruning_atomic = AtomicU64::new(0);
3265 tables
3266 .prune(1, 3, true, &pruning_atomic)
3267 .unwrap();
3268 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3269
3270 assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3271 for tx_seq in 0..3u64 {
3272 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3273 }
3274 for tx_seq in 3..5u64 {
3275 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3276 }
3277 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3278 }
3279
3280 #[tokio::test]
3282 async fn prune_skips_ledger_history_state_when_inactive() {
3283 let temp_dir = tempfile::tempdir().unwrap();
3284 let db_path = temp_dir.path().join("rpc-index");
3285 let tables =
3286 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3287
3288 tables
3290 .tx_seq_digest
3291 .insert(
3292 &0u64,
3293 &TxSeqDigestInfo {
3294 digest: TransactionDigest::new([0; 32]),
3295 event_count: 0,
3296 tx_offset: 0,
3297 checkpoint_number: 0,
3298 },
3299 )
3300 .unwrap();
3301
3302 let pruning_atomic = AtomicU64::new(0);
3303 tables
3304 .prune(
3305 5,
3306 3,
3307 false,
3308 &pruning_atomic,
3309 )
3310 .unwrap();
3311 assert_eq!(
3312 pruning_atomic.load(Ordering::Relaxed),
3313 0,
3314 "disabled prune must not advance the compaction-filter atomic"
3315 );
3316 assert_eq!(
3317 tables.watermark.get(&Watermark::Pruned).unwrap(),
3318 Some(5),
3319 "base pruning must still advance"
3320 );
3321 assert!(
3322 tables.tx_seq_digest.get(&0u64).unwrap().is_some(),
3323 "tx_seq_digest rows must remain untouched when inactive"
3324 );
3325 }
3326
3327 #[tokio::test]
3329 async fn index_checkpoint_gates_on_ledger_history_enabled() {
3330 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3331
3332 let temp_dir = tempfile::tempdir().unwrap();
3333 let db_path = temp_dir.path().join("rpc-index");
3334 let tables =
3335 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3336
3337 let checkpoint = TestCheckpointBuilder::new(1)
3340 .start_transaction(1)
3341 .finish_transaction()
3342 .build_checkpoint();
3343
3344 let batch = tables
3346 .index_checkpoint(&checkpoint, false)
3347 .expect("index_checkpoint failed");
3348 batch.write().expect("batch write failed");
3349 assert_eq!(tables.tx_seq_digest.safe_iter().count(), 0);
3350 assert_eq!(tables.transaction_bitmap.safe_iter().count(), 0);
3351 assert_eq!(tables.event_bitmap.safe_iter().count(), 0);
3352
3353 let checkpoint2 = TestCheckpointBuilder::new(2)
3355 .start_transaction(1)
3356 .finish_transaction()
3357 .build_checkpoint();
3358 let batch = tables
3359 .index_checkpoint(&checkpoint2, true)
3360 .expect("index_checkpoint failed");
3361 batch.write().expect("batch write failed");
3362 assert!(
3363 tables.tx_seq_digest.safe_iter().count() > 0,
3364 "tx_seq_digest must have rows when ledger_history_enabled=true"
3365 );
3366 }
3367
3368 #[tokio::test]
3372 async fn index_checkpoint_records_within_checkpoint_tx_offset() {
3373 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3374
3375 let temp_dir = tempfile::tempdir().unwrap();
3376 let db_path = temp_dir.path().join("rpc-index");
3377 let tables =
3378 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3379
3380 let checkpoint1 = TestCheckpointBuilder::new(1)
3383 .with_network_total_transactions(100)
3384 .start_transaction(0)
3385 .finish_transaction()
3386 .start_transaction(1)
3387 .finish_transaction()
3388 .start_transaction(2)
3389 .finish_transaction()
3390 .build_checkpoint();
3391 tables
3392 .index_checkpoint(&checkpoint1, true)
3393 .expect("index_checkpoint failed")
3394 .write()
3395 .expect("batch write failed");
3396
3397 let checkpoint2 = TestCheckpointBuilder::new(2)
3400 .with_network_total_transactions(103)
3401 .start_transaction(0)
3402 .finish_transaction()
3403 .start_transaction(1)
3404 .finish_transaction()
3405 .build_checkpoint();
3406 tables
3407 .index_checkpoint(&checkpoint2, true)
3408 .expect("index_checkpoint failed")
3409 .write()
3410 .expect("batch write failed");
3411
3412 let offset_by_tx_seq: std::collections::BTreeMap<u64, u32> = tables
3413 .tx_seq_digest
3414 .safe_iter()
3415 .map(|row| row.map(|(tx_seq, info)| (tx_seq, info.tx_offset)))
3416 .collect::<Result<_, _>>()
3417 .unwrap();
3418
3419 assert_eq!(
3420 offset_by_tx_seq,
3421 std::collections::BTreeMap::from([(100, 0), (101, 1), (102, 2), (103, 0), (104, 1),]),
3422 "tx_offset must be the within-checkpoint position, not the global tx_seq"
3423 );
3424 }
3425
3426 #[tokio::test]
3431 async fn prune_commits_deletes_and_watermark_atomically() {
3432 let temp_dir = tempfile::tempdir().unwrap();
3433 let db_path = temp_dir.path().join("rpc-index");
3434 let tables =
3435 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3436
3437 let mut batch = tables.tx_seq_digest.batch();
3438 for tx_seq in 0..4u64 {
3439 batch
3440 .insert_batch(
3441 &tables.tx_seq_digest,
3442 [(
3443 tx_seq,
3444 TxSeqDigestInfo {
3445 digest: TransactionDigest::new([0; 32]),
3446 event_count: 0,
3447 tx_offset: 0,
3448 checkpoint_number: 0,
3449 },
3450 )],
3451 )
3452 .unwrap();
3453 }
3454 batch.write().unwrap();
3455
3456 let pruning_atomic = AtomicU64::new(0);
3457 tables
3458 .prune(1, 2, true, &pruning_atomic)
3459 .unwrap();
3460
3461 assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3464 for tx_seq in 0..2u64 {
3465 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3466 }
3467 for tx_seq in 2..4u64 {
3468 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3469 }
3470 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(2));
3471 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 2);
3472 }
3473
3474 #[tokio::test]
3479 async fn prune_idempotent_replay() {
3480 let temp_dir = tempfile::tempdir().unwrap();
3481 let db_path = temp_dir.path().join("rpc-index");
3482 let tables =
3483 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3484
3485 let mut batch = tables.tx_seq_digest.batch();
3486 for tx_seq in 0..5u64 {
3487 batch
3488 .insert_batch(
3489 &tables.tx_seq_digest,
3490 [(
3491 tx_seq,
3492 TxSeqDigestInfo {
3493 digest: TransactionDigest::new([0; 32]),
3494 event_count: 0,
3495 tx_offset: 0,
3496 checkpoint_number: 0,
3497 },
3498 )],
3499 )
3500 .unwrap();
3501 }
3502 batch.write().unwrap();
3503
3504 let pruning_atomic = AtomicU64::new(0);
3505 tables.prune(1, 3, true, &pruning_atomic).unwrap();
3506 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3507 tables.prune(1, 3, true, &pruning_atomic).unwrap();
3509 assert_eq!(
3510 pruning_atomic.load(Ordering::Relaxed),
3511 3,
3512 "replay must not move the atomic"
3513 );
3514 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3515 for tx_seq in 3..5u64 {
3516 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3517 }
3518 }
3519
3520 #[tokio::test]
3525 async fn prune_consecutive_ranges_advance_floor() {
3526 let temp_dir = tempfile::tempdir().unwrap();
3527 let db_path = temp_dir.path().join("rpc-index");
3528 let tables =
3529 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3530
3531 let mut batch = tables.tx_seq_digest.batch();
3532 for tx_seq in 0..6u64 {
3533 batch
3534 .insert_batch(
3535 &tables.tx_seq_digest,
3536 [(
3537 tx_seq,
3538 TxSeqDigestInfo {
3539 digest: TransactionDigest::new([0; 32]),
3540 event_count: 0,
3541 tx_offset: 0,
3542 checkpoint_number: 0,
3543 },
3544 )],
3545 )
3546 .unwrap();
3547 }
3548 batch.write().unwrap();
3549
3550 let pruning_atomic = AtomicU64::new(0);
3551 tables.prune(1, 3, true, &pruning_atomic).unwrap();
3552 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3553 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3554 tables.prune(2, 5, true, &pruning_atomic).unwrap();
3556 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 5);
3557 for tx_seq in 0..5u64 {
3558 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3559 }
3560 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(5));
3561 }
3562
    /// Seeds a DB whose settings row has `ledger_history_indexing: true` and
    /// whose `tx_seq_digest` holds rows 100..105, reopens it through
    /// `RpcIndexStore::new_without_init`, and checks that:
    /// * `ledger_history_enabled` is set,
    /// * the pruning watermark atomic starts at 100 (the first seeded key),
    /// * `store.prune(7, 103)` deletes rows 100..103, keeps 103..105, and moves
    ///   both the first remaining key and the atomic to 103.
    #[tokio::test]
    async fn new_without_init_enables_ledger_history_for_db_with_ledger_history_setting() {
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("rpc-index");

        {
            // Seed phase. `tables` is scoped to this block, so the handle is
            // dropped before the store is opened below.
            let tables =
                IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
            tables
                .meta
                .insert(
                    &(),
                    &MetadataInfo {
                        version: CURRENT_DB_VERSION,
                    },
                )
                .unwrap();
            tables
                .settings
                .insert(
                    &(),
                    &encode_settings(&IndexSettings {
                        ledger_history_indexing: true,
                    }),
                )
                .unwrap();
            // Rows start at 100 rather than 0 so the hydrated watermark is
            // distinguishable from a default of 0.
            let mut batch = tables.tx_seq_digest.batch();
            for tx_seq in 100..105u64 {
                batch
                    .insert_batch(
                        &tables.tx_seq_digest,
                        [(
                            tx_seq,
                            TxSeqDigestInfo {
                                digest: TransactionDigest::new([0; 32]),
                                event_count: 0,
                                tx_offset: 0,
                                checkpoint_number: 0,
                            },
                        )],
                    )
                    .unwrap();
            }
            batch.write().unwrap();
        }

        // NOTE(review): the store is given the parent directory while the seed
        // phase wrote to `<dir>/rpc-index`; presumably `new_without_init`
        // appends that component itself — confirm against its definition.
        let store = RpcIndexStore::new_without_init(temp_dir.path());
        assert!(
            store.ledger_history_enabled,
            "new_without_init on a ledger-history DB must enable ledger history indexing"
        );
        let atomic = &store.ledger_history_pruning_watermark;
        assert_eq!(
            atomic.load(Ordering::Relaxed),
            100,
            "pruning atomic must be hydrated from the first tx_seq_digest key"
        );

        store.prune(7, 103).unwrap();

        for tx_seq in 100..103u64 {
            assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
        }
        for tx_seq in 103..105u64 {
            assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
        }
        assert_eq!(
            store.tables.first_tx_seq_digest_key().unwrap(),
            Some(103),
            "prune must advance the derived tx-seq floor"
        );
        assert_eq!(
            atomic.load(Ordering::Relaxed),
            103,
            "prune must advance the compaction-filter atomic"
        );
    }
3643
    /// Covers two DBs that have no settings row:
    /// 1. a brand-new directory, where `new_without_init` must leave
    ///    `ledger_history_enabled` unset and `prune(5, 0)` must still record
    ///    `Pruned = 5` while `tx_seq_digest` stays empty;
    /// 2. a DB that only has the metadata row at `CURRENT_DB_VERSION`, where
    ///    `ledger_history_enabled` must also stay unset.
    #[tokio::test]
    async fn new_without_init_disables_ledger_history_for_db_without_ledger_history_setting() {
        // Case 1: nothing has been written to this directory before.
        let temp_dir = tempfile::tempdir().unwrap();
        let store = RpcIndexStore::new_without_init(temp_dir.path());
        assert!(
            !store.ledger_history_enabled,
            "new_without_init on a fresh DB must leave ledger history indexing disabled"
        );

        store.prune(5, 0).unwrap();
        assert_eq!(
            store.tables.watermark.get(&Watermark::Pruned).unwrap(),
            Some(5)
        );
        assert_eq!(
            store.tables.first_tx_seq_digest_key().unwrap(),
            None,
            "disabled ledger history indexing must leave tx_seq_digest untouched"
        );

        // Case 2: a second, separate directory (the names shadow the ones
        // above) seeded with the metadata row only.
        let temp_dir = tempfile::tempdir().unwrap();
        let db_path = temp_dir.path().join("rpc-index");
        {
            // Scoped so the seeding handle is dropped before the reopen.
            let tables =
                IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
            tables
                .meta
                .insert(
                    &(),
                    &MetadataInfo {
                        version: CURRENT_DB_VERSION,
                    },
                )
                .unwrap();
        }
        let store = RpcIndexStore::new_without_init(temp_dir.path());
        assert!(
            !store.ledger_history_enabled,
            "new_without_init on a DB with no settings row must leave ledger history indexing disabled"
        );
    }
3689
3690 fn parse_cf_options(db_path: &Path) -> HashMap<String, HashMap<String, String>> {
3694 let mut options_file: Option<(u64, PathBuf)> = None;
3695 for entry in std::fs::read_dir(db_path).expect("read_dir failed") {
3696 let entry = entry.unwrap();
3697 let name = entry.file_name().to_string_lossy().into_owned();
3698 let Some(rest) = name.strip_prefix("OPTIONS-") else {
3699 continue;
3700 };
3701 let Ok(seq) = rest.parse::<u64>() else {
3703 continue;
3704 };
3705 if options_file.as_ref().is_none_or(|(s, _)| seq > *s) {
3706 options_file = Some((seq, entry.path()));
3707 }
3708 }
3709 let (_, path) = options_file.expect("no OPTIONS-* file written");
3710 let content = std::fs::read_to_string(&path).expect("read OPTIONS failed");
3711
3712 let mut result: HashMap<String, HashMap<String, String>> = HashMap::new();
3713 let mut current_cf: Option<String> = None;
3714 for line in content.lines() {
3715 let line = line.trim();
3716 if let Some(rest) = line.strip_prefix("[CFOptions \"") {
3717 let cf_name = rest.trim_end_matches("\"]").to_string();
3718 current_cf = Some(cf_name);
3719 } else if line.starts_with('[') {
3720 current_cf = None;
3722 } else if let Some(cf) = current_cf.as_ref()
3723 && let Some((k, v)) = line.split_once('=')
3724 {
3725 result
3726 .entry(cf.clone())
3727 .or_default()
3728 .insert(k.trim().to_string(), v.trim().to_string());
3729 }
3730 }
3731 result
3732 }
3733}