1use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::checkpoints::CheckpointStore;
7use crate::par_index_live_object_set::LiveObjectIndexer;
8use crate::par_index_live_object_set::ParMakeLiveObjectIndexer;
9use itertools::Itertools;
10use move_core_types::language_storage::{StructTag, TypeTag};
11use mysten_common::ZipDebugEqIteratorExt;
12use rayon::iter::IntoParallelIterator;
13use rayon::iter::ParallelIterator;
14use roaring::RoaringBitmap;
15use rustc_hash::{FxHashMap, FxHashSet};
16use serde::Deserialize;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use std::collections::{BTreeMap, HashMap};
20use std::path::Path;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::sync::Mutex;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26use std::time::Duration;
27use std::time::Instant;
28use sui_inverted_index::encode_dimension_key;
29use sui_inverted_index::for_each_event_dimension;
30use sui_inverted_index::for_each_transaction_dimension;
31use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
32use sui_types::base_types::MoveObjectType;
33use sui_types::base_types::ObjectID;
34use sui_types::base_types::SequenceNumber;
35use sui_types::base_types::SuiAddress;
36use sui_types::coin::Coin;
37use sui_types::committee::EpochId;
38use sui_types::digests::TransactionDigest;
39use sui_types::effects::TransactionEffectsAPI;
40use sui_types::full_checkpoint_content::Checkpoint;
41use sui_types::full_checkpoint_content::ExecutedTransaction;
42use sui_types::full_checkpoint_content::ObjectSet;
43use sui_types::messages_checkpoint::CheckpointSequenceNumber;
44use sui_types::object::Data;
45use sui_types::object::Object;
46use sui_types::object::Owner;
47use sui_types::storage::BackingPackageStore;
48use sui_types::storage::DynamicFieldKey;
49use sui_types::storage::EpochInfo;
50use sui_types::storage::LedgerBitmapBucket;
51use sui_types::storage::LedgerBitmapBucketIterator;
52use sui_types::storage::LedgerTxSeqDigest;
53use sui_types::storage::LedgerTxSeqDigestIterator;
54use sui_types::storage::ObjectKey;
55use sui_types::storage::TransactionInfo;
56use sui_types::storage::error::Error as StorageError;
57use sui_types::sui_system_state::SuiSystemStateTrait;
58use sui_types::transaction::TransactionDataAPI;
59use sysinfo::{MemoryRefreshKind, RefreshKind, System};
60use tracing::{debug, info, warn};
61use typed_store::DBMapUtils;
62use typed_store::TypedStoreError;
63use typed_store::rocks::{DBMap, DBMapTableConfigMap, MetricConf};
64use typed_store::rocksdb::{MergeOperands, WriteOptions, compaction_filter::Decision};
65use typed_store::traits::Map;
66
/// Schema version of the on-disk index. `needs_to_do_initialization` treats a
/// stored `MetadataInfo::version` that differs from this value as stale.
const CURRENT_DB_VERSION: u64 = 4;

// NOTE(review): not referenced in the visible part of this file; presumably the
// number of buffered balance entries that triggers a flush — confirm at the use site.
const BALANCE_FLUSH_THRESHOLD: usize = 10_000;

/// Number of consecutive transaction sequence numbers covered by one
/// transaction-bitmap bucket (`bucket = tx_seq / TX_BUCKET_SIZE`).
const TX_BUCKET_SIZE: u64 = 65_536;
/// Number of consecutive packed event sequence numbers covered by one
/// event-bitmap bucket (`bucket = event_seq / EVENT_BUCKET_SIZE`).
const EVENT_BUCKET_SIZE: u64 = 268_435_456;
/// Number of low bits of a packed event sequence number that hold the event
/// index; the transaction sequence number occupies the remaining high bits.
const EVENT_BITS: u32 = 16;
/// Exclusive upper bound on an event index that fits in `EVENT_BITS` bits.
const MAX_EVENTS_PER_TX: u32 = 1 << EVENT_BITS;
/// Largest transaction sequence number that can be shifted left by
/// `EVENT_BITS` without losing high bits.
const MAX_TX_SEQ: u64 = u64::MAX >> EVENT_BITS;

// Compile-time sanity checks on the layout constants above.
// Offsets inside a bucket are cast to `u32` before being inserted into a
// bitmap, so a bucket must not span more than the `u32` range.
const _: () = assert!(TX_BUCKET_SIZE <= u32::MAX as u64);
const _: () = assert!(EVENT_BUCKET_SIZE <= u32::MAX as u64);
// The shifts by `EVENT_BITS` must be valid for both `u32` and `u64` operands.
const _: () = assert!(EVENT_BITS < u32::BITS);
const _: () = assert!(EVENT_BITS < u64::BITS);
const _: () = assert!(MAX_EVENTS_PER_TX as u64 == 1u64 << EVENT_BITS);
// An event bucket holds a whole number of per-transaction event slots, so the
// events of one transaction never straddle two buckets.
const _: () = assert!(EVENT_BUCKET_SIZE.is_multiple_of(MAX_EVENTS_PER_TX as u64));
88
89fn checked_encode_event_seq(tx_seq: u64, event_idx: u32) -> Result<u64, StorageError> {
90 if event_idx >= MAX_EVENTS_PER_TX {
91 return Err(StorageError::custom(format!(
92 "event_idx {event_idx} exceeds packed event-seq limit {}",
93 MAX_EVENTS_PER_TX - 1
94 )));
95 }
96 if tx_seq > MAX_TX_SEQ {
97 return Err(StorageError::custom(format!(
98 "tx_seq {tx_seq} exceeds packed event-seq limit {MAX_TX_SEQ}"
99 )));
100 }
101 Ok((tx_seq << EVENT_BITS) | (event_idx as u64))
102}
103
104fn checked_event_seq_lo(tx_seq: u64) -> Option<u64> {
108 if tx_seq <= MAX_TX_SEQ {
109 Some(tx_seq << EVENT_BITS)
110 } else {
111 None
112 }
113}
114
115fn bulk_ingestion_write_options() -> WriteOptions {
116 let mut opts = WriteOptions::default();
117 opts.disable_wal(true);
118 opts
119}
120
121fn get_available_memory() -> u64 {
123 let mut sys = System::new_with_specifics(
125 RefreshKind::nothing().with_memory(MemoryRefreshKind::everything()),
126 );
127 sys.refresh_memory();
128
129 if let Some(cgroup_limits) = sys.cgroup_limits() {
131 let memory_limit = cgroup_limits.total_memory;
132 if memory_limit > 0 {
134 debug!("Using cgroup memory limit: {} bytes", memory_limit);
135 return memory_limit;
136 }
137 }
138
139 let total_memory_bytes = sys.total_memory();
142 debug!("Using system memory: {} bytes", total_memory_bytes);
143 total_memory_bytes
144}
145
/// Value stored in the `meta` table: records which schema version the index
/// was built with.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct MetadataInfo {
    /// Schema version; compared against `CURRENT_DB_VERSION` on startup.
    version: u64,
}
151
/// Persisted feature settings of the index, stored JSON-encoded in the
/// `settings` table (see `encode_settings` / `decode_settings`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
struct IndexSettings {
    /// Whether the ledger-history tables were populated when the index was
    /// last initialized. `#[serde(default)]` makes a missing field decode as
    /// `false`.
    #[serde(default)]
    ledger_history_indexing: bool,
}
169
/// Serializes `settings` to JSON bytes for storage in the `settings` table.
///
/// # Panics
/// Panics if JSON serialization fails, which is treated as impossible for
/// `IndexSettings`.
fn encode_settings(settings: &IndexSettings) -> Vec<u8> {
    serde_json::to_vec(settings).expect("IndexSettings is always JSON-serializable")
}
175
176fn decode_settings(bytes: &[u8]) -> IndexSettings {
180 serde_json::from_slice(bytes).unwrap_or_default()
181}
182
/// Key of the `watermark` table; each variant maps to a checkpoint sequence
/// number.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Watermark {
    /// Sequence number of the most recently indexed checkpoint.
    Indexed,
    /// Checkpoint watermark recorded by the most recent `prune` call.
    Pruned,
}
189
190#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
191pub struct OwnerIndexKey {
192 pub owner: SuiAddress,
193
194 pub object_type: StructTag,
195
196 pub inverted_balance: Option<u64>,
199
200 pub object_id: ObjectID,
201}
202
203impl OwnerIndexKey {
204 fn from_object(object: &Object) -> Self {
207 let owner = match object.owner() {
208 Owner::AddressOwner(owner) => owner,
209 Owner::ConsensusAddressOwner { owner, .. } => owner,
210 _ => panic!("cannot create OwnerIndexKey if object is not address-owned"),
211 };
212 let object_type = object.struct_tag().expect("packages cannot be owned");
213
214 let inverted_balance = object.as_coin_maybe().map(|coin| !coin.balance.value());
215
216 Self {
217 owner: *owner,
218 object_type,
219 inverted_balance,
220 object_id: object.id(),
221 }
222 }
223}
224
225#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
226pub struct OwnerIndexInfo {
227 pub version: SequenceNumber,
229}
230
231impl OwnerIndexInfo {
232 pub fn new(object: &Object) -> Self {
233 Self {
234 version: object.version(),
235 }
236 }
237}
238
/// Key of the `coin` table: one entry per coin type.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct CoinIndexKey {
    /// Struct tag identifying the coin type.
    coin_type: StructTag,
}
243
/// Key of the `balance` table: one entry per (owner, coin type) pair.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct BalanceKey {
    /// Address whose balance is tracked.
    pub owner: SuiAddress,
    /// Struct tag of the coin type the balance is denominated in.
    pub coin_type: StructTag,
}
249
/// Value of the `coin` table: ids of the well-known objects associated with a
/// coin type. Each id is `None` until the corresponding object has been seen.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct CoinIndexInfo {
    /// Id of the coin's metadata object, if known.
    pub coin_metadata_object_id: Option<ObjectID>,
    /// Id of the coin's treasury object, if known.
    pub treasury_object_id: Option<ObjectID>,
    /// Id of the coin's regulated-coin metadata object, if known.
    pub regulated_coin_metadata_object_id: Option<ObjectID>,
}
256
257#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
258pub struct BalanceIndexInfo {
259 pub coin_balance_delta: i128,
260 pub address_balance_delta: i128,
261}
262
263impl BalanceIndexInfo {
264 fn merge_delta(&mut self, other: &Self) {
265 self.coin_balance_delta = self
266 .coin_balance_delta
267 .saturating_add(other.coin_balance_delta);
268 self.address_balance_delta = self
269 .address_balance_delta
270 .saturating_add(other.address_balance_delta);
271 }
272}
273
274impl From<BalanceIndexInfo> for sui_types::storage::BalanceInfo {
275 fn from(index_info: BalanceIndexInfo) -> Self {
276 let coin_balance = index_info.coin_balance_delta.clamp(0, u64::MAX as i128) as u64;
283 let address_balance = index_info.address_balance_delta.clamp(0, u64::MAX as i128) as u64;
284 sui_types::storage::BalanceInfo {
285 coin_balance,
286 address_balance,
287 }
288 }
289}
290
/// Key of the `package_version` table. The derived ordering sorts by original
/// package id first and version second.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct PackageVersionKey {
    /// Original id of the package, as reported by `original_package_id()`.
    pub original_package_id: ObjectID,
    /// Version number of the package.
    pub version: u64,
}
296
/// Value of the `package_version` table.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct PackageVersionInfo {
    /// Object id under which this particular package version is stored.
    pub storage_id: ObjectID,
}
301
/// Value of the `tx_seq_digest` table, keyed by transaction sequence number.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct TxSeqDigestInfo {
    /// Digest of the transaction.
    pub digest: TransactionDigest,
    /// Number of events emitted by the transaction (0 when it has none).
    pub event_count: u32,
    /// Position of the transaction within its checkpoint.
    pub tx_offset: u32,
    /// Sequence number of the checkpoint that contains the transaction.
    pub checkpoint_number: CheckpointSequenceNumber,
}
315
/// Key of the `transaction_bitmap` and `event_bitmap` tables: one bitmap per
/// (dimension key, bucket) pair.
///
/// NOTE(review): `BitmapCompactionFilter::filter` reads the last 8 bytes of
/// the serialized key as a big-endian `bucket_id`, so `bucket_id` must remain
/// the final field and the key encoding must stay big-endian — confirm against
/// the store's key serialization.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct BitmapIndexKey {
    /// Encoded dimension (as produced by `encode_dimension_key`).
    pub dimension_key: Vec<u8>,
    /// Index of the bucket, i.e. the sequence number divided by the bucket size.
    pub bucket_id: u64,
}
334
335#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
341pub struct BitmapBlob(pub Vec<u8>);
342
343impl From<RoaringBitmap> for BitmapBlob {
344 fn from(bm: RoaringBitmap) -> Self {
345 let mut buf = Vec::with_capacity(bm.serialized_size());
346 bm.serialize_into(&mut buf)
347 .expect("RoaringBitmap::serialize_into on Vec cannot fail");
348 Self(buf)
349 }
350}
351
352fn ledger_tx_seq_digest(tx_sequence_number: u64, info: TxSeqDigestInfo) -> LedgerTxSeqDigest {
353 LedgerTxSeqDigest {
354 tx_sequence_number,
355 digest: info.digest,
356 event_count: info.event_count,
357 tx_offset: info.tx_offset,
358 checkpoint_number: info.checkpoint_number,
359 }
360}
361
362fn decode_ledger_bitmap_bucket(
363 key: BitmapIndexKey,
364 blob: BitmapBlob,
365) -> Result<LedgerBitmapBucket, TypedStoreError> {
366 let bitmap = RoaringBitmap::deserialize_from(&blob.0[..]).map_err(|e| {
367 TypedStoreError::SerializationError(format!("decode ledger bitmap bucket: {e}"))
368 })?;
369 Ok(LedgerBitmapBucket {
370 bucket_id: key.bucket_id,
371 bitmap,
372 })
373}
374
375#[derive(Clone, Copy, Debug, PartialEq, Eq)]
378pub enum BitmapKind {
379 Transaction,
380 Event,
381}
382
383impl BitmapKind {
384 fn bucket_fully_pruned(self, bucket_id: u64, pruned_tx_seq_exclusive: u64) -> bool {
397 match self {
398 BitmapKind::Transaction => bucket_id
399 .checked_add(1)
400 .and_then(|b| b.checked_mul(TX_BUCKET_SIZE))
401 .map(|hi| hi <= pruned_tx_seq_exclusive)
402 .unwrap_or(false),
403 BitmapKind::Event => {
404 let bucket_hi = bucket_id
405 .checked_add(1)
406 .and_then(|b| b.checked_mul(EVENT_BUCKET_SIZE));
407 let threshold = checked_event_seq_lo(pruned_tx_seq_exclusive);
408 match (bucket_hi, threshold) {
409 (Some(hi), Some(th)) => hi <= th,
410 _ => false,
411 }
412 }
413 }
414 }
415}
416
/// Runtime options passed when opening the index store.
#[derive(Default, Clone)]
pub struct IndexStoreOptions {
    /// Shared pruning watermark (exclusive transaction sequence number). The
    /// bitmap compaction filters installed by `open_with_index_options` hold
    /// clones of this `Arc` and read it on every filter invocation.
    pub pruning_tx_seq_exclusive: Arc<AtomicU64>,
}
423
/// Baseline column-family options for the index tables: the `typed_store`
/// defaults with write throttling disabled.
fn default_table_options() -> typed_store::rocks::DBOptions {
    typed_store::rocks::default_db_options().disable_write_throttling()
}
427
/// Options for the `tx_seq_digest` table: the baseline options with
/// `ignore_range_deletions` explicitly set to `false` on the read options.
///
/// `prune` removes rows from this table with a range delete.
/// NOTE(review): presumably the flag is cleared so that reads honor those
/// range tombstones — confirm against the `typed_store` default.
fn tx_seq_digest_table_options() -> typed_store::rocks::DBOptions {
    let mut options = default_table_options();
    options.rw_options = options.rw_options.clone().set_ignore_range_deletions(false);
    options
}
438
439fn balance_delta_merge_operator(
440 _key: &[u8],
441 existing_val: Option<&[u8]>,
442 operands: &MergeOperands,
443) -> Option<Vec<u8>> {
444 let mut result = if let Some(existing_val) = existing_val {
445 bcs::from_bytes::<BalanceIndexInfo>(existing_val)
446 .inspect_err(|e| {
447 tracing::error!(
448 "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
449 )
450 })
451 .ok()?
452 } else {
453 BalanceIndexInfo::default()
454 };
455
456 for operand in operands.iter() {
457 let delta = bcs::from_bytes::<BalanceIndexInfo>(operand)
458 .inspect_err(|e| {
459 tracing::error!(
460 "Failed to deserialize BalanceIndexInfo from RocksDB - data corruption: {e}"
461 )
462 })
463 .ok()?;
464 result.merge_delta(&delta);
465 }
466
467 Some(
468 bcs::to_bytes(&result)
469 .expect("Failed to deserialize BalanceIndexInfo from RocksDB - data corruption."),
470 )
471}
472
473fn balance_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
474 let balance_info = match bcs::from_bytes::<BalanceIndexInfo>(value) {
475 Ok(info) => info,
476 Err(_) => return Decision::Keep,
477 };
478
479 if balance_info.coin_balance_delta == 0 && balance_info.address_balance_delta == 0 {
480 Decision::Remove
481 } else {
482 Decision::Keep
483 }
484}
485
/// Options for the `balance` table: the baseline options plus the
/// delta-summing merge operator and the compaction filter that removes
/// all-zero entries.
fn balance_table_options() -> typed_store::rocks::DBOptions {
    default_table_options()
        .set_merge_operator_associative("balance_merge", balance_delta_merge_operator)
        .set_compaction_filter("balance_zero_filter", balance_compaction_filter)
}
491
492fn decode_bitmap_blob(bcs_bytes: &[u8]) -> Result<RoaringBitmap, anyhow::Error> {
495 let blob: BitmapBlob = bcs::from_bytes(bcs_bytes)?;
496 Ok(RoaringBitmap::deserialize_from(&blob.0[..])?)
497}
498
499fn encode_bitmap_blob(bm: &RoaringBitmap) -> Vec<u8> {
500 let mut buf = Vec::with_capacity(bm.serialized_size());
501 bm.serialize_into(&mut buf)
502 .expect("RoaringBitmap::serialize_into on Vec cannot fail");
503 bcs::to_bytes(&BitmapBlob(buf)).expect("BCS encode of BitmapBlob cannot fail")
504}
505
506fn bitmap_union_merge_operator(
509 _key: &[u8],
510 existing_val: Option<&[u8]>,
511 operands: &MergeOperands,
512) -> Option<Vec<u8>> {
513 let mut acc = match existing_val {
514 Some(v) => match decode_bitmap_blob(v) {
515 Ok(bm) => bm,
516 Err(e) => {
517 tracing::error!(
518 "Failed to deserialize existing BitmapBlob during merge - data corruption: {e}"
519 );
520 return None;
521 }
522 },
523 None => RoaringBitmap::new(),
524 };
525
526 for operand in operands.iter() {
527 match decode_bitmap_blob(operand) {
528 Ok(bm) => acc |= bm,
529 Err(e) => {
530 tracing::error!(
531 "Failed to deserialize BitmapBlob operand during merge - data corruption: {e}"
532 );
533 return None;
534 }
535 }
536 }
537
538 acc.optimize();
546 Some(encode_bitmap_blob(&acc))
547}
548
549#[derive(Clone)]
559pub struct BitmapCompactionFilter {
560 pruning_tx_seq_exclusive: Arc<AtomicU64>,
561 kind: BitmapKind,
562}
563
564impl BitmapCompactionFilter {
565 pub fn new(pruning_tx_seq_exclusive: Arc<AtomicU64>, kind: BitmapKind) -> Self {
566 Self {
567 pruning_tx_seq_exclusive,
568 kind,
569 }
570 }
571
572 pub fn filter(&self, key: &[u8], _value: &[u8]) -> Decision {
573 if key.len() < 8 {
574 warn!(
575 kind = ?self.kind,
576 "bitmap compaction filter saw key shorter than 8 bytes ({}); keeping",
577 key.len(),
578 );
579 return Decision::Keep;
580 }
581 let bucket_id =
582 u64::from_be_bytes(key[key.len() - 8..].try_into().expect("len checked above"));
583 let pruned_exclusive = self.pruning_tx_seq_exclusive.load(Ordering::Relaxed);
584
585 if self.kind.bucket_fully_pruned(bucket_id, pruned_exclusive) {
586 Decision::Remove
587 } else {
588 Decision::Keep
589 }
590 }
591}
592
/// Options shared by both bitmap tables: the baseline options plus the
/// bitmap-union merge operator. No compaction filter is installed here; see
/// `bitmap_cf_options` for the variant that adds one.
fn bitmap_cf_default_options() -> typed_store::rocks::DBOptions {
    default_table_options()
        .set_merge_operator_associative("bitmap_union_merge", bitmap_union_merge_operator)
}
599
600fn bitmap_cf_options(
602 filter_name: &str,
603 filter: BitmapCompactionFilter,
604) -> typed_store::rocks::DBOptions {
605 let mut options = bitmap_cf_default_options();
606 options
607 .options
608 .set_compaction_filter(filter_name, move |_level, key, value| {
609 filter.filter(key, value)
610 });
611 options
612}
613
614impl CoinIndexInfo {
615 fn merge(&mut self, other: Self) {
616 self.coin_metadata_object_id = self
617 .coin_metadata_object_id
618 .or(other.coin_metadata_object_id);
619 self.regulated_coin_metadata_object_id = self
620 .regulated_coin_metadata_object_id
621 .or(other.regulated_coin_metadata_object_id);
622 self.treasury_object_id = self.treasury_object_id.or(other.treasury_object_id);
623 }
624}
625
/// The RocksDB column families that make up the RPC index.
///
/// The `default_options_override_fn` attributes name the per-table option
/// builders; `open_with_index_options` additionally passes an explicit option
/// map that overrides them when the store is opened through that path.
#[derive(DBMapUtils)]
struct IndexStoreTables {
    /// Single-row table holding the schema version (`MetadataInfo`).
    meta: DBMap<(), MetadataInfo>,

    /// Single-row table holding the JSON-encoded `IndexSettings`.
    settings: DBMap<(), Vec<u8>>,

    /// Checkpoint watermarks: `Indexed` is advanced by `index_checkpoint` and
    /// `init`, `Pruned` by `prune`.
    #[default_options_override_fn = "default_table_options"]
    watermark: DBMap<Watermark, CheckpointSequenceNumber>,

    /// Per-epoch information, keyed by epoch id.
    #[default_options_override_fn = "default_table_options"]
    epochs: DBMap<EpochId, EpochInfo>,

    /// Deprecated table, retained only so the column family still opens.
    #[default_options_override_fn = "default_table_options"]
    #[allow(unused)]
    #[deprecated]
    transactions: DBMap<TransactionDigest, TransactionInfo>,

    /// Address-owned objects, ordered by owner, type, inverted balance and id.
    #[default_options_override_fn = "default_table_options"]
    owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

    /// Dynamic-field index; all information lives in the key.
    #[default_options_override_fn = "default_table_options"]
    dynamic_field: DBMap<DynamicFieldKey, ()>,

    /// Well-known object ids per coin type.
    #[default_options_override_fn = "default_table_options"]
    coin: DBMap<CoinIndexKey, CoinIndexInfo>,

    /// Balances per (owner, coin type); written as deltas and summed by the
    /// merge operator configured in `balance_table_options`.
    #[default_options_override_fn = "balance_table_options"]
    balance: DBMap<BalanceKey, BalanceIndexInfo>,

    /// Storage id of each version of each package.
    #[default_options_override_fn = "default_table_options"]
    package_version: DBMap<PackageVersionKey, PackageVersionInfo>,

    /// Ledger history: transaction sequence number to digest and position.
    #[default_options_override_fn = "tx_seq_digest_table_options"]
    tx_seq_digest: DBMap<u64, TxSeqDigestInfo>,

    /// Ledger history: per-dimension bitmaps of transaction sequence numbers.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    transaction_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,

    /// Ledger history: per-dimension bitmaps of packed event sequence numbers.
    #[default_options_override_fn = "bitmap_cf_default_options"]
    event_bitmap: DBMap<BitmapIndexKey, BitmapBlob>,
}
724
725fn clear_table<K, V>(map: &DBMap<K, V>) -> Result<(), TypedStoreError>
732where
733 K: Serialize + DeserializeOwned,
734 V: Serialize + DeserializeOwned,
735{
736 let first = map.safe_iter().next().transpose()?.map(|(k, _)| k);
737 let last = map
738 .reversed_safe_iter_with_bounds(None, None)?
739 .next()
740 .transpose()?
741 .map(|(k, _)| k);
742 let (Some(first), Some(last)) = (first, last) else {
743 return Ok(());
744 };
745 let mut batch = map.batch();
746 batch.schedule_delete_range(map, &first, &last)?;
747 batch.delete_batch(map, std::iter::once(&last))?;
748 batch.write()?;
749 map.compact_range(&first, &last)
750}
751
752impl IndexStoreTables {
753 fn extract_version_if_package(
754 object: &Object,
755 ) -> Option<(PackageVersionKey, PackageVersionInfo)> {
756 if let Data::Package(package) = &object.data {
757 let original_id = package.original_package_id();
758 let version = package.version().value();
759 let storage_id = object.id();
760
761 let key = PackageVersionKey {
762 original_package_id: original_id,
763 version,
764 };
765 let info = PackageVersionInfo { storage_id };
766 return Some((key, info));
767 }
768 None
769 }
770
771 fn open_with_index_options<P: Into<PathBuf>>(
772 path: P,
773 index_options: IndexStoreOptions,
774 ) -> Self {
775 let mut table_options = std::collections::BTreeMap::new();
783 for (table_name, _) in IndexStoreTables::describe_tables() {
784 table_options.insert(table_name, default_table_options());
785 }
786 table_options.insert("balance".to_string(), balance_table_options());
787 table_options.insert("tx_seq_digest".to_string(), tx_seq_digest_table_options());
789
790 let bitmap_filter_tx = BitmapCompactionFilter::new(
791 index_options.pruning_tx_seq_exclusive.clone(),
792 BitmapKind::Transaction,
793 );
794 let bitmap_filter_event = BitmapCompactionFilter::new(
795 index_options.pruning_tx_seq_exclusive.clone(),
796 BitmapKind::Event,
797 );
798 table_options.insert(
799 "transaction_bitmap".to_string(),
800 bitmap_cf_options("transaction_bitmap_filter", bitmap_filter_tx),
801 );
802 table_options.insert(
803 "event_bitmap".to_string(),
804 bitmap_cf_options("event_bitmap_filter", bitmap_filter_event),
805 );
806
807 IndexStoreTables::open_tables_read_write_with_deprecation_option(
808 path.into(),
809 MetricConf::new("rpc-index"),
810 None,
811 Some(DBMapTableConfigMap::new(table_options)),
812 true, )
814 }
815
    /// Opens the index store at `path` with caller-supplied database options
    /// and an optional per-table option map, under the `rpc-index` metrics
    /// name.
    ///
    /// NOTE(review): the trailing `true` is the deprecation flag of
    /// `open_tables_read_write_with_deprecation_option`; its exact meaning is
    /// defined by `typed_store` — confirm there.
    fn open_with_options<P: Into<PathBuf>>(
        path: P,
        options: typed_store::rocksdb::Options,
        table_options: Option<DBMapTableConfigMap>,
    ) -> Self {
        IndexStoreTables::open_tables_read_write_with_deprecation_option(
            path.into(),
            MetricConf::new("rpc-index"),
            Some(options),
            table_options,
            true,
        )
    }
829
    /// Returns the `ledger_history_indexing` flag from the persisted settings;
    /// `false` when no settings are stored or they cannot be read.
    fn persisted_ledger_history_indexing(&self) -> bool {
        self.read_settings().ledger_history_indexing
    }
836
837 fn read_settings(&self) -> IndexSettings {
840 self.settings
841 .get(&())
842 .ok()
843 .flatten()
844 .map(|bytes| decode_settings(&bytes))
845 .unwrap_or_default()
846 }
847
848 fn needs_to_do_initialization(
849 &self,
850 checkpoint_store: &CheckpointStore,
851 ledger_history_indexing: bool,
852 ) -> bool {
853 let schema_stale = match self.meta.get(&()) {
854 Ok(Some(metadata)) => metadata.version != CURRENT_DB_VERSION,
855 Ok(None) | Err(_) => true,
856 };
857 let enabling = ledger_history_indexing && !self.persisted_ledger_history_indexing();
862 schema_stale || enabling || self.is_indexed_watermark_out_of_date(checkpoint_store)
863 }
864
865 fn disable_ledger_history_indexing(&self) -> Result<(), TypedStoreError> {
872 clear_table(&self.tx_seq_digest)?;
873 clear_table(&self.transaction_bitmap)?;
874 clear_table(&self.event_bitmap)?;
875 self.settings.insert(
876 &(),
877 &encode_settings(&IndexSettings {
878 ledger_history_indexing: false,
879 }),
880 )?;
881 Ok(())
882 }
883
884 fn is_indexed_watermark_out_of_date(&self, checkpoint_store: &CheckpointStore) -> bool {
886 let highest_executed_checkpint = checkpoint_store
887 .get_highest_executed_checkpoint_seq_number()
888 .ok()
889 .flatten();
890 let watermark = self.watermark.get(&Watermark::Indexed).ok().flatten();
891 watermark < highest_executed_checkpint
892 }
893
894 #[tracing::instrument(skip_all)]
895 fn init(
896 &mut self,
897 authority_store: &AuthorityStore,
898 checkpoint_store: &CheckpointStore,
899 _epoch_store: &AuthorityPerEpochStore,
900 _package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
901 batch_size_limit: usize,
902 rpc_config: &sui_config::RpcConfig,
903 ) -> Result<(), StorageError> {
904 info!("Initializing RPC indexes");
905
906 let highest_executed_checkpint =
907 checkpoint_store.get_highest_executed_checkpoint_seq_number()?;
908 let lowest_available_checkpoint = checkpoint_store
909 .get_highest_pruned_checkpoint_seq_number()?
910 .map(|c| c.saturating_add(1))
911 .unwrap_or(0);
912 let lowest_available_checkpoint_objects = authority_store
913 .perpetual_tables
914 .get_highest_pruned_checkpoint()?
915 .map(|c| c.saturating_add(1))
916 .unwrap_or(0);
917 let lowest_available_checkpoint =
920 lowest_available_checkpoint.max(lowest_available_checkpoint_objects);
921
922 let checkpoint_range = highest_executed_checkpint.map(|highest_executed_checkpint| {
923 lowest_available_checkpoint..=highest_executed_checkpint
924 });
925
926 if let Some(checkpoint_range) = checkpoint_range.clone() {
927 self.index_existing_checkpoints(authority_store, checkpoint_store, checkpoint_range)?;
928 }
929
930 if rpc_config.ledger_history_indexing()
931 && let Some(checkpoint_range) = checkpoint_range
932 {
933 self.backfill_ledger_history_indexes(
934 authority_store,
935 checkpoint_store,
936 checkpoint_range,
937 )?;
938 }
939
940 self.initialize_current_epoch(authority_store, checkpoint_store)?;
941
942 if highest_executed_checkpint.is_some() {
946 let coin_index = Mutex::new(HashMap::new());
947
948 let make_live_object_indexer = RpcParLiveObjectSetIndexer {
949 tables: self,
950 coin_index: &coin_index,
951 batch_size_limit,
952 };
953
954 crate::par_index_live_object_set::par_index_live_object_set(
955 authority_store,
956 &make_live_object_indexer,
957 )?;
958
959 self.coin.multi_insert(coin_index.into_inner().unwrap())?;
960 }
961
962 self.watermark.insert(
963 &Watermark::Indexed,
964 &highest_executed_checkpint.unwrap_or(0),
965 )?;
966
967 let mut batch = self.meta.batch();
970 batch.insert_batch(
971 &self.meta,
972 [(
973 (),
974 MetadataInfo {
975 version: CURRENT_DB_VERSION,
976 },
977 )],
978 )?;
979 batch.insert_batch(
980 &self.settings,
981 [(
982 (),
983 encode_settings(&IndexSettings {
984 ledger_history_indexing: rpc_config.ledger_history_indexing(),
985 }),
986 )],
987 )?;
988 batch.write()?;
989
990 info!("Finished initializing RPC indexes");
991
992 Ok(())
993 }
994
995 #[tracing::instrument(skip(self, authority_store, checkpoint_store))]
996 fn index_existing_checkpoints(
997 &mut self,
998 authority_store: &AuthorityStore,
999 checkpoint_store: &CheckpointStore,
1000 checkpoint_range: std::ops::RangeInclusive<u64>,
1001 ) -> Result<(), StorageError> {
1002 info!(
1003 "Indexing {} checkpoints in range {checkpoint_range:?}",
1004 checkpoint_range.size_hint().0
1005 );
1006 let start_time = Instant::now();
1007
1008 checkpoint_range.into_par_iter().try_for_each(|seq| {
1009 let Some(checkpoint) =
1010 sparse_checkpoint_for_epoch_backfill(authority_store, checkpoint_store, seq)?
1011 else {
1012 return Ok(());
1013 };
1014
1015 let mut batch = self.epochs.batch();
1016
1017 self.index_epoch(&checkpoint, &mut batch)?;
1018
1019 batch
1020 .write_opt(bulk_ingestion_write_options())
1021 .map_err(StorageError::from)
1022 })?;
1023
1024 info!(
1025 "Indexing checkpoints took {} seconds",
1026 start_time.elapsed().as_secs()
1027 );
1028 Ok(())
1029 }
1030
1031 fn backfill_ledger_history_indexes(
1037 &self,
1038 authority_store: &AuthorityStore,
1039 checkpoint_store: &CheckpointStore,
1040 checkpoint_range: std::ops::RangeInclusive<u64>,
1041 ) -> Result<(), StorageError> {
1042 info!("ledger history backfill: cps {checkpoint_range:?}");
1043 let start_time = Instant::now();
1044
1045 checkpoint_range.clone().into_par_iter().try_for_each(
1046 |seq| -> Result<(), StorageError> {
1047 let checkpoint =
1048 full_checkpoint_for_backfill(authority_store, checkpoint_store, seq)?
1049 .ok_or_else(|| {
1050 StorageError::missing(format!(
1052 "ledger history backfill: checkpoint {seq} is missing from local \
1053 storage but falls inside the retained backfill range \
1054 {checkpoint_range:?}"
1055 ))
1056 })?;
1057 let mut batch = self.meta.batch();
1058 self.write_ledger_history_rows_for_checkpoint(&checkpoint, &mut batch)?;
1059 batch
1060 .write_opt(bulk_ingestion_write_options())
1061 .map_err(StorageError::from)
1062 },
1063 )?;
1064
1065 self.tx_seq_digest.flush().map_err(|e| {
1067 StorageError::custom(format!("flush after ledger history backfill: {e}"))
1068 })?;
1069
1070 info!(
1071 "ledger history backfill took {} seconds",
1072 start_time.elapsed().as_secs()
1073 );
1074 Ok(())
1075 }
1076
1077 fn first_tx_seq_digest_key(&self) -> Result<Option<u64>, TypedStoreError> {
1084 match self.tx_seq_digest.safe_iter().next() {
1085 Some(Ok((k, _))) => Ok(Some(k)),
1086 Some(Err(e)) => Err(e),
1087 None => Ok(None),
1088 }
1089 }
1090
1091 fn prune(
1098 &self,
1099 pruned_checkpoint_watermark: u64,
1100 pruned_tx_seq_exclusive: u64,
1101 ledger_history_enabled: bool,
1102 pruning_atomic: &AtomicU64,
1103 ) -> Result<(), TypedStoreError> {
1104 let mut batch = self.watermark.batch();
1105
1106 batch.insert_batch(
1107 &self.watermark,
1108 [(Watermark::Pruned, pruned_checkpoint_watermark)],
1109 )?;
1110
1111 if ledger_history_enabled {
1112 let prev_exclusive = self.first_tx_seq_digest_key()?.unwrap_or(0);
1116 batch.schedule_delete_range(
1117 &self.tx_seq_digest,
1118 &prev_exclusive,
1119 &pruned_tx_seq_exclusive,
1120 )?;
1121 }
1122
1123 batch.write()?;
1124
1125 if ledger_history_enabled {
1126 pruning_atomic.store(pruned_tx_seq_exclusive, Ordering::Relaxed);
1128 }
1129 Ok(())
1130 }
1131
1132 fn index_checkpoint(
1134 &self,
1135 checkpoint: &Checkpoint,
1136 ledger_history_enabled: bool,
1137 ) -> Result<typed_store::rocks::DBBatch, StorageError> {
1138 debug!(
1139 checkpoint = checkpoint.summary.sequence_number,
1140 "indexing checkpoint"
1141 );
1142
1143 let mut batch = self.owner.batch();
1144
1145 self.index_epoch(checkpoint, &mut batch)?;
1146 self.index_transactions(checkpoint, &mut batch)?;
1147 self.index_objects(checkpoint, &mut batch)?;
1148
1149 if ledger_history_enabled {
1151 self.write_ledger_history_rows_for_checkpoint(checkpoint, &mut batch)?;
1152 }
1153
1154 batch.insert_batch(
1155 &self.watermark,
1156 [(Watermark::Indexed, checkpoint.summary.sequence_number)],
1157 )?;
1158
1159 debug!(
1160 checkpoint = checkpoint.summary.sequence_number,
1161 "finished indexing checkpoint"
1162 );
1163
1164 Ok(batch)
1165 }
1166
1167 fn index_epoch(
1168 &self,
1169 checkpoint: &Checkpoint,
1170 batch: &mut typed_store::rocks::DBBatch,
1171 ) -> Result<(), StorageError> {
1172 let Some(epoch_info) = checkpoint.epoch_info()? else {
1173 return Ok(());
1174 };
1175 if epoch_info.epoch > 0 {
1176 let prev_epoch = epoch_info.epoch - 1;
1177 let mut current_epoch = self.epochs.get(&prev_epoch)?.unwrap_or_default();
1178 current_epoch.epoch = prev_epoch; current_epoch.end_timestamp_ms = epoch_info.start_timestamp_ms;
1180 current_epoch.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
1181 batch.insert_batch(&self.epochs, [(prev_epoch, current_epoch)])?;
1182 }
1183 batch.insert_batch(&self.epochs, [(epoch_info.epoch, epoch_info)])?;
1184 Ok(())
1185 }
1186
1187 fn initialize_current_epoch(
1190 &mut self,
1191 authority_store: &AuthorityStore,
1192 checkpoint_store: &CheckpointStore,
1193 ) -> Result<(), StorageError> {
1194 let Some(checkpoint) = checkpoint_store.get_highest_executed_checkpoint()? else {
1195 return Ok(());
1196 };
1197
1198 let system_state = sui_types::sui_system_state::get_sui_system_state(authority_store)
1199 .map_err(|e| StorageError::custom(format!("Failed to find system state: {e}")))?;
1200
1201 let mut epoch = self.epochs.get(&checkpoint.epoch)?.unwrap_or_default();
1202 epoch.epoch = checkpoint.epoch;
1203
1204 if epoch.protocol_version.is_none() {
1205 epoch.protocol_version = Some(system_state.protocol_version());
1206 }
1207
1208 if epoch.start_timestamp_ms.is_none() {
1209 epoch.start_timestamp_ms = Some(system_state.epoch_start_timestamp_ms());
1210 }
1211
1212 if epoch.reference_gas_price.is_none() {
1213 epoch.reference_gas_price = Some(system_state.reference_gas_price());
1214 }
1215
1216 if epoch.system_state.is_none() {
1217 epoch.system_state = Some(system_state);
1218 }
1219
1220 self.epochs.insert(&epoch.epoch, &epoch)?;
1221
1222 Ok(())
1223 }
1224
1225 fn index_transactions(
1226 &self,
1227 checkpoint: &Checkpoint,
1228 batch: &mut typed_store::rocks::DBBatch,
1229 ) -> Result<(), StorageError> {
1230 for tx in &checkpoint.transactions {
1231 let balance_changes = sui_types::balance_change::derive_detailed_balance_changes_2(
1232 &tx.effects,
1233 &checkpoint.object_set,
1234 )
1235 .into_iter()
1236 .filter_map(|change| {
1237 if let TypeTag::Struct(coin_type) = change.coin_type {
1238 Some((
1239 BalanceKey {
1240 owner: change.address,
1241 coin_type: *coin_type,
1242 },
1243 BalanceIndexInfo {
1244 coin_balance_delta: change.coin_amount,
1245 address_balance_delta: change.address_amount,
1246 },
1247 ))
1248 } else {
1249 None
1250 }
1251 });
1252 batch.partial_merge_batch(&self.balance, balance_changes)?;
1253 }
1254
1255 Ok(())
1256 }
1257
/// Stages the ledger-history rows for one checkpoint into `batch`.
///
/// Three tables are touched:
/// * `tx_seq_digest`: one row per transaction, keyed by its global
///   transaction sequence number.
/// * `transaction_bitmap`: merge operands keyed by `(dimension key, bucket)`,
///   accumulated across the whole checkpoint and staged once at the end.
/// * `event_bitmap`: merge operands keyed by `(dimension key, bucket)`,
///   accumulated and staged once per transaction.
///
/// # Errors
/// Returns an error if the checkpoint summary's `network_total_transactions`
/// is smaller than the number of transactions in the checkpoint, if an event
/// sequence number cannot be encoded, or if staging into the batch fails.
fn write_ledger_history_rows_for_checkpoint(
    &self,
    checkpoint: &Checkpoint,
    batch: &mut typed_store::rocks::DBBatch,
) -> Result<(), StorageError> {
    let cp_seq = checkpoint.summary.sequence_number;
    let net_total = checkpoint.summary.data().network_total_transactions;
    let tx_count = checkpoint.transactions.len() as u64;
    // Sequence number assigned to the first transaction of this checkpoint;
    // transaction `i` below gets `tx_lo + i`.
    let tx_lo = net_total.checked_sub(tx_count).ok_or_else(|| {
        StorageError::custom(format!(
            "checkpoint {cp_seq}: network_total_transactions ({net_total}) \
             < tx_count ({tx_count})"
        ))
    })?;

    let object_set = &checkpoint.object_set;

    // Transaction bitmaps for the whole checkpoint, grouped by
    // (encoded dimension key, bucket id).
    let mut tx_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();

    for (i, tx) in checkpoint.transactions.iter().enumerate() {
        let tx_seq = tx_lo + i as u64;

        let tx_data = &tx.transaction;
        let digest = *tx.effects.transaction_digest();
        // Zero when the transaction carries no events.
        let event_count = tx.events.as_ref().map(|e| e.data.len() as u32).unwrap_or(0);

        batch.insert_batch(
            &self.tx_seq_digest,
            [(
                tx_seq,
                TxSeqDigestInfo {
                    digest,
                    event_count,
                    // Position of the transaction inside this checkpoint.
                    tx_offset: i as u32,
                    checkpoint_number: cp_seq,
                },
            )],
        )?;

        // Split the sequence number into a bucket id and a bit inside it.
        let tx_bucket = tx_seq / TX_BUCKET_SIZE;
        let tx_bit = (tx_seq % TX_BUCKET_SIZE) as u32;
        // A set collapses dimension keys reported more than once for this
        // transaction.
        let mut tx_dim_keys: FxHashSet<Vec<u8>> = FxHashSet::default();
        for_each_transaction_dimension(
            tx_data,
            &tx.effects,
            tx.events.as_ref(),
            object_set,
            |dim, value| {
                tx_dim_keys.insert(encode_dimension_key(dim, value));
            },
        );
        for dim_key in tx_dim_keys {
            tx_groups
                .entry((dim_key, tx_bucket))
                .or_default()
                .insert(tx_bit);
        }

        // Event bitmaps for this transaction only.
        let mut event_groups: FxHashMap<(Vec<u8>, u64), RoaringBitmap> = FxHashMap::default();
        // The callback cannot return an error, so the first encoding failure
        // is stashed here and surfaced after the traversal.
        let mut event_seq_error = None;
        for_each_event_dimension(
            tx_data.sender(),
            &tx.effects,
            tx.events.as_ref(),
            |event_idx, dim, value| {
                let event_seq = match checked_encode_event_seq(tx_seq, event_idx) {
                    Ok(event_seq) => event_seq,
                    Err(e) => {
                        // Keep only the first error; later ones are ignored.
                        event_seq_error.get_or_insert(e);
                        return;
                    }
                };
                let bucket = event_seq / EVENT_BUCKET_SIZE;
                let bit = (event_seq % EVENT_BUCKET_SIZE) as u32;
                event_groups
                    .entry((encode_dimension_key(dim, value), bucket))
                    .or_default()
                    .insert(bit);
            },
        );
        if let Some(e) = event_seq_error {
            return Err(e);
        }
        let event_ops = event_groups.into_iter().map(|((dim_key, bucket), bm)| {
            (
                BitmapIndexKey {
                    dimension_key: dim_key,
                    bucket_id: bucket,
                },
                BitmapBlob::from(bm),
            )
        });
        batch.partial_merge_batch(&self.event_bitmap, event_ops)?;
    }

    // Stage the transaction bitmaps once for the whole checkpoint.
    let tx_ops = tx_groups.into_iter().map(|((dim_key, bucket), bm)| {
        (
            BitmapIndexKey {
                dimension_key: dim_key,
                bucket_id: bucket,
            },
            BitmapBlob::from(bm),
        )
    });
    batch.partial_merge_batch(&self.transaction_bitmap, tx_ops)?;

    Ok(())
}
1382
/// Stages owner, dynamic-field, coin and package-version index updates for
/// every transaction in `checkpoint` into `batch`.
///
/// Per transaction, in order:
/// 1. Objects with an input version but no output version have their owner
///    or dynamic-field row deleted.
/// 2. Objects with an output version have the row for their previous version
///    (if any) deleted and a row for the new version inserted; packages are
///    additionally recorded for the package-version index.
/// 3. Created objects that carry coin information are folded into an
///    in-memory coin index.
///
/// The coin and package-version rows are staged once, after all transactions
/// have been processed.
///
/// # Panics
/// Hits `todo!` if any inspected object has an `Owner::Party` owner.
fn index_objects(
    &self,
    checkpoint: &Checkpoint,
    batch: &mut typed_store::rocks::DBBatch,
) -> Result<(), StorageError> {
    let mut coin_index: HashMap<CoinIndexKey, CoinIndexInfo> = HashMap::new();
    let mut package_version_index: Vec<(PackageVersionKey, PackageVersionInfo)> = vec![];
    let object_set = &checkpoint.object_set;

    for tx in &checkpoint.transactions {
        // Step 1: objects removed by this transaction (pre-removal version).
        for removed_object in tx_removed_objects_pre_version(tx, object_set) {
            match removed_object.owner() {
                Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                    let owner_key = OwnerIndexKey::from_object(removed_object);
                    batch.delete_batch(&self.owner, [owner_key])?;
                }
                Owner::ObjectOwner(object_id) => {
                    batch.delete_batch(
                        &self.dynamic_field,
                        [DynamicFieldKey::new(*object_id, removed_object.id())],
                    )?;
                }
                // Shared and immutable objects have no row in these tables.
                Owner::Shared { .. } | Owner::Immutable => {}
                Owner::Party { .. } => {
                    todo!("Party WIP");
                }
            }
        }

        // Step 2: objects written by this transaction, paired with their
        // previous version when one is present in the object set.
        for (object, old_object) in tx_changed_objects(tx, object_set) {
            if let Some(old_object) = old_object {
                match old_object.owner() {
                    Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                        // The old owner row is always dropped; a fresh row is
                        // inserted below if the new version is address-owned.
                        let owner_key = OwnerIndexKey::from_object(old_object);
                        batch.delete_batch(&self.owner, [owner_key])?;
                    }

                    Owner::ObjectOwner(object_id) => {
                        // Only drop the dynamic-field row when ownership
                        // actually changed.
                        if old_object.owner() != object.owner() {
                            batch.delete_batch(
                                &self.dynamic_field,
                                [DynamicFieldKey::new(*object_id, old_object.id())],
                            )?;
                        }
                    }

                    Owner::Shared { .. } | Owner::Immutable => {}

                    Owner::Party { .. } => {
                        todo!("Party WIP");
                    }
                }
            }

            match object.owner() {
                Owner::AddressOwner(_) | Owner::ConsensusAddressOwner { .. } => {
                    let owner_key = OwnerIndexKey::from_object(object);
                    let owner_info = OwnerIndexInfo::new(object);
                    batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
                }
                Owner::ObjectOwner(parent) => {
                    // Only objects whose Move type is a dynamic field are
                    // indexed under their parent.
                    if should_index_dynamic_field(object) {
                        let field_key = DynamicFieldKey::new(*parent, object.id());
                        batch.insert_batch(&self.dynamic_field, [(field_key, ())])?;
                    }
                }
                Owner::Shared { .. } | Owner::Immutable => {}
                Owner::Party { .. } => todo!("Party WIP"),
            }
            if let Some((key, info)) = Self::extract_version_if_package(object) {
                package_version_index.push((key, info));
            }
        }

        // Step 3: coin-related objects created by this transaction. Entries
        // for the same coin type are merged in memory.
        // NOTE(review): the final `insert_batch` overwrites any existing row
        // for the coin type; presumably this relies on the related coin
        // objects being created together — confirm.
        for (key, value) in tx
            .created_objects(object_set)
            .flat_map(try_create_coin_index_info)
        {
            use std::collections::hash_map::Entry;

            match coin_index.entry(key) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge(value);
                }
                Entry::Vacant(v) => {
                    v.insert(value);
                }
            }
        }
    }

    batch.insert_batch(&self.coin, coin_index)?;
    batch.insert_batch(&self.package_version, package_version_index)?;

    Ok(())
}
1493
/// Looks up the stored `EpochInfo` for `epoch` in the `epochs` table.
///
/// Returns `Ok(None)` when no row exists for that epoch.
fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
    self.epochs.get(&epoch)
}
1497
1498 fn owner_iter(
1499 &self,
1500 owner: SuiAddress,
1501 object_type: Option<StructTag>,
1502 cursor: Option<OwnerIndexKey>,
1503 ) -> Result<
1504 impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
1505 TypedStoreError,
1506 > {
1507 let lower_bound = cursor.unwrap_or_else(|| OwnerIndexKey {
1509 owner,
1510 object_type: object_type
1511 .clone()
1512 .unwrap_or_else(|| "0x0::a::a".parse::<StructTag>().unwrap()),
1513 inverted_balance: None,
1514 object_id: ObjectID::ZERO,
1515 });
1516
1517 Ok(self
1518 .owner
1519 .safe_iter_with_bounds(Some(lower_bound), None)
1520 .take_while(move |item| {
1521 let Ok((key, _)) = item else {
1523 return true;
1524 };
1525
1526 key.owner == owner
1528 && object_type
1530 .as_ref()
1531 .map(|ty| {
1532 ty.address == key.object_type.address
1533 && ty.module == key.object_type.module
1534 && ty.name == key.object_type.name
1535 && (ty.type_params.is_empty() ||
1537 ty.type_params == key.object_type.type_params)
1539 }).unwrap_or(true)
1540 }))
1541 }
1542
1543 fn dynamic_field_iter(
1544 &self,
1545 parent: ObjectID,
1546 cursor: Option<ObjectID>,
1547 ) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
1548 {
1549 let lower_bound = DynamicFieldKey::new(parent, cursor.unwrap_or(ObjectID::ZERO));
1550 let upper_bound = DynamicFieldKey::new(parent, ObjectID::MAX);
1551 let iter = self
1552 .dynamic_field
1553 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound))
1554 .map_ok(|(key, ())| key);
1555 Ok(iter)
1556 }
1557
1558 fn get_coin_info(
1559 &self,
1560 coin_type: &StructTag,
1561 ) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
1562 let key = CoinIndexKey {
1563 coin_type: coin_type.to_owned(),
1564 };
1565 self.coin.get(&key)
1566 }
1567
1568 fn get_balance(
1569 &self,
1570 owner: &SuiAddress,
1571 coin_type: &StructTag,
1572 ) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
1573 let key = BalanceKey {
1574 owner: owner.to_owned(),
1575 coin_type: coin_type.to_owned(),
1576 };
1577 self.balance.get(&key)
1578 }
1579
1580 fn balance_iter(
1581 &self,
1582 owner: SuiAddress,
1583 cursor: Option<BalanceKey>,
1584 ) -> Result<
1585 impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
1586 TypedStoreError,
1587 > {
1588 let lower_bound = cursor.unwrap_or_else(|| BalanceKey {
1589 owner,
1590 coin_type: "0x0::a::a".parse::<StructTag>().unwrap(),
1591 });
1592
1593 Ok(self
1594 .balance
1595 .safe_iter_with_bounds(Some(lower_bound), None)
1596 .scan((), move |_, item| {
1597 match item {
1598 Ok((key, value)) if key.owner == owner => Some(Ok((key, value))),
1599 Ok(_) => None, Err(e) => Some(Err(e)), }
1602 }))
1603 }
1604
1605 fn package_versions_iter(
1606 &self,
1607 original_id: ObjectID,
1608 cursor: Option<u64>,
1609 ) -> Result<
1610 impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
1611 TypedStoreError,
1612 > {
1613 let lower_bound = PackageVersionKey {
1614 original_package_id: original_id,
1615 version: cursor.unwrap_or(0),
1616 };
1617 let upper_bound = PackageVersionKey {
1618 original_package_id: original_id,
1619 version: u64::MAX,
1620 };
1621
1622 Ok(self
1623 .package_version
1624 .safe_iter_with_bounds(Some(lower_bound), Some(upper_bound)))
1625 }
1626}
1627
/// RPC index store: wraps the index tables together with the batches that
/// have been computed but not yet committed.
pub struct RpcIndexStore {
    // The opened index tables all reads and writes go through.
    tables: IndexStoreTables,
    // Batches produced by `index_checkpoint`, keyed by checkpoint sequence
    // number, waiting for `commit_update_for_checkpoint` to write them.
    pending_updates: Mutex<BTreeMap<u64, typed_store::rocks::DBBatch>>,
    // Shared atomic that is also handed to the tables via
    // `IndexStoreOptions::pruning_tx_seq_exclusive` and passed to `prune`.
    ledger_history_pruning_watermark: Arc<AtomicU64>,
    // Value of `tables.persisted_ledger_history_indexing()` captured when the
    // store was constructed; gates the ledger-history read APIs.
    ledger_history_enabled: bool,
}
1640
1641impl RpcIndexStore {
/// Returns the location of the index database: `dir` with `rpc-index`
/// appended.
fn db_path(dir: &Path) -> PathBuf {
    let mut location = dir.to_path_buf();
    location.push("rpc-index");
    location
}
1646
/// Opens (and, when required, rebuilds) the RPC index under `dir` using
/// default index options.
///
/// A fresh pruning watermark atomic initialised to `0` is created, wrapped
/// in `IndexStoreOptions`, and everything is forwarded to
/// [`Self::new_with_options`].
pub async fn new(
    dir: &Path,
    authority_store: &AuthorityStore,
    checkpoint_store: &CheckpointStore,
    epoch_store: &AuthorityPerEpochStore,
    package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    rpc_config: sui_config::RpcConfig,
) -> Self {
    let ledger_history_pruning_watermark = Arc::new(AtomicU64::new(0));
    let index_options = IndexStoreOptions {
        pruning_tx_seq_exclusive: ledger_history_pruning_watermark,
    };

    Self::new_with_options(
        dir,
        authority_store,
        checkpoint_store,
        epoch_store,
        package_store,
        index_options,
        rpc_config,
    )
    .await
}
1672
/// Opens the RPC index under `dir`, rebuilding it from scratch when the
/// tables report that initialisation is needed.
///
/// Rebuild path:
/// 1. The existing database is dropped and destroyed.
/// 2. The database is reopened with options derived from `rpc_config`'s
///    index-initialisation config (or computed from CPU count and available
///    memory) and `init` is run.
/// 3. The tables are flushed, closed, and reopened with the regular
///    `index_options`; the stored version and ledger-history setting are
///    verified.
///
/// Otherwise the already-open tables are reused; if ledger history is
/// persisted as enabled but disabled in `rpc_config`, it is disabled on the
/// tables.
///
/// # Panics
/// Panics if the old database cannot be destroyed, if only one of the two
/// column-family buffer overrides is configured, if initialisation or the
/// flush fails, if the database handle is still referenced 30 seconds after
/// being dropped, or if the post-reopen checks fail.
pub async fn new_with_options(
    dir: &Path,
    authority_store: &AuthorityStore,
    checkpoint_store: &CheckpointStore,
    epoch_store: &AuthorityPerEpochStore,
    package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    index_options: IndexStoreOptions,
    rpc_config: sui_config::RpcConfig,
) -> Self {
    let path = Self::db_path(dir);
    let index_config = rpc_config.index_initialization_config();

    // Keep a handle on the shared watermark before `index_options` is moved.
    let ledger_history_atomic = index_options.pruning_tx_seq_exclusive.clone();

    let tables = {
        let tables = IndexStoreTables::open_with_index_options(&path, index_options.clone());

        if tables
            .needs_to_do_initialization(checkpoint_store, rpc_config.ledger_history_indexing())
        {
            // Assigned inside the block below, consumed by `init` afterwards.
            let batch_size_limit;

            let mut tables = {
                // Close the handle opened above, then wipe the old database.
                drop(tables);
                typed_store::rocks::safe_drop_db(path.clone(), Duration::from_secs(30))
                    .await
                    .expect("unable to destroy old rpc-index db");

                // Database-wide options used only for this initial build.
                // NOTE(review): the settings below look like bulk-load
                // tuning (no auto compaction, no L0 write stalls) — confirm.
                let mut options = typed_store::rocksdb::Options::default();
                options.set_unordered_write(true);

                // Config override, else one background job per CPU.
                let max_background_jobs = if let Some(jobs) =
                    index_config.as_ref().and_then(|c| c.max_background_jobs)
                {
                    debug!("Using config override for max_background_jobs: {}", jobs);
                    jobs
                } else {
                    let jobs = num_cpus::get() as i32;
                    debug!(
                        "Calculated max_background_jobs: {} (based on CPU count)",
                        jobs
                    );
                    jobs
                };
                options.set_max_background_jobs(max_background_jobs);

                options.set_level_zero_file_num_compaction_trigger(0);
                options.set_level_zero_slowdown_writes_trigger(-1);
                options.set_level_zero_stop_writes_trigger(i32::MAX);

                let total_memory_bytes = get_available_memory();
                // Config override, else 80% of the reported memory.
                let db_buffer_size = if let Some(size) =
                    index_config.as_ref().and_then(|c| c.db_write_buffer_size)
                {
                    debug!(
                        "Using config override for db_write_buffer_size: {} bytes",
                        size
                    );
                    size
                } else {
                    let size = (total_memory_bytes as f64 * 0.8) as usize;
                    debug!(
                        "Calculated db_write_buffer_size: {} bytes (80% of {} total bytes)",
                        size, total_memory_bytes
                    );
                    size
                };
                options.set_db_write_buffer_size(db_buffer_size);

                // Per-table (column family) option overrides.
                let mut table_config_map = BTreeMap::new();

                let mut cf_options = typed_store::rocks::default_db_options();
                cf_options.options.set_disable_auto_compactions(true);

                // The two buffer overrides must be given together.
                let (buffer_size, buffer_count) = match (
                    index_config.as_ref().and_then(|c| c.cf_write_buffer_size),
                    index_config
                        .as_ref()
                        .and_then(|c| c.cf_max_write_buffer_number),
                ) {
                    (Some(size), Some(count)) => {
                        debug!(
                            "Using config overrides - buffer_size: {} bytes, buffer_count: {}",
                            size, count
                        );
                        (size, count)
                    }
                    (None, None) => {
                        // Budget 25% of the reported memory for buffers.
                        let cf_memory_budget = (total_memory_bytes as f64 * 0.25) as usize;
                        debug!(
                            "Column family memory budget: {} bytes (25% of {} total bytes)",
                            cf_memory_budget, total_memory_bytes
                        );
                        // 64 MiB floor per buffer.
                        const MIN_BUFFER_SIZE: usize = 64 * 1024 * 1024;
                        let target_buffer_count = num_cpus::get().max(2);

                        // Split the budget across the target count, but
                        // never go below the floor; then derive how many
                        // buffers of that size fit, within [2, target].
                        let buffer_size =
                            (cf_memory_budget / target_buffer_count).max(MIN_BUFFER_SIZE);
                        let buffer_count = (cf_memory_budget / buffer_size)
                            .clamp(2, target_buffer_count)
                            as i32;
                        debug!(
                            "Calculated buffer_size: {} bytes, buffer_count: {} (based on {} CPUs)",
                            buffer_size, buffer_count, target_buffer_count
                        );
                        (buffer_size, buffer_count)
                    }
                    _ => {
                        panic!(
                            "indexing-cf-write-buffer-size and indexing-cf-max-write-buffer-number must both be specified or both be omitted"
                        );
                    }
                };

                cf_options.options.set_write_buffer_size(buffer_size);
                cf_options.options.set_max_write_buffer_number(buffer_count);

                // Config override, else min(buffer_size / 2, 128 MiB).
                batch_size_limit = if let Some(limit) =
                    index_config.as_ref().and_then(|c| c.batch_size_limit)
                {
                    debug!(
                        "Using config override for batch_size_limit: {} bytes",
                        limit
                    );
                    limit
                } else {
                    let half_buffer = buffer_size / 2;
                    let default_limit = 1 << 27;
                    let limit = half_buffer.min(default_limit);
                    debug!(
                        "Calculated batch_size_limit: {} bytes (min of half_buffer={} and default_limit={})",
                        limit, half_buffer, default_limit
                    );
                    limit
                };

                // Start every table from the shared options...
                for (table_name, _) in IndexStoreTables::describe_tables() {
                    table_config_map.insert(table_name, cf_options.clone());
                }

                // ...then re-register the tables that need a merge operator
                // and/or a compaction filter.
                let mut balance_options = cf_options.clone();
                balance_options = balance_options.set_merge_operator_associative(
                    "balance_merge",
                    balance_delta_merge_operator,
                );
                table_config_map.insert("balance".to_string(), balance_options);

                // Both filters share the pruning watermark atomic.
                let bitmap_filter_tx = BitmapCompactionFilter::new(
                    index_options.pruning_tx_seq_exclusive.clone(),
                    BitmapKind::Transaction,
                );
                let bitmap_filter_event = BitmapCompactionFilter::new(
                    index_options.pruning_tx_seq_exclusive.clone(),
                    BitmapKind::Event,
                );
                let mut transaction_bitmap_opts = cf_options.clone();
                transaction_bitmap_opts = transaction_bitmap_opts
                    .set_merge_operator_associative(
                        "bitmap_union_merge",
                        bitmap_union_merge_operator,
                    );
                transaction_bitmap_opts.options.set_compaction_filter(
                    "transaction_bitmap_filter",
                    move |_level, key, value| bitmap_filter_tx.filter(key, value),
                );
                table_config_map
                    .insert("transaction_bitmap".to_string(), transaction_bitmap_opts);

                let mut event_bitmap_opts = cf_options.clone();
                event_bitmap_opts = event_bitmap_opts.set_merge_operator_associative(
                    "bitmap_union_merge",
                    bitmap_union_merge_operator,
                );
                event_bitmap_opts
                    .options
                    .set_compaction_filter("event_bitmap_filter", move |_level, key, value| {
                        bitmap_filter_event.filter(key, value)
                    });
                table_config_map.insert("event_bitmap".to_string(), event_bitmap_opts);

                IndexStoreTables::open_with_options(
                    &path,
                    options,
                    Some(DBMapTableConfigMap::new(table_config_map)),
                )
            };

            tables
                .init(
                    authority_store,
                    checkpoint_store,
                    epoch_store,
                    package_store,
                    batch_size_limit,
                    &rpc_config,
                )
                .expect("unable to initialize rpc index from live object set");

            tables
                .meta
                .flush()
                .expect("Failed to flush RPC index tables to disk");

            // Close the initialisation handle and wait (up to 30s, polling
            // every 100ms) until no strong reference to the database is
            // left before reopening it.
            let weak_db = Arc::downgrade(&tables.meta.db);
            drop(tables);

            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
            loop {
                if weak_db.strong_count() == 0 {
                    break;
                }
                if std::time::Instant::now() > deadline {
                    panic!("unable to reopen DB after indexing");
                }
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }

            // Reopen with the regular options.
            let reopened_tables =
                IndexStoreTables::open_with_index_options(&path, index_options);

            // Sanity-check that what was just written is readable.
            let stored_version = reopened_tables
                .meta
                .get(&())
                .expect("Failed to read metadata from reopened database")
                .expect("Metadata not found in reopened database");
            assert_eq!(
                stored_version.version, CURRENT_DB_VERSION,
                "Database version mismatch after flush and reopen: expected {:#x}, found {:#x}",
                CURRENT_DB_VERSION, stored_version.version
            );
            assert_eq!(
                reopened_tables.persisted_ledger_history_indexing(),
                rpc_config.ledger_history_indexing(),
                "ledger-history setting mismatch after flush and reopen"
            );

            reopened_tables
        } else {
            // No rebuild needed. Only the enabled -> disabled transition is
            // handled here.
            if tables.persisted_ledger_history_indexing()
                && !rpc_config.ledger_history_indexing()
            {
                tables
                    .disable_ledger_history_indexing()
                    .expect("unable to disable ledger history indexing");
            }
            tables
        }
    };

    // Seed the shared watermark from what is currently stored.
    Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);

    let ledger_history_enabled = tables.persisted_ledger_history_indexing();
    debug_assert_eq!(
        ledger_history_enabled,
        rpc_config.ledger_history_indexing(),
        "ledger_history_enabled (from settings CF) must match the configured ledger_history_indexing flag"
    );

    Self {
        tables,
        pending_updates: Default::default(),
        ledger_history_pruning_watermark: ledger_history_atomic,
        ledger_history_enabled,
    }
}
1977
1978 fn hydrate_ledger_history_pruning_atomic(tables: &IndexStoreTables, atomic: &Arc<AtomicU64>) {
1980 let persisted = tables.first_tx_seq_digest_key().ok().flatten().unwrap_or(0);
1981 atomic.store(persisted, Ordering::Relaxed);
1982 }
1983
1984 pub fn new_without_init(dir: &Path) -> Self {
1985 let path = Self::db_path(dir);
1986
1987 let ledger_history_atomic = Arc::new(AtomicU64::new(0));
1989 let index_options = IndexStoreOptions {
1990 pruning_tx_seq_exclusive: ledger_history_atomic.clone(),
1991 };
1992 let tables = IndexStoreTables::open_with_index_options(path, index_options);
1993
1994 let ledger_history_enabled = tables.persisted_ledger_history_indexing();
1995 Self::hydrate_ledger_history_pruning_atomic(&tables, &ledger_history_atomic);
1996
1997 Self {
1998 tables,
1999 pending_updates: Default::default(),
2000 ledger_history_pruning_watermark: ledger_history_atomic,
2001 ledger_history_enabled,
2002 }
2003 }
2004
/// Prunes the index tables.
///
/// Forwards both watermarks to the tables' `prune`, together with this
/// store's ledger-history flag and its shared pruning watermark atomic.
pub fn prune(
    &self,
    pruned_checkpoint_watermark: u64,
    pruned_tx_seq_exclusive: u64,
) -> Result<(), TypedStoreError> {
    self.tables.prune(
        pruned_checkpoint_watermark,
        pruned_tx_seq_exclusive,
        self.ledger_history_enabled,
        &self.ledger_history_pruning_watermark,
    )
}
2017
/// Computes the index batch for `checkpoint` and parks it in
/// `pending_updates` under the checkpoint's sequence number.
///
/// Nothing is written to the database here; the batch is written later by
/// `commit_update_for_checkpoint`.
///
/// # Panics
/// Panics if building the batch fails or if the `pending_updates` mutex is
/// poisoned.
#[tracing::instrument(
    skip_all,
    fields(checkpoint = checkpoint.summary.sequence_number)
)]
pub fn index_checkpoint(&self, checkpoint: &Checkpoint) {
    let sequence_number = checkpoint.summary.sequence_number;
    let batch = self
        .tables
        .index_checkpoint(checkpoint, self.ledger_history_enabled)
        .expect("db error");

    self.pending_updates
        .lock()
        .unwrap()
        .insert(sequence_number, batch);
}
2038
2039 #[tracing::instrument(skip(self))]
2047 pub fn commit_update_for_checkpoint(&self, checkpoint: u64) -> Result<(), StorageError> {
2048 let next_batch = self.pending_updates.lock().unwrap().pop_first();
2049
2050 let (next_sequence_number, batch) = next_batch.unwrap();
2052 assert_eq!(
2053 checkpoint, next_sequence_number,
2054 "commit_update_for_checkpoint must be called in order"
2055 );
2056
2057 Ok(batch.write()?)
2058 }
2059
/// Returns the stored `EpochInfo` for `epoch`, or `Ok(None)` if the tables
/// have no row for it. Delegates to the underlying tables.
pub fn get_epoch_info(&self, epoch: EpochId) -> Result<Option<EpochInfo>, TypedStoreError> {
    self.tables.get_epoch_info(epoch)
}
2063
/// Iterates over the owner index rows of `owner`, optionally filtered by
/// `object_type` and resuming from `cursor`. Delegates to the underlying
/// tables.
pub fn owner_iter(
    &self,
    owner: SuiAddress,
    object_type: Option<StructTag>,
    cursor: Option<OwnerIndexKey>,
) -> Result<
    impl Iterator<Item = Result<(OwnerIndexKey, OwnerIndexInfo), TypedStoreError>> + '_,
    TypedStoreError,
> {
    self.tables.owner_iter(owner, object_type, cursor)
}
2075
/// Iterates over the dynamic-field keys stored under `parent`, resuming from
/// `cursor` when given. Delegates to the underlying tables.
pub fn dynamic_field_iter(
    &self,
    parent: ObjectID,
    cursor: Option<ObjectID>,
) -> Result<impl Iterator<Item = Result<DynamicFieldKey, TypedStoreError>> + '_, TypedStoreError>
{
    self.tables.dynamic_field_iter(parent, cursor)
}
2084
/// Returns the coin index entry for `coin_type`, or `Ok(None)` if there is
/// none. Delegates to the underlying tables.
pub fn get_coin_info(
    &self,
    coin_type: &StructTag,
) -> Result<Option<CoinIndexInfo>, TypedStoreError> {
    self.tables.get_coin_info(coin_type)
}
2091
/// Returns the balance index entry for `(owner, coin_type)`, or `Ok(None)`
/// if there is none. Delegates to the underlying tables.
pub fn get_balance(
    &self,
    owner: &SuiAddress,
    coin_type: &StructTag,
) -> Result<Option<BalanceIndexInfo>, TypedStoreError> {
    self.tables.get_balance(owner, coin_type)
}
2099
/// Iterates over the balance rows of `owner`, resuming from `cursor` when
/// given. Delegates to the underlying tables.
pub fn balance_iter(
    &self,
    owner: SuiAddress,
    cursor: Option<BalanceKey>,
) -> Result<
    impl Iterator<Item = Result<(BalanceKey, BalanceIndexInfo), TypedStoreError>> + '_,
    TypedStoreError,
> {
    self.tables.balance_iter(owner, cursor)
}
2110
/// Iterates over the recorded versions of the package with original id
/// `original_id`, starting at version `cursor` when given. Delegates to the
/// underlying tables.
pub fn package_versions_iter(
    &self,
    original_id: ObjectID,
    cursor: Option<u64>,
) -> Result<
    impl Iterator<Item = Result<(PackageVersionKey, PackageVersionInfo), TypedStoreError>> + '_,
    TypedStoreError,
> {
    self.tables.package_versions_iter(original_id, cursor)
}
2121
/// Reads the value stored under `Watermark::Indexed` in the watermark table.
///
/// Returns `Ok(None)` when no such watermark has been stored.
pub fn get_highest_indexed_checkpoint_seq_number(
    &self,
) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
    self.tables.watermark.get(&Watermark::Indexed)
}
2127
2128 fn ensure_ledger_history_enabled(&self) -> Result<(), TypedStoreError> {
2129 if self.ledger_history_enabled {
2130 Ok(())
2131 } else {
2132 Err(TypedStoreError::SerializationError(
2133 "ledger history indexing is disabled".to_owned(),
2134 ))
2135 }
2136 }
2137
2138 pub fn ledger_tx_seq_digest(
2139 &self,
2140 tx_seq: u64,
2141 ) -> Result<Option<LedgerTxSeqDigest>, TypedStoreError> {
2142 self.ensure_ledger_history_enabled()?;
2143 Ok(self
2144 .tables
2145 .tx_seq_digest
2146 .get(&tx_seq)?
2147 .map(|info| ledger_tx_seq_digest(tx_seq, info)))
2148 }
2149
2150 pub fn ledger_tx_seq_digest_multi_get(
2151 &self,
2152 tx_seqs: &[u64],
2153 ) -> Result<Vec<Option<LedgerTxSeqDigest>>, TypedStoreError> {
2154 self.ensure_ledger_history_enabled()?;
2155 let rows = self
2156 .tables
2157 .tx_seq_digest
2158 .multi_get(tx_seqs)?
2159 .into_iter()
2160 .zip_debug_eq(tx_seqs.iter().copied())
2161 .map(|(info, tx_seq)| info.map(|info| ledger_tx_seq_digest(tx_seq, info)))
2162 .collect();
2163 Ok(rows)
2164 }
2165
2166 pub fn ledger_tx_seq_digest_iter(
2167 &self,
2168 start: u64,
2169 end_exclusive: u64,
2170 descending: bool,
2171 ) -> Result<LedgerTxSeqDigestIterator<'_>, TypedStoreError> {
2172 self.ensure_ledger_history_enabled()?;
2173 if start >= end_exclusive {
2174 return Ok(Box::new(std::iter::empty()));
2175 }
2176
2177 let iter = if descending {
2178 let upper = end_exclusive - 1;
2179 self.tables
2180 .tx_seq_digest
2181 .reversed_safe_iter_with_bounds(Some(start), Some(upper))?
2182 } else {
2183 self.tables
2184 .tx_seq_digest
2185 .safe_iter_with_bounds(Some(start), Some(end_exclusive))
2186 };
2187
2188 Ok(Box::new(iter.map(|result| {
2189 result.map(|(tx_seq, info)| ledger_tx_seq_digest(tx_seq, info))
2190 })))
2191 }
2192
/// Iterates over the transaction bitmap buckets of `dimension_key` with
/// bucket ids in `[start_bucket, end_bucket_exclusive)`, ascending or
/// descending.
///
/// # Errors
/// Fails if ledger-history indexing is disabled, or with whatever
/// `bitmap_bucket_iter` reports.
pub fn transaction_bitmap_bucket_iter(
    &self,
    dimension_key: Vec<u8>,
    start_bucket: u64,
    end_bucket_exclusive: u64,
    descending: bool,
) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
    self.ensure_ledger_history_enabled()?;
    Self::bitmap_bucket_iter(
        &self.tables.transaction_bitmap,
        dimension_key,
        start_bucket,
        end_bucket_exclusive,
        descending,
    )
}
2209
/// Iterates over the event bitmap buckets of `dimension_key` with bucket ids
/// in `[start_bucket, end_bucket_exclusive)`, ascending or descending.
///
/// # Errors
/// Fails if ledger-history indexing is disabled, or with whatever
/// `bitmap_bucket_iter` reports.
pub fn event_bitmap_bucket_iter(
    &self,
    dimension_key: Vec<u8>,
    start_bucket: u64,
    end_bucket_exclusive: u64,
    descending: bool,
) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
    self.ensure_ledger_history_enabled()?;
    Self::bitmap_bucket_iter(
        &self.tables.event_bitmap,
        dimension_key,
        start_bucket,
        end_bucket_exclusive,
        descending,
    )
}
2226
2227 fn bitmap_bucket_iter(
2228 table: &DBMap<BitmapIndexKey, BitmapBlob>,
2229 dimension_key: Vec<u8>,
2230 start_bucket: u64,
2231 end_bucket_exclusive: u64,
2232 descending: bool,
2233 ) -> Result<LedgerBitmapBucketIterator<'_>, TypedStoreError> {
2234 if start_bucket >= end_bucket_exclusive {
2235 return Ok(Box::new(std::iter::empty()));
2236 }
2237
2238 let lower = BitmapIndexKey {
2239 dimension_key: dimension_key.clone(),
2240 bucket_id: start_bucket,
2241 };
2242 let upper_exclusive = BitmapIndexKey {
2243 dimension_key,
2244 bucket_id: end_bucket_exclusive,
2245 };
2246 let upper_inclusive = BitmapIndexKey {
2247 dimension_key: upper_exclusive.dimension_key.clone(),
2248 bucket_id: end_bucket_exclusive - 1,
2249 };
2250
2251 let iter: Box<
2252 dyn Iterator<Item = Result<(BitmapIndexKey, BitmapBlob), TypedStoreError>> + '_,
2253 > = if descending {
2254 table.reversed_safe_iter_with_bounds(Some(lower), Some(upper_inclusive))?
2255 } else {
2256 table.safe_iter_with_bounds(Some(lower), Some(upper_exclusive))
2257 };
2258
2259 Ok(Box::new(iter.map(|result| {
2260 result.and_then(|(key, blob)| decode_ledger_bitmap_bucket(key, blob))
2261 })))
2262 }
2263}
2264
2265fn tx_removed_objects_pre_version<'a>(
2270 tx: &'a ExecutedTransaction,
2271 object_set: &'a ObjectSet,
2272) -> impl Iterator<Item = &'a Object> + 'a {
2273 tx.effects
2274 .object_changes()
2275 .into_iter()
2276 .filter_map(
2277 move |change| match (change.input_version, change.output_version) {
2278 (Some(input_version), None) => object_set.get(&ObjectKey(change.id, input_version)),
2279 _ => None,
2280 },
2281 )
2282}
2283
2284fn tx_changed_objects<'a>(
2288 tx: &'a ExecutedTransaction,
2289 object_set: &'a ObjectSet,
2290) -> impl Iterator<Item = (&'a Object, Option<&'a Object>)> + 'a {
2291 tx.effects
2292 .object_changes()
2293 .into_iter()
2294 .filter_map(move |change| {
2295 let output = change
2296 .output_version
2297 .and_then(|v| object_set.get(&ObjectKey(change.id, v)))?;
2298 let input = change
2299 .input_version
2300 .and_then(|v| object_set.get(&ObjectKey(change.id, v)));
2301 Some((output, input))
2302 })
2303}
2304
2305fn should_index_dynamic_field(object: &Object) -> bool {
2306 object
2314 .data
2315 .try_as_move()
2316 .is_some_and(|move_object| move_object.type_().is_dynamic_field())
2317}
2318
2319fn try_create_coin_index_info(object: &Object) -> Option<(CoinIndexKey, CoinIndexInfo)> {
2320 use sui_types::coin::CoinMetadata;
2321 use sui_types::coin::RegulatedCoinMetadata;
2322 use sui_types::coin::TreasuryCap;
2323
2324 let object_type = object.type_().and_then(MoveObjectType::other)?;
2325
2326 if let Some(coin_type) = CoinMetadata::is_coin_metadata_with_coin_type(object_type).cloned() {
2327 return Some((
2328 CoinIndexKey { coin_type },
2329 CoinIndexInfo {
2330 coin_metadata_object_id: Some(object.id()),
2331 treasury_object_id: None,
2332 regulated_coin_metadata_object_id: None,
2333 },
2334 ));
2335 }
2336
2337 if let Some(coin_type) = TreasuryCap::is_treasury_with_coin_type(object_type).cloned() {
2338 return Some((
2339 CoinIndexKey { coin_type },
2340 CoinIndexInfo {
2341 coin_metadata_object_id: None,
2342 treasury_object_id: Some(object.id()),
2343 regulated_coin_metadata_object_id: None,
2344 },
2345 ));
2346 }
2347
2348 if let Some(coin_type) =
2349 RegulatedCoinMetadata::is_regulated_coin_metadata_with_coin_type(object_type).cloned()
2350 {
2351 return Some((
2352 CoinIndexKey { coin_type },
2353 CoinIndexInfo {
2354 coin_metadata_object_id: None,
2355 treasury_object_id: None,
2356 regulated_coin_metadata_object_id: Some(object.id()),
2357 },
2358 ));
2359 }
2360
2361 None
2362}
2363
/// Factory for per-worker [`RpcLiveObjectIndexer`]s; every indexer it makes
/// shares the same tables, coin index and batch size limit.
struct RpcParLiveObjectSetIndexer<'a> {
    // Tables the created indexers write to.
    tables: &'a IndexStoreTables,
    // Coin index shared (behind a mutex) by all created indexers.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Batch size, in bytes, at which an indexer writes out its batch.
    batch_size_limit: usize,
}
2369
/// Indexes live objects one at a time, buffering writes in a batch that is
/// written out whenever it reaches `batch_size_limit` bytes.
struct RpcLiveObjectIndexer<'a> {
    // Tables the batch writes to.
    tables: &'a IndexStoreTables,
    // Writes accumulated since the last write-out.
    batch: typed_store::rocks::DBBatch,
    // Coin index shared with the other indexers made by the same factory.
    coin_index: &'a Mutex<HashMap<CoinIndexKey, CoinIndexInfo>>,
    // Balance deltas merged in memory; moved into `batch` once the map holds
    // `BALANCE_FLUSH_THRESHOLD` entries, and again on `finish`.
    balance_changes: HashMap<BalanceKey, BalanceIndexInfo>,
    // Batch size, in bytes, at which `batch` is written out.
    batch_size_limit: usize,
}
2377
2378impl<'a> ParMakeLiveObjectIndexer for RpcParLiveObjectSetIndexer<'a> {
2379 type ObjectIndexer = RpcLiveObjectIndexer<'a>;
2380
2381 fn make_live_object_indexer(&self) -> Self::ObjectIndexer {
2382 RpcLiveObjectIndexer {
2383 tables: self.tables,
2384 batch: self.tables.owner.batch(),
2385 coin_index: self.coin_index,
2386 balance_changes: HashMap::new(),
2387 batch_size_limit: self.batch_size_limit,
2388 }
2389 }
2390}
2391
2392impl LiveObjectIndexer for RpcLiveObjectIndexer<'_> {
2393 fn index_object(&mut self, object: Object) -> Result<(), StorageError> {
2394 match object.owner {
2395 Owner::AddressOwner(owner) | Owner::ConsensusAddressOwner { owner, .. } => {
2397 let owner_key = OwnerIndexKey::from_object(&object);
2398 let owner_info = OwnerIndexInfo::new(&object);
2399 self.batch
2400 .insert_batch(&self.tables.owner, [(owner_key, owner_info)])?;
2401
2402 if let Some((coin_type, value)) = get_balance_and_type_if_coin(&object)? {
2403 let balance_key = BalanceKey { owner, coin_type };
2404 let balance_info = BalanceIndexInfo {
2405 coin_balance_delta: value.into(),
2406 address_balance_delta: 0,
2407 };
2408 self.balance_changes
2409 .entry(balance_key)
2410 .or_default()
2411 .merge_delta(&balance_info);
2412
2413 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2414 self.batch.partial_merge_batch(
2415 &self.tables.balance,
2416 std::mem::take(&mut self.balance_changes),
2417 )?;
2418 }
2419 }
2420 }
2421
2422 Owner::ObjectOwner(parent) => {
2424 if should_index_dynamic_field(&object) {
2425 let field_key = DynamicFieldKey::new(parent, object.id());
2426 self.batch
2427 .insert_batch(&self.tables.dynamic_field, [(field_key, ())])?;
2428 }
2429
2430 if parent == SUI_ACCUMULATOR_ROOT_OBJECT_ID.into()
2432 && let Some((owner, coin_type, balance)) = get_address_balance_info(&object)
2433 {
2434 let balance_key = BalanceKey { owner, coin_type };
2435 let balance_info = BalanceIndexInfo {
2436 coin_balance_delta: 0,
2437 address_balance_delta: balance,
2438 };
2439 self.balance_changes
2440 .entry(balance_key)
2441 .or_default()
2442 .merge_delta(&balance_info);
2443
2444 if self.balance_changes.len() >= BALANCE_FLUSH_THRESHOLD {
2445 self.batch.partial_merge_batch(
2446 &self.tables.balance,
2447 std::mem::take(&mut self.balance_changes),
2448 )?;
2449 }
2450 }
2451 }
2452
2453 Owner::Shared { .. } | Owner::Immutable => {}
2454
2455 Owner::Party { .. } => {
2456 todo!("Party WIP");
2458 }
2460 }
2461
2462 if let Some((key, value)) = try_create_coin_index_info(&object) {
2464 use std::collections::hash_map::Entry;
2465
2466 match self.coin_index.lock().unwrap().entry(key) {
2467 Entry::Occupied(mut o) => {
2468 o.get_mut().merge(value);
2469 }
2470 Entry::Vacant(v) => {
2471 v.insert(value);
2472 }
2473 }
2474 }
2475
2476 if let Some((key, info)) = IndexStoreTables::extract_version_if_package(&object) {
2477 self.batch
2478 .insert_batch(&self.tables.package_version, [(key, info)])?;
2479 }
2480
2481 if self.batch.size_in_bytes() >= self.batch_size_limit {
2484 std::mem::replace(&mut self.batch, self.tables.owner.batch())
2485 .write_opt(bulk_ingestion_write_options())?;
2486 }
2487
2488 Ok(())
2489 }
2490
2491 fn finish(mut self) -> Result<(), StorageError> {
2492 self.batch.partial_merge_batch(
2493 &self.tables.balance,
2494 std::mem::take(&mut self.balance_changes),
2495 )?;
2496 self.batch.write_opt(bulk_ingestion_write_options())?;
2497 Ok(())
2498 }
2499}
2500
2501fn full_checkpoint_for_backfill(
2510 authority_store: &AuthorityStore,
2511 checkpoint_store: &CheckpointStore,
2512 checkpoint: u64,
2513) -> Result<Option<Checkpoint>, StorageError> {
2514 let Some(summary) = checkpoint_store.get_checkpoint_by_sequence_number(checkpoint)? else {
2515 return Ok(None);
2516 };
2517 let Some(contents) = checkpoint_store.get_checkpoint_contents(&summary.content_digest)? else {
2518 return Ok(None);
2519 };
2520
2521 let (transactions, object_set) = load_executed_transactions(authority_store, &contents, true)?;
2525
2526 Ok(Some(Checkpoint {
2527 summary: summary.into(),
2528 contents,
2529 transactions,
2530 object_set,
2531 }))
2532}
2533
2534fn sparse_checkpoint_for_epoch_backfill(
2535 authority_store: &AuthorityStore,
2536 checkpoint_store: &CheckpointStore,
2537 checkpoint: u64,
2538) -> Result<Option<Checkpoint>, StorageError> {
2539 let summary = checkpoint_store
2540 .get_checkpoint_by_sequence_number(checkpoint)?
2541 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2542
2543 if summary.end_of_epoch_data.is_none() && summary.sequence_number != 0 {
2545 return Ok(None);
2546 }
2547
2548 let contents = checkpoint_store
2549 .get_checkpoint_contents(&summary.content_digest)?
2550 .ok_or_else(|| StorageError::missing(format!("missing checkpoint {checkpoint}")))?;
2551
2552 let (transactions, object_set) = load_executed_transactions(authority_store, &contents, false)?;
2553
2554 Ok(Some(Checkpoint {
2555 summary: summary.into(),
2556 contents,
2557 transactions,
2558 object_set,
2559 }))
2560}
2561
2562fn load_executed_transactions(
2568 authority_store: &AuthorityStore,
2569 contents: &sui_types::messages_checkpoint::CheckpointContents,
2570 load_events: bool,
2571) -> Result<(Vec<ExecutedTransaction>, ObjectSet), StorageError> {
2572 let transaction_digests = contents
2573 .iter()
2574 .map(|execution_digests| execution_digests.transaction)
2575 .collect::<Vec<_>>();
2576 let transactions = authority_store
2577 .multi_get_transaction_blocks(&transaction_digests)?
2578 .into_iter()
2579 .map(|maybe_transaction| {
2580 maybe_transaction.ok_or_else(|| StorageError::custom("missing transaction"))
2581 })
2582 .collect::<Result<Vec<_>, _>>()?;
2583
2584 let effects = authority_store
2585 .multi_get_executed_effects(&transaction_digests)?
2586 .into_iter()
2587 .map(|maybe_effects| maybe_effects.ok_or_else(|| StorageError::custom("missing effects")))
2588 .collect::<Result<Vec<_>, _>>()?;
2589
2590 let events = if load_events {
2591 authority_store
2592 .multi_get_events(&transaction_digests)
2593 .map_err(|e| StorageError::custom(e.to_string()))?
2594 } else {
2595 vec![None; transaction_digests.len()]
2596 };
2597
2598 let mut full_transactions = Vec::with_capacity(transactions.len());
2599 let mut object_set = ObjectSet::default();
2600 for ((tx, fx), ev) in transactions
2601 .into_iter()
2602 .zip_debug_eq(effects)
2603 .zip_debug_eq(events)
2604 {
2605 let input_objects =
2606 sui_types::storage::get_transaction_input_objects(authority_store, &fx)?;
2607 let output_objects =
2608 sui_types::storage::get_transaction_output_objects(authority_store, &fx)?;
2609
2610 for obj in input_objects.into_iter().chain(output_objects.into_iter()) {
2611 object_set.insert(obj);
2612 }
2613
2614 let sender_signed = sui_types::transaction::Transaction::from(tx)
2615 .into_data()
2616 .into_inner();
2617 full_transactions.push(ExecutedTransaction {
2618 transaction: sender_signed.intent_message.value,
2619 signatures: sender_signed.tx_signatures,
2620 effects: fx,
2621 events: ev,
2622 unchanged_loaded_runtime_objects: Vec::new(),
2628 });
2629 }
2630
2631 Ok((full_transactions, object_set))
2632}
2633
2634fn get_balance_and_type_if_coin(object: &Object) -> Result<Option<(StructTag, u64)>, StorageError> {
2635 match Coin::extract_balance_if_coin(object) {
2636 Ok(Some((TypeTag::Struct(struct_tag), value))) => Ok(Some((*struct_tag, value))),
2637 Ok(Some(_)) => {
2638 debug!("Coin object {} has non-struct type tag", object.id());
2639 Ok(None)
2640 }
2641 Ok(None) => {
2642 Ok(None)
2644 }
2645 Err(e) => {
2646 Err(StorageError::custom(format!(
2648 "Failed to deserialize coin object {}: {}",
2649 object.id(),
2650 e
2651 )))
2652 }
2653 }
2654}
2655
2656fn get_address_balance_info(object: &Object) -> Option<(SuiAddress, StructTag, i128)> {
2657 let move_object = object.data.try_as_move()?;
2658
2659 let TypeTag::Struct(coin_type) = move_object.type_().balance_accumulator_field_type_maybe()?
2660 else {
2661 return None;
2662 };
2663
2664 let (key, value): (
2665 sui_types::accumulator_root::AccumulatorKey,
2666 sui_types::accumulator_root::AccumulatorValue,
2667 ) = move_object.try_into().ok()?;
2668
2669 let balance = value.as_u128()? as i128;
2670 if balance <= 0 {
2671 return None;
2672 }
2673
2674 Some((key.owner, *coin_type, balance))
2675}
2676
2677#[cfg(test)]
2678mod tests {
2679 use super::*;
2680 use std::sync::atomic::AtomicU64;
2681
2682 #[tokio::test]
2691 async fn open_with_index_options_overrides_every_cf() {
2692 let temp_dir = tempfile::tempdir().unwrap();
2693 let db_path = temp_dir.path().join("rpc-index");
2694
2695 let _tables =
2696 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2697
2698 let per_cf = parse_cf_options(&db_path);
2704 assert!(
2705 !per_cf.is_empty(),
2706 "expected at least one CFOptions section in OPTIONS file"
2707 );
2708 for (cf_name, opts) in &per_cf {
2709 if cf_name == "default" {
2710 continue;
2711 }
2712 for (key, expected) in [
2713 ("level0_slowdown_writes_trigger", "512"),
2714 ("level0_stop_writes_trigger", "1024"),
2715 ("soft_pending_compaction_bytes_limit", "0"),
2716 ("hard_pending_compaction_bytes_limit", "0"),
2717 ] {
2718 let actual = opts
2719 .get(key)
2720 .unwrap_or_else(|| panic!("cf `{cf_name}` missing `{key}`"));
2721 assert_eq!(
2722 actual, expected,
2723 "cf `{cf_name}` has `{key}={actual}`, expected `{expected}` — \
2724 the typed-store override map likely doesn't cover this CF"
2725 );
2726 }
2727 }
2728 }
2729
2730 #[test]
2731 fn checked_encode_event_seq_rejects_unrepresentable_values() {
2732 assert!(
2733 checked_encode_event_seq(0, MAX_EVENTS_PER_TX).is_err(),
2734 "event_idx at MAX_EVENTS_PER_TX must be rejected"
2735 );
2736 assert!(
2737 checked_encode_event_seq(MAX_TX_SEQ + 1, 0).is_err(),
2738 "tx_seq past MAX_TX_SEQ must be rejected"
2739 );
2740 }
2741
2742 #[test]
2748 fn bitmap_filter_keeps_bucket_with_live_tx_above_zero_watermark() {
2749 let watermark = Arc::new(AtomicU64::new(1));
2750 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2751
2752 let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2753 dimension_key: vec![1, 2, 3],
2754 bucket_id: 0,
2755 });
2756 assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2757
2758 watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
2763 assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2764 }
2765
2766 #[test]
2770 fn bitmap_filter_event_bucket_uses_event_seq_lo() {
2771 let watermark = Arc::new(AtomicU64::new(0));
2772 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Event);
2773
2774 let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2775 dimension_key: vec![5],
2776 bucket_id: 0,
2777 });
2778 assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2780
2781 let txs_per_bucket = EVENT_BUCKET_SIZE / MAX_EVENTS_PER_TX as u64;
2785 watermark.store(txs_per_bucket, Ordering::Relaxed);
2786 assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2787 }
2788
2789 #[test]
2794 fn bitmap_filter_keeps_malformed_keys() {
2795 let watermark = Arc::new(AtomicU64::new(u64::MAX));
2796 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2797
2798 assert!(matches!(filter.filter(b"short", &[]), Decision::Keep));
2799 assert!(matches!(filter.filter(&[], &[]), Decision::Keep));
2800
2801 let huge = typed_store::be_fix_int_ser(&BitmapIndexKey {
2804 dimension_key: vec![],
2805 bucket_id: u64::MAX - 1,
2806 });
2807 assert!(huge.len() >= 8);
2808 assert!(matches!(filter.filter(&huge, &[]), Decision::Keep));
2809 }
2810
2811 #[test]
2816 fn bitmap_filter_decodes_typed_store_keys() {
2817 let watermark = Arc::new(AtomicU64::new(0));
2818 let filter = BitmapCompactionFilter::new(watermark.clone(), BitmapKind::Transaction);
2819
2820 let bucket_id = 7u64;
2823 let key = typed_store::be_fix_int_ser(&BitmapIndexKey {
2824 dimension_key: vec![0xAA, 0xBB, 0xCC],
2825 bucket_id,
2826 });
2827 assert!(matches!(filter.filter(&key, &[]), Decision::Keep));
2829
2830 watermark.store((bucket_id + 1) * TX_BUCKET_SIZE, Ordering::Relaxed);
2832 assert!(matches!(filter.filter(&key, &[]), Decision::Remove));
2833 }
2834
2835 #[test]
2838 fn bitmap_merge_operator_unions_operands() {
2839 let mut bm_a = RoaringBitmap::new();
2840 bm_a.insert(1);
2841 bm_a.insert(5);
2842 let blob_a = encode_bitmap_blob(&bm_a);
2843
2844 let mut bm_b = RoaringBitmap::new();
2845 bm_b.insert(5);
2846 bm_b.insert(7);
2847 let blob_b = encode_bitmap_blob(&bm_b);
2848
2849 let mut bm_c = RoaringBitmap::new();
2850 bm_c.insert(100);
2851 let blob_c = encode_bitmap_blob(&bm_c);
2852
2853 let decoded_a = decode_bitmap_blob(&blob_a).expect("decode a");
2863 let decoded_b = decode_bitmap_blob(&blob_b).expect("decode b");
2864 let decoded_c = decode_bitmap_blob(&blob_c).expect("decode c");
2865 let unioned = decoded_a | decoded_b | decoded_c;
2866 let mut expected = RoaringBitmap::new();
2867 for b in [1, 5, 7, 100] {
2868 expected.insert(b);
2869 }
2870 assert_eq!(unioned, expected);
2871 }
2872
2873 #[tokio::test]
2879 async fn bitmap_merge_operator_unions_across_writes() {
2880 let temp_dir = tempfile::tempdir().unwrap();
2881 let db_path = temp_dir.path().join("rpc-index");
2882 let tables =
2883 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
2884
2885 let key = BitmapIndexKey {
2886 dimension_key: vec![1, 2, 3],
2887 bucket_id: 0,
2888 };
2889
2890 for bits in [vec![1u32, 2], vec![3, 4], vec![5, 6, 7]] {
2894 let mut bm = RoaringBitmap::new();
2895 for b in bits {
2896 bm.insert(b);
2897 }
2898 let mut batch = tables.transaction_bitmap.batch();
2899 batch
2900 .partial_merge_batch(
2901 &tables.transaction_bitmap,
2902 [(key.clone(), BitmapBlob::from(bm))],
2903 )
2904 .unwrap();
2905 batch.write().unwrap();
2906 }
2907
2908 let blob = tables
2909 .transaction_bitmap
2910 .get(&key)
2911 .unwrap()
2912 .expect("merged row should exist");
2913 let bm = RoaringBitmap::deserialize_from(&blob.0[..]).unwrap();
2914 let got: Vec<u32> = bm.iter().collect();
2915 assert_eq!(got, vec![1, 2, 3, 4, 5, 6, 7]);
2916 }
2917
2918 #[tokio::test]
2922 async fn bitmap_filter_removes_whole_bucket_after_compaction() {
2923 let temp_dir = tempfile::tempdir().unwrap();
2924 let db_path = temp_dir.path().join("rpc-index");
2925
2926 let watermark = Arc::new(AtomicU64::new(0));
2927 let index_options = IndexStoreOptions {
2928 pruning_tx_seq_exclusive: watermark.clone(),
2929 };
2930 let tables = IndexStoreTables::open_with_index_options(&db_path, index_options);
2931
2932 let dim_key = vec![0x01, 0xAA];
2933 let k0 = BitmapIndexKey {
2934 dimension_key: dim_key.clone(),
2935 bucket_id: 0,
2936 };
2937 let k1 = BitmapIndexKey {
2938 dimension_key: dim_key.clone(),
2939 bucket_id: 1,
2940 };
2941 let mut bm = RoaringBitmap::new();
2942 bm.insert(0);
2943
2944 let blob = BitmapBlob::from(bm);
2948 let mut batch = tables.transaction_bitmap.batch();
2949 batch
2950 .insert_batch(
2951 &tables.transaction_bitmap,
2952 [(k0.clone(), blob.clone()), (k1.clone(), blob)],
2953 )
2954 .unwrap();
2955 batch.write().unwrap();
2956 tables.transaction_bitmap.flush().unwrap();
2957
2958 assert!(tables.transaction_bitmap.get(&k0).unwrap().is_some());
2961 assert!(tables.transaction_bitmap.get(&k1).unwrap().is_some());
2962
2963 watermark.store(TX_BUCKET_SIZE, Ordering::Relaxed);
2965
2966 tables
2970 .transaction_bitmap
2971 .compact_range_raw("transaction_bitmap", vec![], vec![0xFF; 128])
2972 .unwrap();
2973
2974 assert!(
2975 tables.transaction_bitmap.get(&k0).unwrap().is_none(),
2976 "bucket 0 should have been removed by the compaction filter"
2977 );
2978 assert!(
2979 tables.transaction_bitmap.get(&k1).unwrap().is_some(),
2980 "bucket 1 should still be present (only bucket 0 was below the watermark)"
2981 );
2982 }
2983
2984 #[test]
2988 fn backfill_missing_cp_in_retained_range_is_error() {
2989 let checkpoint_range = 5u64..=10u64;
2993 let seq = 7u64;
2994 let loaded: Result<Option<Checkpoint>, StorageError> = Ok(None);
2995
2996 let result: Result<(), StorageError> = (|| {
2997 let checkpoint = loaded?.ok_or_else(|| {
2998 StorageError::missing(format!(
2999 "ledger history backfill: checkpoint {seq} is missing from local storage \
3000 but falls inside the retained backfill range {checkpoint_range:?}"
3001 ))
3002 })?;
3003 let _ = checkpoint;
3004 Ok(())
3005 })();
3006
3007 let err = result.expect_err("missing cp must error out, not silently succeed");
3008 let msg = err.to_string();
3009 assert!(
3010 msg.contains(&format!("checkpoint {seq}")),
3011 "error should name the missing cp: {msg}"
3012 );
3013 assert!(
3014 msg.contains("5..=10"),
3015 "error should name the retained range: {msg}"
3016 );
3017 }
3018
3019 #[test]
3024 fn legacy_watermark_bytes_still_deserialize() {
3025 let indexed_bytes = bcs::to_bytes(&Watermark::Indexed).unwrap();
3027 let pruned_bytes = bcs::to_bytes(&Watermark::Pruned).unwrap();
3028 assert_eq!(
3029 indexed_bytes,
3030 vec![0],
3031 "Watermark::Indexed must encode as 0"
3032 );
3033 assert_eq!(pruned_bytes, vec![1], "Watermark::Pruned must encode as 1");
3034
3035 let decoded_indexed: Watermark = bcs::from_bytes(&[0]).unwrap();
3039 let decoded_pruned: Watermark = bcs::from_bytes(&[1]).unwrap();
3040 assert!(matches!(decoded_indexed, Watermark::Indexed));
3041 assert!(matches!(decoded_pruned, Watermark::Pruned));
3042 }
3043
3044 #[tokio::test]
3049 async fn settings_cf_round_trips_and_drives_reinit() {
3050 let temp_dir = tempfile::tempdir().unwrap();
3051 let db_path = temp_dir.path().join("rpc-index");
3052 let tables =
3053 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3054
3055 tables
3057 .meta
3058 .insert(
3059 &(),
3060 &MetadataInfo {
3061 version: CURRENT_DB_VERSION,
3062 },
3063 )
3064 .unwrap();
3065 tables
3066 .settings
3067 .insert(
3068 &(),
3069 &encode_settings(&IndexSettings {
3070 ledger_history_indexing: false,
3071 }),
3072 )
3073 .unwrap();
3074 assert!(!tables.persisted_ledger_history_indexing());
3075
3076 let checkpoint_store = CheckpointStore::new_for_tests();
3079
3080 assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3082 assert!(tables.needs_to_do_initialization(&checkpoint_store, true));
3084
3085 tables
3087 .settings
3088 .insert(
3089 &(),
3090 &encode_settings(&IndexSettings {
3091 ledger_history_indexing: true,
3092 }),
3093 )
3094 .unwrap();
3095 assert!(tables.persisted_ledger_history_indexing());
3096 assert!(!tables.needs_to_do_initialization(&checkpoint_store, true));
3098 assert!(!tables.needs_to_do_initialization(&checkpoint_store, false));
3101 }
3102
3103 #[tokio::test]
3107 async fn disable_ledger_history_indexing_drops_history_cfs_in_place() {
3108 let temp_dir = tempfile::tempdir().unwrap();
3109 let db_path = temp_dir.path().join("rpc-index");
3110 let tables =
3111 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3112
3113 tables
3115 .settings
3116 .insert(
3117 &(),
3118 &encode_settings(&IndexSettings {
3119 ledger_history_indexing: true,
3120 }),
3121 )
3122 .unwrap();
3123 for tx_seq in 0..5u64 {
3124 tables
3125 .tx_seq_digest
3126 .insert(
3127 &tx_seq,
3128 &TxSeqDigestInfo {
3129 digest: TransactionDigest::new([0; 32]),
3130 event_count: 0,
3131 tx_offset: 0,
3132 checkpoint_number: 0,
3133 },
3134 )
3135 .unwrap();
3136 }
3137 for bucket_id in 0..5u64 {
3138 let key = BitmapIndexKey {
3139 dimension_key: vec![1, 2, 3],
3140 bucket_id,
3141 };
3142 tables
3143 .transaction_bitmap
3144 .insert(&key, &BitmapBlob(vec![0xab]))
3145 .unwrap();
3146 tables
3147 .event_bitmap
3148 .insert(&key, &BitmapBlob(vec![0xcd]))
3149 .unwrap();
3150 }
3151 tables.watermark.insert(&Watermark::Indexed, &42).unwrap();
3153
3154 tables.disable_ledger_history_indexing().unwrap();
3155
3156 assert!(tables.tx_seq_digest.is_empty());
3158 assert!(tables.transaction_bitmap.is_empty());
3159 assert!(tables.event_bitmap.is_empty());
3160 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), None);
3161
3162 assert!(!tables.persisted_ledger_history_indexing());
3164 assert_eq!(tables.watermark.get(&Watermark::Indexed).unwrap(), Some(42));
3165 }
3166
3167 #[tokio::test]
3170 async fn prune_maintains_ledger_history_state_when_active() {
3171 let temp_dir = tempfile::tempdir().unwrap();
3172 let db_path = temp_dir.path().join("rpc-index");
3173 let tables =
3174 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3175
3176 let mut batch = tables.tx_seq_digest.batch();
3178 for tx_seq in 0..5u64 {
3179 batch
3180 .insert_batch(
3181 &tables.tx_seq_digest,
3182 [(
3183 tx_seq,
3184 TxSeqDigestInfo {
3185 digest: TransactionDigest::new([0; 32]),
3186 event_count: 0,
3187 tx_offset: 0,
3188 checkpoint_number: 0,
3189 },
3190 )],
3191 )
3192 .unwrap();
3193 }
3194 batch.write().unwrap();
3195
3196 let pruning_atomic = AtomicU64::new(0);
3199 tables
3200 .prune(1, 3, true, &pruning_atomic)
3201 .unwrap();
3202 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3203
3204 assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3205 for tx_seq in 0..3u64 {
3206 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3207 }
3208 for tx_seq in 3..5u64 {
3209 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3210 }
3211 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3212 }
3213
3214 #[tokio::test]
3216 async fn prune_skips_ledger_history_state_when_inactive() {
3217 let temp_dir = tempfile::tempdir().unwrap();
3218 let db_path = temp_dir.path().join("rpc-index");
3219 let tables =
3220 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3221
3222 tables
3224 .tx_seq_digest
3225 .insert(
3226 &0u64,
3227 &TxSeqDigestInfo {
3228 digest: TransactionDigest::new([0; 32]),
3229 event_count: 0,
3230 tx_offset: 0,
3231 checkpoint_number: 0,
3232 },
3233 )
3234 .unwrap();
3235
3236 let pruning_atomic = AtomicU64::new(0);
3237 tables
3238 .prune(
3239 5,
3240 3,
3241 false,
3242 &pruning_atomic,
3243 )
3244 .unwrap();
3245 assert_eq!(
3246 pruning_atomic.load(Ordering::Relaxed),
3247 0,
3248 "disabled prune must not advance the compaction-filter atomic"
3249 );
3250 assert_eq!(
3251 tables.watermark.get(&Watermark::Pruned).unwrap(),
3252 Some(5),
3253 "base pruning must still advance"
3254 );
3255 assert!(
3256 tables.tx_seq_digest.get(&0u64).unwrap().is_some(),
3257 "tx_seq_digest rows must remain untouched when inactive"
3258 );
3259 }
3260
3261 #[tokio::test]
3263 async fn index_checkpoint_gates_on_ledger_history_enabled() {
3264 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3265
3266 let temp_dir = tempfile::tempdir().unwrap();
3267 let db_path = temp_dir.path().join("rpc-index");
3268 let tables =
3269 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3270
3271 let checkpoint = TestCheckpointBuilder::new(1)
3274 .start_transaction(1)
3275 .finish_transaction()
3276 .build_checkpoint();
3277
3278 let batch = tables
3280 .index_checkpoint(&checkpoint, false)
3281 .expect("index_checkpoint failed");
3282 batch.write().expect("batch write failed");
3283 assert_eq!(tables.tx_seq_digest.safe_iter().count(), 0);
3284 assert_eq!(tables.transaction_bitmap.safe_iter().count(), 0);
3285 assert_eq!(tables.event_bitmap.safe_iter().count(), 0);
3286
3287 let checkpoint2 = TestCheckpointBuilder::new(2)
3289 .start_transaction(1)
3290 .finish_transaction()
3291 .build_checkpoint();
3292 let batch = tables
3293 .index_checkpoint(&checkpoint2, true)
3294 .expect("index_checkpoint failed");
3295 batch.write().expect("batch write failed");
3296 assert!(
3297 tables.tx_seq_digest.safe_iter().count() > 0,
3298 "tx_seq_digest must have rows when ledger_history_enabled=true"
3299 );
3300 }
3301
3302 #[tokio::test]
3306 async fn index_checkpoint_records_within_checkpoint_tx_offset() {
3307 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
3308
3309 let temp_dir = tempfile::tempdir().unwrap();
3310 let db_path = temp_dir.path().join("rpc-index");
3311 let tables =
3312 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3313
3314 let checkpoint1 = TestCheckpointBuilder::new(1)
3317 .with_network_total_transactions(100)
3318 .start_transaction(0)
3319 .finish_transaction()
3320 .start_transaction(1)
3321 .finish_transaction()
3322 .start_transaction(2)
3323 .finish_transaction()
3324 .build_checkpoint();
3325 tables
3326 .index_checkpoint(&checkpoint1, true)
3327 .expect("index_checkpoint failed")
3328 .write()
3329 .expect("batch write failed");
3330
3331 let checkpoint2 = TestCheckpointBuilder::new(2)
3334 .with_network_total_transactions(103)
3335 .start_transaction(0)
3336 .finish_transaction()
3337 .start_transaction(1)
3338 .finish_transaction()
3339 .build_checkpoint();
3340 tables
3341 .index_checkpoint(&checkpoint2, true)
3342 .expect("index_checkpoint failed")
3343 .write()
3344 .expect("batch write failed");
3345
3346 let offset_by_tx_seq: std::collections::BTreeMap<u64, u32> = tables
3347 .tx_seq_digest
3348 .safe_iter()
3349 .map(|row| row.map(|(tx_seq, info)| (tx_seq, info.tx_offset)))
3350 .collect::<Result<_, _>>()
3351 .unwrap();
3352
3353 assert_eq!(
3354 offset_by_tx_seq,
3355 std::collections::BTreeMap::from([(100, 0), (101, 1), (102, 2), (103, 0), (104, 1),]),
3356 "tx_offset must be the within-checkpoint position, not the global tx_seq"
3357 );
3358 }
3359
3360 #[tokio::test]
3365 async fn prune_commits_deletes_and_watermark_atomically() {
3366 let temp_dir = tempfile::tempdir().unwrap();
3367 let db_path = temp_dir.path().join("rpc-index");
3368 let tables =
3369 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3370
3371 let mut batch = tables.tx_seq_digest.batch();
3372 for tx_seq in 0..4u64 {
3373 batch
3374 .insert_batch(
3375 &tables.tx_seq_digest,
3376 [(
3377 tx_seq,
3378 TxSeqDigestInfo {
3379 digest: TransactionDigest::new([0; 32]),
3380 event_count: 0,
3381 tx_offset: 0,
3382 checkpoint_number: 0,
3383 },
3384 )],
3385 )
3386 .unwrap();
3387 }
3388 batch.write().unwrap();
3389
3390 let pruning_atomic = AtomicU64::new(0);
3391 tables
3392 .prune(1, 2, true, &pruning_atomic)
3393 .unwrap();
3394
3395 assert_eq!(tables.watermark.get(&Watermark::Pruned).unwrap(), Some(1));
3398 for tx_seq in 0..2u64 {
3399 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3400 }
3401 for tx_seq in 2..4u64 {
3402 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3403 }
3404 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(2));
3405 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 2);
3406 }
3407
3408 #[tokio::test]
3413 async fn prune_idempotent_replay() {
3414 let temp_dir = tempfile::tempdir().unwrap();
3415 let db_path = temp_dir.path().join("rpc-index");
3416 let tables =
3417 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3418
3419 let mut batch = tables.tx_seq_digest.batch();
3420 for tx_seq in 0..5u64 {
3421 batch
3422 .insert_batch(
3423 &tables.tx_seq_digest,
3424 [(
3425 tx_seq,
3426 TxSeqDigestInfo {
3427 digest: TransactionDigest::new([0; 32]),
3428 event_count: 0,
3429 tx_offset: 0,
3430 checkpoint_number: 0,
3431 },
3432 )],
3433 )
3434 .unwrap();
3435 }
3436 batch.write().unwrap();
3437
3438 let pruning_atomic = AtomicU64::new(0);
3439 tables.prune(1, 3, true, &pruning_atomic).unwrap();
3440 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3441 tables.prune(1, 3, true, &pruning_atomic).unwrap();
3443 assert_eq!(
3444 pruning_atomic.load(Ordering::Relaxed),
3445 3,
3446 "replay must not move the atomic"
3447 );
3448 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3449 for tx_seq in 3..5u64 {
3450 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3451 }
3452 }
3453
3454 #[tokio::test]
3459 async fn prune_consecutive_ranges_advance_floor() {
3460 let temp_dir = tempfile::tempdir().unwrap();
3461 let db_path = temp_dir.path().join("rpc-index");
3462 let tables =
3463 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3464
3465 let mut batch = tables.tx_seq_digest.batch();
3466 for tx_seq in 0..6u64 {
3467 batch
3468 .insert_batch(
3469 &tables.tx_seq_digest,
3470 [(
3471 tx_seq,
3472 TxSeqDigestInfo {
3473 digest: TransactionDigest::new([0; 32]),
3474 event_count: 0,
3475 tx_offset: 0,
3476 checkpoint_number: 0,
3477 },
3478 )],
3479 )
3480 .unwrap();
3481 }
3482 batch.write().unwrap();
3483
3484 let pruning_atomic = AtomicU64::new(0);
3485 tables.prune(1, 3, true, &pruning_atomic).unwrap();
3486 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 3);
3487 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(3));
3488 tables.prune(2, 5, true, &pruning_atomic).unwrap();
3490 assert_eq!(pruning_atomic.load(Ordering::Relaxed), 5);
3491 for tx_seq in 0..5u64 {
3492 assert!(tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3493 }
3494 assert_eq!(tables.first_tx_seq_digest_key().unwrap(), Some(5));
3495 }
3496
3497 #[tokio::test]
3499 async fn new_without_init_enables_ledger_history_for_db_with_ledger_history_setting() {
3500 let temp_dir = tempfile::tempdir().unwrap();
3501 let db_path = temp_dir.path().join("rpc-index");
3502
3503 {
3505 let tables =
3506 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3507 tables
3508 .meta
3509 .insert(
3510 &(),
3511 &MetadataInfo {
3512 version: CURRENT_DB_VERSION,
3513 },
3514 )
3515 .unwrap();
3516 tables
3517 .settings
3518 .insert(
3519 &(),
3520 &encode_settings(&IndexSettings {
3521 ledger_history_indexing: true,
3522 }),
3523 )
3524 .unwrap();
3525 let mut batch = tables.tx_seq_digest.batch();
3526 for tx_seq in 100..105u64 {
3527 batch
3528 .insert_batch(
3529 &tables.tx_seq_digest,
3530 [(
3531 tx_seq,
3532 TxSeqDigestInfo {
3533 digest: TransactionDigest::new([0; 32]),
3534 event_count: 0,
3535 tx_offset: 0,
3536 checkpoint_number: 0,
3537 },
3538 )],
3539 )
3540 .unwrap();
3541 }
3542 batch.write().unwrap();
3543 }
3544
3545 let store = RpcIndexStore::new_without_init(temp_dir.path());
3546 assert!(
3547 store.ledger_history_enabled,
3548 "new_without_init on a ledger-history DB must enable ledger history indexing"
3549 );
3550 let atomic = &store.ledger_history_pruning_watermark;
3551 assert_eq!(
3552 atomic.load(Ordering::Relaxed),
3553 100,
3554 "pruning atomic must be hydrated from the first tx_seq_digest key"
3555 );
3556
3557 store.prune(7, 103).unwrap();
3559
3560 for tx_seq in 100..103u64 {
3561 assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_none());
3562 }
3563 for tx_seq in 103..105u64 {
3564 assert!(store.tables.tx_seq_digest.get(&tx_seq).unwrap().is_some());
3565 }
3566 assert_eq!(
3567 store.tables.first_tx_seq_digest_key().unwrap(),
3568 Some(103),
3569 "prune must advance the derived tx-seq floor"
3570 );
3571 assert_eq!(
3572 atomic.load(Ordering::Relaxed),
3573 103,
3574 "prune must advance the compaction-filter atomic"
3575 );
3576 }
3577
3578 #[tokio::test]
3580 async fn new_without_init_disables_ledger_history_for_db_without_ledger_history_setting() {
3581 let temp_dir = tempfile::tempdir().unwrap();
3583 let store = RpcIndexStore::new_without_init(temp_dir.path());
3584 assert!(
3585 !store.ledger_history_enabled,
3586 "new_without_init on a fresh DB must leave ledger history indexing disabled"
3587 );
3588
3589 store.prune(5, 0).unwrap();
3590 assert_eq!(
3591 store.tables.watermark.get(&Watermark::Pruned).unwrap(),
3592 Some(5)
3593 );
3594 assert_eq!(
3595 store.tables.first_tx_seq_digest_key().unwrap(),
3596 None,
3597 "disabled ledger history indexing must leave tx_seq_digest untouched"
3598 );
3599
3600 let temp_dir = tempfile::tempdir().unwrap();
3603 let db_path = temp_dir.path().join("rpc-index");
3604 {
3605 let tables =
3606 IndexStoreTables::open_with_index_options(&db_path, IndexStoreOptions::default());
3607 tables
3608 .meta
3609 .insert(
3610 &(),
3611 &MetadataInfo {
3612 version: CURRENT_DB_VERSION,
3613 },
3614 )
3615 .unwrap();
3616 }
3617 let store = RpcIndexStore::new_without_init(temp_dir.path());
3618 assert!(
3619 !store.ledger_history_enabled,
3620 "new_without_init on a DB with no settings row must leave ledger history indexing disabled"
3621 );
3622 }
3623
3624 fn parse_cf_options(db_path: &Path) -> HashMap<String, HashMap<String, String>> {
3628 let mut options_file: Option<(u64, PathBuf)> = None;
3629 for entry in std::fs::read_dir(db_path).expect("read_dir failed") {
3630 let entry = entry.unwrap();
3631 let name = entry.file_name().to_string_lossy().into_owned();
3632 let Some(rest) = name.strip_prefix("OPTIONS-") else {
3633 continue;
3634 };
3635 let Ok(seq) = rest.parse::<u64>() else {
3637 continue;
3638 };
3639 if options_file.as_ref().is_none_or(|(s, _)| seq > *s) {
3640 options_file = Some((seq, entry.path()));
3641 }
3642 }
3643 let (_, path) = options_file.expect("no OPTIONS-* file written");
3644 let content = std::fs::read_to_string(&path).expect("read OPTIONS failed");
3645
3646 let mut result: HashMap<String, HashMap<String, String>> = HashMap::new();
3647 let mut current_cf: Option<String> = None;
3648 for line in content.lines() {
3649 let line = line.trim();
3650 if let Some(rest) = line.strip_prefix("[CFOptions \"") {
3651 let cf_name = rest.trim_end_matches("\"]").to_string();
3652 current_cf = Some(cf_name);
3653 } else if line.starts_with('[') {
3654 current_cf = None;
3656 } else if let Some(cf) = current_cf.as_ref()
3657 && let Some((k, v)) = line.split_once('=')
3658 {
3659 result
3660 .entry(cf.clone())
3661 .or_default()
3662 .insert(k.trim().to_string(), v.trim().to_string());
3663 }
3664 }
3665 result
3666 }
3667}